Edit on GitHub

sqlmesh.core.plan.explainer

  1from __future__ import annotations
  2
  3import abc
  4import typing as t
  5import logging
  6from dataclasses import dataclass
  7from collections import defaultdict
  8
  9from rich.console import Console as RichConsole
 10from rich.tree import Tree
 11from sqlglot.dialects.dialect import DialectType
 12from sqlmesh.core import constants as c
 13from sqlmesh.core.console import Console, TerminalConsole, get_console
 14from sqlmesh.core.environment import EnvironmentNamingInfo
 15from sqlmesh.core.plan.common import (
 16    SnapshotIntervalClearRequest,
 17    identify_restatement_intervals_across_snapshot_versions,
 18)
 19from sqlmesh.core.plan.definition import EvaluatablePlan, SnapshotIntervals
 20from sqlmesh.core.plan import stages
 21from sqlmesh.core.plan.evaluator import (
 22    PlanEvaluator,
 23)
 24from sqlmesh.core.state_sync import StateReader
 25from sqlmesh.core.snapshot.definition import (
 26    SnapshotInfoMixin,
 27    SnapshotIdAndVersion,
 28    model_display_name,
 29)
 30from sqlmesh.utils import Verbosity, rich as srich, to_snake_case
 31from sqlmesh.utils.date import to_ts
 32from sqlmesh.utils.errors import SQLMeshError
 33
 34
 35logger = logging.getLogger(__name__)
 36
 37
 38class PlanExplainer(PlanEvaluator):
 39    def __init__(
 40        self,
 41        state_reader: StateReader,
 42        default_catalog: t.Optional[str],
 43        console: t.Optional[Console] = None,
 44    ):
 45        self.state_reader = state_reader
 46        self.default_catalog = default_catalog
 47        self.console = console or get_console()
 48
 49    def evaluate(
 50        self,
 51        plan: EvaluatablePlan,
 52        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
 53    ) -> None:
 54        plan_stages = stages.build_plan_stages(plan, self.state_reader, self.default_catalog)
 55        explainer_console = _get_explainer_console(
 56            self.console, plan.environment, self.default_catalog
 57        )
 58
 59        # add extra metadata that's only needed at this point for better --explain output
 60        plan_stages = [
 61            ExplainableRestatementStage.from_restatement_stage(stage, self.state_reader, plan)
 62            if isinstance(stage, stages.RestatementStage)
 63            else stage
 64            for stage in plan_stages
 65        ]
 66
 67        explainer_console.explain(plan_stages)
 68
 69
 70class ExplainerConsole(abc.ABC):
 71    @abc.abstractmethod
 72    def explain(self, stages: t.List[stages.PlanStage]) -> None:
 73        pass
 74
 75
 76@dataclass
 77class ExplainableRestatementStage(stages.RestatementStage):
 78    """
 79    This brings forward some calculations that would usually be done in the evaluator so the user can be given a better indication
 80    of what might happen when they ask for the plan to be explained
 81    """
 82
 83    snapshot_intervals_to_clear: t.Dict[str, t.List[SnapshotIntervalClearRequest]]
 84    """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""
 85
 86    @classmethod
 87    def from_restatement_stage(
 88        cls: t.Type[ExplainableRestatementStage],
 89        stage: stages.RestatementStage,
 90        state_reader: StateReader,
 91        plan: EvaluatablePlan,
 92    ) -> ExplainableRestatementStage:
 93        all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
 94            state_reader=state_reader,
 95            prod_restatements=plan.restatements,
 96            disable_restatement_models=plan.disabled_restatement_models,
 97            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
 98        )
 99
100        # Group the interval clear requests by snapshot name to make them easier to write to the console
101        snapshot_intervals_to_clear = defaultdict(list)
102        for clear_request in all_restatement_intervals.values():
103            snapshot_intervals_to_clear[clear_request.snapshot.name].append(clear_request)
104
105        return cls(
106            snapshot_intervals_to_clear=snapshot_intervals_to_clear,
107            all_snapshots=stage.all_snapshots,
108        )
109
110
# Subtrees with more children than this are collapsed by RichExplainerConsole._limit_tree
# unless verbosity is VERY_VERBOSE.
MAX_TREE_LENGTH: int = 10
112
113
class RichExplainerConsole(ExplainerConsole):
    """Explainer console that prints a plan's stages as a ``rich`` tree.

    Each stage is dispatched to a ``visit_<snake_case stage class name>`` method, which
    returns the subtree to print for that stage, or ``None`` to leave it out.
    """

    # Note attached to steps whose data will NOT be reused in production.
    _PREVIEW_NOTE = "[orange1]preview[/orange1]: data will NOT be reused in production"

    def __init__(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        dialect: DialectType,
        default_catalog: t.Optional[str],
        verbosity: Verbosity = Verbosity.DEFAULT,
        console: t.Optional[RichConsole] = None,
    ):
        """
        Args:
            environment_naming_info: Naming info of the target environment; used for
                display names and the virtual layer heading.
            dialect: SQL dialect used when rendering display names.
            default_catalog: Default catalog handed to display-name rendering while
                verbosity is below VERY_VERBOSE.
            verbosity: At VERY_VERBOSE, long subtrees are not collapsed and no default
                catalog is passed when rendering display names.
            console: Rich console to print to; defaults to ``srich.console``.
        """
        self.environment_naming_info = environment_naming_info
        self.dialect = dialect
        self.default_catalog = default_catalog
        self.verbosity = verbosity
        self.console: RichConsole = console or srich.console

    def explain(self, stages: t.List[stages.PlanStage]) -> None:
        """Print one subtree per stage beneath an "Explained plan" root.

        Stages without a matching ``visit_*`` handler are logged as errors and skipped.
        """
        tree = Tree("[bold]Explained plan[/bold]")
        for stage in stages:
            stage_type = stage.__class__.__name__
            handler = getattr(self, f"visit_{to_snake_case(stage_type)}", None)
            if handler is None:
                logger.error("Unexpected stage: %s", stage_type)
                continue
            result = handler(stage)
            # Handlers return None for stages that should not appear in the explanation.
            if result is not None:
                tree.add(self._limit_tree(result))
        self.console.print(tree)

    def visit_before_all_stage(self, stage: stages.BeforeAllStage) -> Tree:
        return Tree("[bold]Execute before all statements[/bold]")

    def visit_after_all_stage(self, stage: stages.AfterAllStage) -> Tree:
        return Tree("[bold]Execute after all statements[/bold]")

    def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
        """Describe the physical tables/views created for snapshots with missing intervals."""
        snapshots = [
            s for s in stage.snapshots if s.snapshot_id in stage.snapshots_with_missing_intervals
        ]
        if not snapshots:
            return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")

        tree = Tree(
            "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
        )
        # In prod every snapshot is shown as deployable; elsewhere the deployability index decides.
        is_prod = self.environment_naming_info.name == c.PROD
        for snapshot in snapshots:
            is_deployable = is_prod or stage.deployability_index.is_deployable(snapshot)
            display_name = self._display_name(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            if snapshot.is_model:
                if snapshot.model.pre_statements:
                    model_tree.add("Run pre-statements")
                if snapshot.model.annotated:
                    model_tree.add("Dry run model query without inserting results")

            if snapshot.is_view:
                create_tree = Tree("Create view if it doesn't exist")
            elif (
                snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
            ):
                prod_table = snapshot.table_name(True)
                create_tree = Tree(
                    f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
                )
            else:
                create_tree = Tree("Create table if it doesn't exist")

            if not is_deployable:
                create_tree.add(self._PREVIEW_NOTE)
            model_tree.add(create_tree)

            if snapshot.is_model and snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            tree.add(model_tree)
        return tree

    def visit_audit_only_run_stage(self, stage: stages.AuditOnlyRunStage) -> Tree:
        tree = Tree("[bold]Audit-only execution[/bold]")
        for snapshot in stage.snapshots:
            display_name = self._display_name(snapshot)
            tree.add(display_name)
        return tree

    def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage) -> Tree:
        return self.visit_restatement_stage(stage)

    def visit_restatement_stage(
        self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
    ) -> Tree:
        """Describe interval invalidation; per-model details need an explainable stage."""
        tree = Tree(
            "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
            "This only affects state and will not clear physical data from the tables until the next plan for each environment"
        )

        if isinstance(stage, ExplainableRestatementStage) and (
            snapshot_intervals := stage.snapshot_intervals_to_clear
        ):
            for name, clear_requests in snapshot_intervals.items():
                # Use the same verbosity-dependent catalog handling as every other stage.
                display_name = model_display_name(
                    name, self.environment_naming_info, self._display_catalog(), self.dialect
                )
                interval_start = min(cr.interval[0] for cr in clear_requests)
                interval_end = max(cr.interval[1] for cr in clear_requests)

                # NOTE(review): falsy bounds (including a 0 timestamp) are skipped — confirm intended.
                if not interval_start or not interval_end:
                    continue

                node = tree.add(f"{display_name} [{to_ts(interval_start)} - {to_ts(interval_end)}]")

                all_environment_names = sorted(
                    {env_name for cr in clear_requests for env_name in cr.environment_names}
                )
                node.add("in environments: " + ", ".join(all_environment_names))

        return tree

    def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
        """Describe the model batches and standalone audits that would be executed."""
        if not stage.snapshot_to_intervals:
            return Tree("[bold]SKIP: No model batches to execute[/bold]")

        tree = Tree(
            "[bold]Backfill models by running their queries and run standalone audits[/bold]"
        )
        for snapshot, intervals in stage.snapshot_to_intervals.items():
            display_name = self._display_name(snapshot)
            if not snapshot.is_model:
                tree.add(f"{display_name} \\[standalone audit]")
                continue

            # Computed once and reused for the table name, interval choice and preview note.
            is_deployable = stage.deployability_index.is_deployable(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            for signal_name, _ in snapshot.model.signals:
                model_tree.add(f"Check '{signal_name}' signal")

            if snapshot.model.pre_statements:
                model_tree.add("Run pre-statements")

            backfill_tree = Tree("Fully refresh table")
            if snapshot.is_incremental:
                current_intervals = snapshot.intervals if is_deployable else snapshot.dev_intervals
                # If there are no intervals, the table will be fully refreshed
                if current_intervals:
                    formatted_range = SnapshotIntervals(
                        snapshot_id=snapshot.snapshot_id, intervals=intervals
                    ).format_intervals(snapshot.node.interval_unit)
                    backfill_tree = Tree(
                        f"Incrementally insert records within the range [{formatted_range}]"
                    )
            elif snapshot.is_view:
                backfill_tree = Tree("Recreate view")

            if not is_deployable:
                backfill_tree.add(self._PREVIEW_NOTE)

            model_tree.add(backfill_tree)

            if snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            if snapshot.model.audits:
                for audit_name, _ in snapshot.model.audits:
                    model_tree.add(f"Run '{audit_name}' audit")

            tree.add(model_tree)
        return tree

    def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
        tree = Tree(
            "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
        )
        for snapshot in stage.snapshots:
            display_name = self._display_name(snapshot)
            table_name = snapshot.table_name(True)
            tree.add(f"{display_name} -> {table_name}")
        return tree

    def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
        """Describe the views promoted to and demoted from the target environment."""
        tree = Tree(
            f"[bold]Update the virtual layer for environment '{self.environment_naming_info.name}'[/bold]"
        )
        promote_tree = Tree(
            "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
        )
        for snapshot in stage.promoted_snapshots:
            display_name = self._display_name(snapshot)
            table_name = snapshot.table_name(stage.deployability_index.is_representative(snapshot))
            promote_tree.add(f"{display_name} -> {table_name}")

        demote_tree = Tree(
            "[bold]Delete views in the virtual layer for models that were removed[/bold]"
        )
        for snapshot in stage.demoted_snapshots:
            display_name = self._display_name(snapshot, stage.demoted_environment_naming_info)
            demote_tree.add(display_name)

        if stage.promoted_snapshots:
            tree.add(self._limit_tree(promote_tree))
        if stage.demoted_snapshots:
            tree.add(self._limit_tree(demote_tree))
        return tree

    # The stages below are intentionally left out of the explanation (None is skipped).

    def visit_create_snapshot_records_stage(
        self, stage: stages.CreateSnapshotRecordsStage
    ) -> t.Optional[Tree]:
        return None

    def visit_environment_record_update_stage(
        self, stage: stages.EnvironmentRecordUpdateStage
    ) -> t.Optional[Tree]:
        return None

    def visit_unpause_stage(self, stage: stages.UnpauseStage) -> t.Optional[Tree]:
        return None

    def visit_finalize_environment_stage(
        self, stage: stages.FinalizeEnvironmentStage
    ) -> t.Optional[Tree]:
        return None

    def _display_catalog(self) -> t.Optional[str]:
        """Default catalog to hand to display-name rendering at the current verbosity.

        Below VERY_VERBOSE this is ``self.default_catalog``; at VERY_VERBOSE it is ``None``.
        """
        return self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None

    def _display_name(
        self,
        snapshot: t.Union[SnapshotInfoMixin, SnapshotIdAndVersion],
        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    ) -> str:
        """Render a snapshot's display name for ``environment_naming_info`` (or the target env)."""
        return snapshot.display_name(
            environment_naming_info=environment_naming_info or self.environment_naming_info,
            default_catalog=self._display_catalog(),
            dialect=self.dialect,
        )

    def _limit_tree(self, tree: Tree) -> Tree:
        """Collapse ``tree`` in place to its first child, a placeholder and its last child.

        Collapsing only happens when the tree has more than ``MAX_TREE_LENGTH`` children
        and verbosity is below VERY_VERBOSE. The same tree object is always returned.
        """
        children = tree.children
        if len(children) > MAX_TREE_LENGTH and self.verbosity < Verbosity.VERY_VERBOSE:
            # The placeholder counts every child except the first and last, which stay visible.
            tree.children = [
                children[0],
                Tree(f".... {len(children) - 2} more ...."),
                children[-1],
            ]
        return tree
370
371
def _get_explainer_console(
    console: t.Optional[Console],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> ExplainerConsole:
    """Build the explainer console used to print a plan explanation.

    Args:
        console: SQLMesh console to render through; ``get_console()`` is used when not given.
        environment_naming_info: Naming info of the environment the plan targets.
        default_catalog: Default catalog forwarded to the explainer console.

    Returns:
        A ``RichExplainerConsole`` that reuses the terminal console's dialect,
        verbosity and underlying rich console.

    Raises:
        SQLMeshError: If the console is not a ``TerminalConsole``.
    """
    console = console or get_console()
    if not isinstance(console, TerminalConsole):
        # The explanation is printed through the terminal console's rich console.
        raise SQLMeshError("Plan explanation is only supported in the terminal.")
    return RichExplainerConsole(
        environment_naming_info=environment_naming_info,
        dialect=console.dialect,
        default_catalog=default_catalog,
        verbosity=console.verbosity,
        console=console.console,
    )
logger = <Logger sqlmesh.core.plan.explainer (WARNING)>
class PlanExplainer(sqlmesh.core.plan.evaluator.PlanEvaluator):
  # Plan evaluator that prints an explanation of the plan's stages instead of executing them.
39class PlanExplainer(PlanEvaluator):
40    def __init__(
41        self,
42        state_reader: StateReader,
43        default_catalog: t.Optional[str],
44        console: t.Optional[Console] = None,
45    ):
          # The console falls back to get_console() when none is given.
46        self.state_reader = state_reader
47        self.default_catalog = default_catalog
48        self.console = console or get_console()
49
50    def evaluate(
51        self,
52        plan: EvaluatablePlan,
53        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
54    ) -> None:
          # Builds the plan's stages and prints them; circuit_breaker is not used here.
55        plan_stages = stages.build_plan_stages(plan, self.state_reader, self.default_catalog)
          # Raises SQLMeshError unless the console is a TerminalConsole.
56        explainer_console = _get_explainer_console(
57            self.console, plan.environment, self.default_catalog
58        )
59
60        # add extra metadata that's only needed at this point for better --explain output
61        plan_stages = [
62            ExplainableRestatementStage.from_restatement_stage(stage, self.state_reader, plan)
63            if isinstance(stage, stages.RestatementStage)
64            else stage
65            for stage in plan_stages
66        ]
67
68        explainer_console.explain(plan_stages)

Plan evaluator that explains a plan instead of executing it: it builds the plan's stages and prints them as a tree to the terminal console.

PlanExplainer( state_reader: sqlmesh.core.state_sync.base.StateReader, default_catalog: Optional[str], console: Optional[sqlmesh.core.console.Console] = None)
40    def __init__(
41        self,
42        state_reader: StateReader,
43        default_catalog: t.Optional[str],
44        console: t.Optional[Console] = None,
45    ):
          # Store collaborators; the console falls back to get_console() when not given.
46        self.state_reader = state_reader
47        self.default_catalog = default_catalog
48        self.console = console or get_console()
state_reader
default_catalog
console
def evaluate( self, plan: sqlmesh.core.plan.definition.EvaluatablePlan, circuit_breaker: Optional[Callable[[], bool]] = None) -> None:
50    def evaluate(
51        self,
52        plan: EvaluatablePlan,
53        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
54    ) -> None:
          # Builds the plan's stages and prints them; circuit_breaker is not used here.
55        plan_stages = stages.build_plan_stages(plan, self.state_reader, self.default_catalog)
          # Raises SQLMeshError unless the console is a TerminalConsole.
56        explainer_console = _get_explainer_console(
57            self.console, plan.environment, self.default_catalog
58        )
59
60        # add extra metadata that's only needed at this point for better --explain output
61        plan_stages = [
62            ExplainableRestatementStage.from_restatement_stage(stage, self.state_reader, plan)
63            if isinstance(stage, stages.RestatementStage)
64            else stage
65            for stage in plan_stages
66        ]
67
68        explainer_console.explain(plan_stages)

Explains a plan without applying it.

Builds the plan's stages and attaches to each restatement stage the intervals that would be cleared across snapshot versions. It then prints the resulting tree to the terminal console. A SQLMeshError is raised if the console is not a terminal console.

Arguments:
  • plan: The plan to explain.
  • circuit_breaker: Unused; accepted for compatibility with PlanEvaluator.evaluate.
class ExplainerConsole(abc.ABC):
  # Abstract interface for consoles that render an explanation of plan stages.
71class ExplainerConsole(abc.ABC):
72    @abc.abstractmethod
73    def explain(self, stages: t.List[stages.PlanStage]) -> None:
          # Implementations render the given stages (see RichExplainerConsole).
74        pass

Abstract interface for consoles that render an explanation of a plan's stages.

@dataclass
class ExplainableRestatementStage(sqlmesh.core.plan.stages.RestatementStage):
 77@dataclass
 78class ExplainableRestatementStage(stages.RestatementStage):
 79    """
 80    This brings forward some calculations that would usually be done in the evaluator so the user can be given a better indication
 81    of what might happen when they ask for the plan to be explained
 82    """
 83
 84    snapshot_intervals_to_clear: t.Dict[str, t.List[SnapshotIntervalClearRequest]]
 85    """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""
 86
 87    @classmethod
 88    def from_restatement_stage(
 89        cls: t.Type[ExplainableRestatementStage],
 90        stage: stages.RestatementStage,
 91        state_reader: StateReader,
 92        plan: EvaluatablePlan,
 93    ) -> ExplainableRestatementStage:
           # Collect interval clear requests for the plan's restatements across snapshot versions.
 94        all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
 95            state_reader=state_reader,
 96            prod_restatements=plan.restatements,
 97            disable_restatement_models=plan.disabled_restatement_models,
 98            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
 99        )
100
101        # Group the interval clear requests by snapshot name to make them easier to write to the console
102        snapshot_intervals_to_clear = defaultdict(list)
103        for clear_request in all_restatement_intervals.values():
104            snapshot_intervals_to_clear[clear_request.snapshot.name].append(clear_request)
105
           # The new stage shares the original stage's snapshots.
106        return cls(
107            snapshot_intervals_to_clear=snapshot_intervals_to_clear,
108            all_snapshots=stage.all_snapshots,
109        )

A restatement stage that brings forward calculations that would usually be done in the evaluator. This gives the user a better indication of what will happen when they ask for the plan to be explained.

ExplainableRestatementStage( all_snapshots: Dict[str, sqlmesh.core.snapshot.definition.Snapshot], snapshot_intervals_to_clear: Dict[str, List[sqlmesh.core.plan.common.SnapshotIntervalClearRequest]])
snapshot_intervals_to_clear: Dict[str, List[sqlmesh.core.plan.common.SnapshotIntervalClearRequest]]

Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name.

 87    @classmethod
 88    def from_restatement_stage(
 89        cls: t.Type[ExplainableRestatementStage],
 90        stage: stages.RestatementStage,
 91        state_reader: StateReader,
 92        plan: EvaluatablePlan,
 93    ) -> ExplainableRestatementStage:
           # Collect interval clear requests for the plan's restatements across snapshot versions.
 94        all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
 95            state_reader=state_reader,
 96            prod_restatements=plan.restatements,
 97            disable_restatement_models=plan.disabled_restatement_models,
 98            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
 99        )
100
101        # Group the interval clear requests by snapshot name to make them easier to write to the console
102        snapshot_intervals_to_clear = defaultdict(list)
103        for clear_request in all_restatement_intervals.values():
104            snapshot_intervals_to_clear[clear_request.snapshot.name].append(clear_request)
105
           # The new stage shares the original stage's snapshots.
106        return cls(
107            snapshot_intervals_to_clear=snapshot_intervals_to_clear,
108            all_snapshots=stage.all_snapshots,
109        )
MAX_TREE_LENGTH = 10
class RichExplainerConsole(ExplainerConsole):
115class RichExplainerConsole(ExplainerConsole):
116    def __init__(
117        self,
118        environment_naming_info: EnvironmentNamingInfo,
119        dialect: DialectType,
120        default_catalog: t.Optional[str],
121        verbosity: Verbosity = Verbosity.DEFAULT,
122        console: t.Optional[RichConsole] = None,
123    ):
124        self.environment_naming_info = environment_naming_info
125        self.dialect = dialect
126        self.default_catalog = default_catalog
127        self.verbosity = verbosity
128        self.console: RichConsole = console or srich.console
129
130    def explain(self, stages: t.List[stages.PlanStage]) -> None:
131        tree = Tree("[bold]Explained plan[/bold]")
132        for stage in stages:
133            handler_name = f"visit_{to_snake_case(stage.__class__.__name__)}"
134            if not hasattr(self, handler_name):
135                logger.error("Unexpected stage: %s", stage.__class__.__name__)
136                continue
137            handler = getattr(self, handler_name)
138            result = handler(stage)
139            if result:
140                tree.add(self._limit_tree(result))
141        self.console.print(tree)
142
143    def visit_before_all_stage(self, stage: stages.BeforeAllStage) -> Tree:
144        return Tree("[bold]Execute before all statements[/bold]")
145
146    def visit_after_all_stage(self, stage: stages.AfterAllStage) -> Tree:
147        return Tree("[bold]Execute after all statements[/bold]")
148
149    def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
150        snapshots = [
151            s for s in stage.snapshots if s.snapshot_id in stage.snapshots_with_missing_intervals
152        ]
153        if not snapshots:
154            return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")
155
156        tree = Tree(
157            "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
158        )
159        for snapshot in snapshots:
160            is_deployable = (
161                stage.deployability_index.is_deployable(snapshot)
162                if self.environment_naming_info.name != c.PROD
163                else True
164            )
165            display_name = self._display_name(snapshot)
166            table_name = snapshot.table_name(is_deployable)
167            model_tree = Tree(f"{display_name} -> {table_name}")
168
169            if snapshot.is_model:
170                if snapshot.model.pre_statements:
171                    model_tree.add("Run pre-statements")
172                if snapshot.model.annotated:
173                    model_tree.add("Dry run model query without inserting results")
174
175            if snapshot.is_view:
176                create_tree = Tree("Create view if it doesn't exist")
177            elif (
178                snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
179            ):
180                prod_table = snapshot.table_name(True)
181                create_tree = Tree(
182                    f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
183                )
184            else:
185                create_tree = Tree("Create table if it doesn't exist")
186
187            if not is_deployable:
188                create_tree.add("[orange1]preview[/orange1]: data will NOT be reused in production")
189            model_tree.add(create_tree)
190
191            if snapshot.is_model and snapshot.model.post_statements:
192                model_tree.add("Run post-statements")
193
194            tree.add(model_tree)
195        return tree
196
197    def visit_audit_only_run_stage(self, stage: stages.AuditOnlyRunStage) -> Tree:
198        tree = Tree("[bold]Audit-only execution[/bold]")
199        for snapshot in stage.snapshots:
200            display_name = self._display_name(snapshot)
201            tree.add(display_name)
202        return tree
203
204    def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage) -> Tree:
205        return self.visit_restatement_stage(stage)
206
207    def visit_restatement_stage(
208        self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
209    ) -> Tree:
210        tree = Tree(
211            "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
212            "This only affects state and will not clear physical data from the tables until the next plan for each environment"
213        )
214
215        if isinstance(stage, ExplainableRestatementStage) and (
216            snapshot_intervals := stage.snapshot_intervals_to_clear
217        ):
218            for name, clear_requests in snapshot_intervals.items():
219                display_name = model_display_name(
220                    name, self.environment_naming_info, self.default_catalog, self.dialect
221                )
222                interval_start = min(cr.interval[0] for cr in clear_requests)
223                interval_end = max(cr.interval[1] for cr in clear_requests)
224
225                if not interval_start or not interval_end:
226                    continue
227
228                node = tree.add(f"{display_name} [{to_ts(interval_start)} - {to_ts(interval_end)}]")
229
230                all_environment_names = sorted(
231                    set(env_name for cr in clear_requests for env_name in cr.environment_names)
232                )
233                node.add("in environments: " + ", ".join(all_environment_names))
234
235        return tree
236
237    def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
238        if not stage.snapshot_to_intervals:
239            return Tree("[bold]SKIP: No model batches to execute[/bold]")
240
241        tree = Tree(
242            "[bold]Backfill models by running their queries and run standalone audits[/bold]"
243        )
244        for snapshot, intervals in stage.snapshot_to_intervals.items():
245            display_name = self._display_name(snapshot)
246            if snapshot.is_model:
247                is_deployable = stage.deployability_index.is_deployable(snapshot)
248                table_name = snapshot.table_name(is_deployable)
249                model_tree = Tree(f"{display_name} -> {table_name}")
250
251                for signal_name, _ in snapshot.model.signals:
252                    model_tree.add(f"Check '{signal_name}' signal")
253
254                if snapshot.model.pre_statements:
255                    model_tree.add("Run pre-statements")
256
257                backfill_tree = Tree("Fully refresh table")
258                if snapshot.is_incremental:
259                    current_intervals = (
260                        snapshot.intervals
261                        if stage.deployability_index.is_deployable(snapshot)
262                        else snapshot.dev_intervals
263                    )
264                    # If there are no intervals, the table will be fully refreshed
265                    if current_intervals:
266                        formatted_range = SnapshotIntervals(
267                            snapshot_id=snapshot.snapshot_id, intervals=intervals
268                        ).format_intervals(snapshot.node.interval_unit)
269                        backfill_tree = Tree(
270                            f"Incrementally insert records within the range [{formatted_range}]"
271                        )
272                elif snapshot.is_view:
273                    backfill_tree = Tree("Recreate view")
274
275                if not is_deployable:
276                    backfill_tree.add(
277                        "[orange1]preview[/orange1]: data will NOT be reused in production"
278                    )
279
280                model_tree.add(backfill_tree)
281
282                if snapshot.model.post_statements:
283                    model_tree.add("Run post-statements")
284
285                if snapshot.model.audits:
286                    for audit_name, _ in snapshot.model.audits:
287                        model_tree.add(f"Run '{audit_name}' audit")
288
289                tree.add(model_tree)
290            else:
291                tree.add(f"{display_name} \\[standalone audit]")
292        return tree
293
def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
    """Describe the schema migration of production physical tables.

    Every snapshot of the stage is listed as ``<display name> -> <table name>``, where the
    table name is obtained with ``snapshot.table_name(True)``.
    """
    migrate_tree = Tree(
        "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
    )
    for snap in stage.snapshots:
        migrate_tree.add(f"{self._display_name(snap)} -> {snap.table_name(True)}")
    return migrate_tree
303
def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
    """Describe the virtual layer update for the current environment.

    Promoted snapshots are listed with the table their view points at (resolved via
    ``deployability_index.is_representative``). Demoted snapshots are listed by display
    name, using the stage's demoted environment naming info. Each sub-tree is attached only
    when its snapshot collection is non-empty, and is passed through ``_limit_tree`` first.
    """
    env_name = self.environment_naming_info.name
    root = Tree(f"[bold]Update the virtual layer for environment '{env_name}'[/bold]")

    promote_tree = Tree(
        "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
    )
    for snap in stage.promoted_snapshots:
        shown_name = self._display_name(snap)
        target_table = snap.table_name(stage.deployability_index.is_representative(snap))
        promote_tree.add(f"{shown_name} -> {target_table}")

    demote_tree = Tree(
        "[bold]Delete views in the virtual layer for models that were removed[/bold]"
    )
    for snap in stage.demoted_snapshots:
        demote_tree.add(self._display_name(snap, stage.demoted_environment_naming_info))

    sections = (
        (stage.promoted_snapshots, promote_tree),
        (stage.demoted_snapshots, demote_tree),
    )
    for source_snapshots, section_tree in sections:
        if source_snapshots:
            root.add(self._limit_tree(section_tree))
    return root
328
def visit_create_snapshot_records_stage(self, stage: stages.CreateSnapshotRecordsStage) -> t.Optional[Tree]:
    """Contribute nothing for the snapshot-record creation stage (always returns ``None``)."""
    return None
333
def visit_environment_record_update_stage(self, stage: stages.EnvironmentRecordUpdateStage) -> t.Optional[Tree]:
    """Contribute nothing for the environment-record update stage (always returns ``None``)."""
    return None
338
def visit_unpause_stage(
    self, stage: stages.UnpauseStage
) -> t.Optional[Tree]:
    """Contribute nothing for the unpause stage (always returns ``None``)."""
    return None
341
def visit_finalize_environment_stage(self, stage: stages.FinalizeEnvironmentStage) -> t.Optional[Tree]:
    """Contribute nothing for the environment finalization stage (always returns ``None``)."""
    return None
346
def _display_name(
    self,
    snapshot: t.Union[SnapshotInfoMixin, SnapshotIdAndVersion],
    environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
) -> str:
    """Return ``snapshot.display_name(...)`` configured for this console.

    The given naming info is used when truthy, otherwise the console's own. The console's
    default catalog is forwarded only while verbosity is below ``VERY_VERBOSE``; at
    ``VERY_VERBOSE`` or higher ``None`` is forwarded instead.
    """
    naming_info = environment_naming_info or self.environment_naming_info
    below_very_verbose = self.verbosity < Verbosity.VERY_VERBOSE
    catalog_arg = self.default_catalog if below_very_verbose else None
    return snapshot.display_name(
        environment_naming_info=naming_info,
        default_catalog=catalog_arg,
        dialect=self.dialect,
    )
359
def _limit_tree(self, tree: Tree) -> Tree:
    """Truncate an overly long tree for display.

    When ``tree`` has more than ``MAX_TREE_LENGTH`` children and verbosity is below
    ``VERY_VERBOSE``, its children are replaced in place by the first child, a placeholder
    node counting the omitted children, and the last child. The same tree object is returned
    either way.
    """
    children = tree.children
    child_count = len(children)
    should_collapse = child_count > MAX_TREE_LENGTH and self.verbosity < Verbosity.VERY_VERBOSE
    if should_collapse:
        placeholder = Tree(f".... {child_count - 2} more ....")
        tree.children = [children[0], placeholder, children[-1]]
    return tree

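Rich-based plan explainer console: `explain()` dispatches each plan stage to a matching `visit_*` method and prints the combined result as a `rich` tree. (The docstring inherited from `abc.ABC` reads: "Helper class that provides a standard way to create an ABC using inheritance.")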

RichExplainerConsole( environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], default_catalog: Optional[str], verbosity: sqlmesh.utils.Verbosity = <Verbosity.DEFAULT: 0>, console: Optional[rich.console.Console] = None)
def __init__(
    self,
    environment_naming_info: EnvironmentNamingInfo,
    dialect: DialectType,
    default_catalog: t.Optional[str],
    verbosity: Verbosity = Verbosity.DEFAULT,
    console: t.Optional[RichConsole] = None,
):
    """Store the settings used to render a plan explanation.

    Args:
        environment_naming_info: Naming info of the target environment, used for display names.
        dialect: SQL dialect used when rendering display names.
        default_catalog: Default catalog, forwarded to display-name rendering.
        verbosity: Output verbosity; controls tree truncation and catalog display.
        console: Rich console to print to; ``srich.console`` is used when this is falsy.
    """
    self.environment_naming_info = environment_naming_info
    self.dialect = dialect
    self.default_catalog = default_catalog
    self.verbosity = verbosity
    chosen_console = console or srich.console
    self.console: RichConsole = chosen_console
environment_naming_info
dialect
default_catalog
verbosity
console: rich.console.Console
def explain(self, stages: t.List[stages.PlanStage]) -> None:
    """Print a tree explaining every stage of a plan.

    Each stage is dispatched to the method ``visit_<snake_case of its class name>``. Stages
    without such a method are logged as errors and skipped. Visitor results that are falsy
    (e.g. ``None``) add nothing; others are truncated via ``_limit_tree`` and attached.
    """
    # NOTE: the parameter name shadows the `stages` module inside this body; it is kept for
    # backward compatibility with keyword callers.
    root = Tree("[bold]Explained plan[/bold]")
    for plan_stage in stages:
        stage_cls_name = plan_stage.__class__.__name__
        visitor_name = f"visit_{to_snake_case(stage_cls_name)}"
        if not hasattr(self, visitor_name):
            logger.error("Unexpected stage: %s", stage_cls_name)
            continue
        stage_tree = getattr(self, visitor_name)(plan_stage)
        if stage_tree:
            root.add(self._limit_tree(stage_tree))
    self.console.print(root)
def visit_before_all_stage(self, stage: sqlmesh.core.plan.stages.BeforeAllStage) -> rich.tree.Tree:
def visit_before_all_stage(self, stage: stages.BeforeAllStage) -> Tree:
    """Return a single-node tree announcing execution of the before-all statements."""
    label = "[bold]Execute before all statements[/bold]"
    return Tree(label)
def visit_after_all_stage(self, stage: sqlmesh.core.plan.stages.AfterAllStage) -> rich.tree.Tree:
def visit_after_all_stage(self, stage: stages.AfterAllStage) -> Tree:
    """Return a single-node tree announcing execution of the after-all statements."""
    label = "[bold]Execute after all statements[/bold]"
    return Tree(label)
def visit_physical_layer_update_stage( self, stage: sqlmesh.core.plan.stages.PhysicalLayerUpdateStage) -> rich.tree.Tree:
def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
    """Describe creation of physical tables/views for snapshots with missing intervals.

    Only snapshots whose id is in ``stage.snapshots_with_missing_intervals`` are listed; if
    there are none a "SKIP" tree is returned. Outside the prod environment, a snapshot's
    deployability comes from the stage's deployability index; in prod it is always treated
    as deployable. Non-deployable entries get a "preview" note.
    """
    pending = [
        snap for snap in stage.snapshots if snap.snapshot_id in stage.snapshots_with_missing_intervals
    ]
    if not pending:
        return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")

    root = Tree(
        "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
    )
    in_prod = self.environment_naming_info.name == c.PROD
    for snap in pending:
        deployable = in_prod or stage.deployability_index.is_deployable(snap)
        shown_name = self._display_name(snap)
        target_table = snap.table_name(deployable)
        node = Tree(f"{shown_name} -> {target_table}")

        if snap.is_model:
            if snap.model.pre_statements:
                node.add("Run pre-statements")
            if snap.model.annotated:
                node.add("Dry run model query without inserting results")

        # Forward-only, non-managed snapshots with prior versions are cloned from the
        # production table instead of being created from scratch.
        clone_from_prod = (
            snap.is_forward_only and snap.previous_versions and not snap.is_managed
        )
        if snap.is_view:
            create_node = Tree("Create view if it doesn't exist")
        elif clone_from_prod:
            prod_table = snap.table_name(True)
            create_node = Tree(
                f"Clone {prod_table} into {target_table} and then update its schema if it doesn't exist"
            )
        else:
            create_node = Tree("Create table if it doesn't exist")

        if not deployable:
            create_node.add("[orange1]preview[/orange1]: data will NOT be reused in production")
        node.add(create_node)

        if snap.is_model and snap.model.post_statements:
            node.add("Run post-statements")

        root.add(node)
    return root
def visit_audit_only_run_stage( self, stage: sqlmesh.core.plan.stages.AuditOnlyRunStage) -> rich.tree.Tree:
def visit_audit_only_run_stage(self, stage: stages.AuditOnlyRunStage) -> Tree:
    """List, by display name, every snapshot of an audit-only run stage."""
    audit_tree = Tree("[bold]Audit-only execution[/bold]")
    for snap in stage.snapshots:
        audit_tree.add(self._display_name(snap))
    return audit_tree
def visit_explainable_restatement_stage( self, stage: ExplainableRestatementStage) -> rich.tree.Tree:
def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage) -> Tree:
    """Delegate to ``visit_restatement_stage`` and return its result unchanged."""
    restatement_tree = self.visit_restatement_stage(stage)
    return restatement_tree
def visit_restatement_stage( self, stage: Union[ExplainableRestatementStage, sqlmesh.core.plan.stages.RestatementStage]) -> rich.tree.Tree:
def visit_restatement_stage(
    self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
) -> Tree:
    """Describe which restated intervals are invalidated in state, per model.

    Details are added only for an ``ExplainableRestatementStage`` with a non-empty
    ``snapshot_intervals_to_clear``. For each model name, the node shows the span from the
    smallest interval start to the largest interval end across its clear requests, plus the
    sorted, de-duplicated environment names involved.
    """
    tree = Tree(
        "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
        "This only affects state and will not clear physical data from the tables until the next plan for each environment"
    )
    if not isinstance(stage, ExplainableRestatementStage):
        return tree
    snapshot_intervals = stage.snapshot_intervals_to_clear
    if not snapshot_intervals:
        return tree

    for name, clear_requests in snapshot_intervals.items():
        shown_name = model_display_name(
            name, self.environment_naming_info, self.default_catalog, self.dialect
        )
        span_start = min(req.interval[0] for req in clear_requests)
        span_end = max(req.interval[1] for req in clear_requests)

        # NOTE(review): this truthiness check also skips a bound of 0 (the epoch);
        # confirm that is intended.
        if span_start and span_end:
            model_node = tree.add(f"{shown_name} [{to_ts(span_start)} - {to_ts(span_end)}]")
            env_names = sorted(
                {env for req in clear_requests for env in req.environment_names}
            )
            model_node.add("in environments: " + ", ".join(env_names))

    return tree
def visit_backfill_stage(self, stage: sqlmesh.core.plan.stages.BackfillStage) -> rich.tree.Tree:
def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
    """Describe the model backfills and standalone audits of a backfill stage.

    Returns a "SKIP" tree when ``stage.snapshot_to_intervals`` is empty. For each model
    snapshot the node lists, in order: signal checks, pre-statements, the backfill action
    (incremental insert over the scheduled intervals, full table refresh, or view
    recreation), an optional "preview" note for non-deployable snapshots, post-statements,
    and audits. Non-model snapshots are listed as standalone audits.
    """
    if not stage.snapshot_to_intervals:
        return Tree("[bold]SKIP: No model batches to execute[/bold]")

    tree = Tree(
        "[bold]Backfill models by running their queries and run standalone audits[/bold]"
    )
    for snapshot, intervals in stage.snapshot_to_intervals.items():
        display_name = self._display_name(snapshot)
        if not snapshot.is_model:
            tree.add(f"{display_name} \\[standalone audit]")
            continue

        # Computed once and reused below, so the table name, the interval set consulted,
        # and the preview note all agree on the same deployability decision.
        is_deployable = stage.deployability_index.is_deployable(snapshot)
        table_name = snapshot.table_name(is_deployable)
        model_tree = Tree(f"{display_name} -> {table_name}")

        for signal_name, _ in snapshot.model.signals:
            model_tree.add(f"Check '{signal_name}' signal")

        if snapshot.model.pre_statements:
            model_tree.add("Run pre-statements")

        backfill_tree = Tree("Fully refresh table")
        if snapshot.is_incremental:
            # Deployable snapshots are checked against `intervals`, others against
            # `dev_intervals`.
            current_intervals = (
                snapshot.intervals if is_deployable else snapshot.dev_intervals
            )
            # If there are no intervals, the table will be fully refreshed
            if current_intervals:
                formatted_range = SnapshotIntervals(
                    snapshot_id=snapshot.snapshot_id, intervals=intervals
                ).format_intervals(snapshot.node.interval_unit)
                backfill_tree = Tree(
                    f"Incrementally insert records within the range [{formatted_range}]"
                )
        elif snapshot.is_view:
            backfill_tree = Tree("Recreate view")

        if not is_deployable:
            backfill_tree.add(
                "[orange1]preview[/orange1]: data will NOT be reused in production"
            )
        model_tree.add(backfill_tree)

        if snapshot.model.post_statements:
            model_tree.add("Run post-statements")

        if snapshot.model.audits:
            for audit_name, _ in snapshot.model.audits:
                model_tree.add(f"Run '{audit_name}' audit")

        tree.add(model_tree)
    return tree
def visit_migrate_schemas_stage( self, stage: sqlmesh.core.plan.stages.MigrateSchemasStage) -> rich.tree.Tree:
def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
    """Describe the schema migration of production physical tables.

    Every snapshot of the stage is listed as ``<display name> -> <table name>``, where the
    table name is obtained with ``snapshot.table_name(True)``.
    """
    migrate_tree = Tree(
        "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
    )
    for snap in stage.snapshots:
        migrate_tree.add(f"{self._display_name(snap)} -> {snap.table_name(True)}")
    return migrate_tree
def visit_virtual_layer_update_stage( self, stage: sqlmesh.core.plan.stages.VirtualLayerUpdateStage) -> rich.tree.Tree:
def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
    """Describe the virtual layer update for the current environment.

    Promoted snapshots are listed with the table their view points at (resolved via
    ``deployability_index.is_representative``). Demoted snapshots are listed by display
    name, using the stage's demoted environment naming info. Each sub-tree is attached only
    when its snapshot collection is non-empty, and is passed through ``_limit_tree`` first.
    """
    env_name = self.environment_naming_info.name
    root = Tree(f"[bold]Update the virtual layer for environment '{env_name}'[/bold]")

    promote_tree = Tree(
        "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
    )
    for snap in stage.promoted_snapshots:
        shown_name = self._display_name(snap)
        target_table = snap.table_name(stage.deployability_index.is_representative(snap))
        promote_tree.add(f"{shown_name} -> {target_table}")

    demote_tree = Tree(
        "[bold]Delete views in the virtual layer for models that were removed[/bold]"
    )
    for snap in stage.demoted_snapshots:
        demote_tree.add(self._display_name(snap, stage.demoted_environment_naming_info))

    sections = (
        (stage.promoted_snapshots, promote_tree),
        (stage.demoted_snapshots, demote_tree),
    )
    for source_snapshots, section_tree in sections:
        if source_snapshots:
            root.add(self._limit_tree(section_tree))
    return root
def visit_create_snapshot_records_stage( self, stage: sqlmesh.core.plan.stages.CreateSnapshotRecordsStage) -> Optional[rich.tree.Tree]:
def visit_create_snapshot_records_stage(self, stage: stages.CreateSnapshotRecordsStage) -> t.Optional[Tree]:
    """Contribute nothing for the snapshot-record creation stage (always returns ``None``)."""
    return None
def visit_environment_record_update_stage( self, stage: sqlmesh.core.plan.stages.EnvironmentRecordUpdateStage) -> Optional[rich.tree.Tree]:
def visit_environment_record_update_stage(self, stage: stages.EnvironmentRecordUpdateStage) -> t.Optional[Tree]:
    """Contribute nothing for the environment-record update stage (always returns ``None``)."""
    return None
def visit_unpause_stage( self, stage: sqlmesh.core.plan.stages.UnpauseStage) -> Optional[rich.tree.Tree]:
def visit_unpause_stage(
    self, stage: stages.UnpauseStage
) -> t.Optional[Tree]:
    """Contribute nothing for the unpause stage (always returns ``None``)."""
    return None
def visit_finalize_environment_stage( self, stage: sqlmesh.core.plan.stages.FinalizeEnvironmentStage) -> Optional[rich.tree.Tree]:
def visit_finalize_environment_stage(self, stage: stages.FinalizeEnvironmentStage) -> t.Optional[Tree]:
    """Contribute nothing for the environment finalization stage (always returns ``None``)."""
    return None