sqlmesh.core.plan.explainer
"""Explain an evaluatable plan by rendering its stages as a tree on the terminal."""

from __future__ import annotations

import abc
import typing as t
import logging
from dataclasses import dataclass
from collections import defaultdict

from rich.console import Console as RichConsole
from rich.tree import Tree
from sqlglot.dialects.dialect import DialectType
from sqlmesh.core import constants as c
from sqlmesh.core.console import Console, TerminalConsole, get_console
from sqlmesh.core.environment import EnvironmentNamingInfo
from sqlmesh.core.plan.common import (
    SnapshotIntervalClearRequest,
    identify_restatement_intervals_across_snapshot_versions,
)
from sqlmesh.core.plan.definition import EvaluatablePlan, SnapshotIntervals
from sqlmesh.core.plan import stages
from sqlmesh.core.plan.evaluator import (
    PlanEvaluator,
)
from sqlmesh.core.state_sync import StateReader
from sqlmesh.core.snapshot.definition import (
    SnapshotInfoMixin,
    SnapshotIdAndVersion,
    model_display_name,
)
from sqlmesh.utils import Verbosity, rich as srich, to_snake_case
from sqlmesh.utils.date import to_ts
from sqlmesh.utils.errors import SQLMeshError


logger = logging.getLogger(__name__)


class PlanExplainer(PlanEvaluator):
    """Evaluator whose ``evaluate`` builds a plan's stages and hands them to an explainer console."""

    def __init__(
        self,
        state_reader: StateReader,
        default_catalog: t.Optional[str],
        console: t.Optional[Console] = None,
    ):
        """Store the collaborators used while explaining a plan.

        Args:
            state_reader: Passed to the stage builder and to the restatement enrichment.
            default_catalog: Passed to the stage builder and to the explainer console.
            console: Console to explain to; ``get_console()`` is used when falsy.
        """
        self.state_reader = state_reader
        self.default_catalog = default_catalog
        self.console = console or get_console()

    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Build the stages of ``plan`` and print an explanation of them.

        Args:
            plan: The plan to explain.
            circuit_breaker: Accepted for interface compatibility; not used here.

        Raises:
            SQLMeshError: If the configured console is not a ``TerminalConsole``.
        """
        plan_stages = stages.build_plan_stages(plan, self.state_reader, self.default_catalog)
        explainer_console = _get_explainer_console(
            self.console, plan.environment, self.default_catalog
        )

        # add extra metadata that's only needed at this point for better --explain output
        plan_stages = [
            ExplainableRestatementStage.from_restatement_stage(stage, self.state_reader, plan)
            if isinstance(stage, stages.RestatementStage)
            else stage
            for stage in plan_stages
        ]

        explainer_console.explain(plan_stages)


class ExplainerConsole(abc.ABC):
    """Interface for consoles that can present a list of plan stages."""

    @abc.abstractmethod
    def explain(self, stages: t.List[stages.PlanStage]) -> None:
        """Present the given plan stages to the user."""


@dataclass
class ExplainableRestatementStage(stages.RestatementStage):
    """
    This brings forward some calculations that would usually be done in the evaluator so the user can be given a better indication
    of what might happen when they ask for the plan to be explained.
    """

    snapshot_intervals_to_clear: t.Dict[str, t.List[SnapshotIntervalClearRequest]]
    """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""

    @classmethod
    def from_restatement_stage(
        cls: t.Type[ExplainableRestatementStage],
        stage: stages.RestatementStage,
        state_reader: StateReader,
        plan: EvaluatablePlan,
    ) -> ExplainableRestatementStage:
        """Build an explainable stage from a plain restatement stage.

        Args:
            stage: The restatement stage whose ``all_snapshots`` are carried over.
            state_reader: State reader handed to the interval identification helper.
            plan: Supplies ``restatements`` and ``disabled_restatement_models``.

        Returns:
            A new stage whose ``snapshot_intervals_to_clear`` groups the identified
            clear requests by snapshot name.
        """
        all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
            state_reader=state_reader,
            prod_restatements=plan.restatements,
            disable_restatement_models=plan.disabled_restatement_models,
            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
        )

        # Group the interval clear requests by snapshot name to make them easier to write to the console
        grouped: t.DefaultDict[str, t.List[SnapshotIntervalClearRequest]] = defaultdict(list)
        for clear_request in all_restatement_intervals.values():
            grouped[clear_request.snapshot.name].append(clear_request)

        return cls(
            # Store a plain dict so later lookups of unknown names do not insert empty entries.
            snapshot_intervals_to_clear=dict(grouped),
            all_snapshots=stage.all_snapshots,
        )


# Trees with more children than this are collapsed by RichExplainerConsole._limit_tree.
MAX_TREE_LENGTH = 10


class RichExplainerConsole(ExplainerConsole):
    """Explainer console that renders plan stages as a ``rich`` tree.

    Each stage is dispatched to a ``visit_<snake_case_stage_class_name>`` method; a visitor
    returns the subtree to show for that stage, or ``None`` to show nothing.
    """

    def __init__(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        dialect: DialectType,
        default_catalog: t.Optional[str],
        verbosity: Verbosity = Verbosity.DEFAULT,
        console: t.Optional[RichConsole] = None,
    ):
        """Capture the naming and rendering settings used by the visitors.

        Args:
            environment_naming_info: Default naming info used when building display names.
            dialect: Dialect passed through when building display names.
            default_catalog: Catalog passed to display names below ``VERY_VERBOSE`` verbosity.
            verbosity: Controls catalog handling and whether long trees are collapsed.
            console: Rich console to print to; ``srich.console`` is used when falsy.
        """
        self.environment_naming_info = environment_naming_info
        self.dialect = dialect
        self.default_catalog = default_catalog
        self.verbosity = verbosity
        self.console: RichConsole = console or srich.console

    def explain(self, stages: t.List[stages.PlanStage]) -> None:
        """Print a tree describing every stage that has a visitor on this console.

        Stages without a matching ``visit_*`` method are logged as errors and skipped;
        visitors that return a falsy result contribute nothing to the output.
        """
        root = Tree("[bold]Explained plan[/bold]")
        for plan_stage in stages:
            stage_type = plan_stage.__class__.__name__
            visitor_name = f"visit_{to_snake_case(stage_type)}"
            try:
                visit = getattr(self, visitor_name)
            except AttributeError:
                logger.error("Unexpected stage: %s", stage_type)
                continue
            subtree = visit(plan_stage)
            if subtree:
                root.add(self._limit_tree(subtree))
        self.console.print(root)

    def visit_before_all_stage(self, stage: stages.BeforeAllStage) -> Tree:
        """Return the heading shown for the before-all stage."""
        return Tree("[bold]Execute before all statements[/bold]")

    def visit_after_all_stage(self, stage: stages.AfterAllStage) -> Tree:
        """Return the heading shown for the after-all stage."""
        return Tree("[bold]Execute after all statements[/bold]")

    def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
        """Describe the physical-layer work for snapshots listed as having missing intervals.

        Returns a ``SKIP`` heading when none of the stage's snapshots are listed in
        ``snapshots_with_missing_intervals``.
        """
        pending = [
            candidate
            for candidate in stage.snapshots
            if candidate.snapshot_id in stage.snapshots_with_missing_intervals
        ]
        if not pending:
            return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")

        tree = Tree(
            "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
        )
        for snapshot in pending:
            # Outside of prod the stage's index decides deployability; in prod it is always True.
            if self.environment_naming_info.name != c.PROD:
                is_deployable = stage.deployability_index.is_deployable(snapshot)
            else:
                is_deployable = True

            display_name = self._display_name(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            if snapshot.is_model:
                if snapshot.model.pre_statements:
                    model_tree.add("Run pre-statements")
                if snapshot.model.annotated:
                    model_tree.add("Dry run model query without inserting results")

            if snapshot.is_view:
                create_label = "Create view if it doesn't exist"
            elif (
                snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
            ):
                prod_table = snapshot.table_name(True)
                create_label = f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
            else:
                create_label = "Create table if it doesn't exist"
            create_tree = Tree(create_label)

            if not is_deployable:
                create_tree.add("[orange1]preview[/orange1]: data will NOT be reused in production")
            model_tree.add(create_tree)

            if snapshot.is_model and snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            tree.add(model_tree)
        return tree

    def visit_audit_only_run_stage(self, stage: stages.AuditOnlyRunStage) -> Tree:
        """List the display name of every snapshot in an audit-only run stage."""
        tree = Tree("[bold]Audit-only execution[/bold]")
        for snapshot in stage.snapshots:
            tree.add(self._display_name(snapshot))
        return tree

    def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage) -> Tree:
        """Delegate to ``visit_restatement_stage``; exists so name-based dispatch finds it."""
        return self.visit_restatement_stage(stage)

    def visit_restatement_stage(
        self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
    ) -> Tree:
        """Describe the state-only interval invalidation performed for a restatement.

        Per-model details are only listed for an ``ExplainableRestatementStage`` with a
        non-empty ``snapshot_intervals_to_clear``; otherwise just the heading is returned.
        """
        tree = Tree(
            "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
            "This only affects state and will not clear physical data from the tables until the next plan for each environment"
        )

        if not isinstance(stage, ExplainableRestatementStage):
            return tree
        requests_by_name = stage.snapshot_intervals_to_clear
        if not requests_by_name:
            return tree

        for name, clear_requests in requests_by_name.items():
            display_name = model_display_name(
                name, self.environment_naming_info, self.default_catalog, self.dialect
            )
            # Report one overall range per model: earliest start to latest end.
            interval_start = min(cr.interval[0] for cr in clear_requests)
            interval_end = max(cr.interval[1] for cr in clear_requests)
            if not (interval_start and interval_end):
                # Models with a falsy bound are left out of the listing.
                continue

            start_ts = to_ts(interval_start)
            end_ts = to_ts(interval_end)
            node = tree.add(f"{display_name} [{start_ts} - {end_ts}]")

            environment_names = {env for cr in clear_requests for env in cr.environment_names}
            node.add("in environments: " + ", ".join(sorted(environment_names)))

        return tree

    def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
        """Describe the backfill steps for every snapshot in ``snapshot_to_intervals``.

        Model snapshots get a subtree of signal checks, pre-statements, the refresh
        strategy, post-statements and audits; other snapshots are listed as standalone
        audits. Returns a ``SKIP`` heading when there is nothing to backfill.
        """
        if not stage.snapshot_to_intervals:
            return Tree("[bold]SKIP: No model batches to execute[/bold]")

        tree = Tree(
            "[bold]Backfill models by running their queries and run standalone audits[/bold]"
        )
        for snapshot, intervals in stage.snapshot_to_intervals.items():
            display_name = self._display_name(snapshot)
            if not snapshot.is_model:
                tree.add(f"{display_name} \\[standalone audit]")
                continue

            # Looked up once and reused for the table name, interval choice and preview note.
            is_deployable = stage.deployability_index.is_deployable(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            for signal_name, _ in snapshot.model.signals:
                model_tree.add(f"Check '{signal_name}' signal")

            if snapshot.model.pre_statements:
                model_tree.add("Run pre-statements")

            backfill_label = "Fully refresh table"
            if snapshot.is_incremental:
                current_intervals = snapshot.intervals if is_deployable else snapshot.dev_intervals
                # If there are no intervals, the table will be fully refreshed
                if current_intervals:
                    formatted_range = SnapshotIntervals(
                        snapshot_id=snapshot.snapshot_id, intervals=intervals
                    ).format_intervals(snapshot.node.interval_unit)
                    backfill_label = (
                        f"Incrementally insert records within the range [{formatted_range}]"
                    )
            elif snapshot.is_view:
                backfill_label = "Recreate view"
            backfill_tree = Tree(backfill_label)

            if not is_deployable:
                backfill_tree.add(
                    "[orange1]preview[/orange1]: data will NOT be reused in production"
                )

            model_tree.add(backfill_tree)

            if snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            if snapshot.model.audits:
                for audit_name, _ in snapshot.model.audits:
                    model_tree.add(f"Run '{audit_name}' audit")

            tree.add(model_tree)
        return tree

    def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
        """List each snapshot of the stage alongside ``snapshot.table_name(True)``."""
        tree = Tree(
            "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
        )
        for snapshot in stage.snapshots:
            tree.add(f"{self._display_name(snapshot)} -> {snapshot.table_name(True)}")
        return tree

    def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
        """Describe which virtual-layer views are created/updated and which are deleted.

        The promote and demote subtrees are each collapsed via ``_limit_tree`` and are
        only attached when the corresponding snapshot collection is non-empty.
        """
        tree = Tree(
            f"[bold]Update the virtual layer for environment '{self.environment_naming_info.name}'[/bold]"
        )

        promote_tree = Tree(
            "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
        )
        for promoted in stage.promoted_snapshots:
            view_name = self._display_name(promoted)
            target = promoted.table_name(stage.deployability_index.is_representative(promoted))
            promote_tree.add(f"{view_name} -> {target}")

        demote_tree = Tree(
            "[bold]Delete views in the virtual layer for models that were removed[/bold]"
        )
        for demoted in stage.demoted_snapshots:
            # Demoted views are named using the stage's demoted naming info.
            demote_tree.add(self._display_name(demoted, stage.demoted_environment_naming_info))

        if stage.promoted_snapshots:
            tree.add(self._limit_tree(promote_tree))
        if stage.demoted_snapshots:
            tree.add(self._limit_tree(demote_tree))
        return tree

    def visit_create_snapshot_records_stage(
        self, stage: stages.CreateSnapshotRecordsStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_environment_record_update_stage(
        self, stage: stages.EnvironmentRecordUpdateStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_unpause_stage(self, stage: stages.UnpauseStage) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_finalize_environment_stage(
        self, stage: stages.FinalizeEnvironmentStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def _display_name(
        self,
        snapshot: t.Union[SnapshotInfoMixin, SnapshotIdAndVersion],
        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    ) -> str:
        """Return the snapshot's display name using this console's naming settings.

        Args:
            snapshot: The snapshot to name.
            environment_naming_info: Overrides the console's naming info when truthy.
        """
        naming_info = environment_naming_info or self.environment_naming_info
        # At VERY_VERBOSE and above no default catalog is passed to ``display_name``.
        catalog = self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None
        return snapshot.display_name(
            environment_naming_info=naming_info,
            default_catalog=catalog,
            dialect=self.dialect,
        )

    def _limit_tree(self, tree: Tree) -> Tree:
        """Collapse a tree with more than ``MAX_TREE_LENGTH`` children, in place.

        Below ``VERY_VERBOSE`` verbosity the children are replaced by the first child,
        a ``.... N more ....`` marker and the last child. The same tree is returned.
        """
        tree_length = len(tree.children)
        if tree_length <= MAX_TREE_LENGTH:
            return tree
        if self.verbosity < Verbosity.VERY_VERBOSE:
            tree.children = [
                tree.children[0],
                Tree(f".... {tree_length - 2} more ...."),
                tree.children[-1],
            ]
        return tree


def _get_explainer_console(
    console: t.Optional[Console],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> ExplainerConsole:
    """Create the explainer console matching the given SQLMesh console.

    Args:
        console: The SQLMesh console; ``get_console()`` is used when falsy.
        environment_naming_info: Naming info forwarded to the explainer console.
        default_catalog: Default catalog forwarded to the explainer console.

    Raises:
        SQLMeshError: If the console is not a ``TerminalConsole``.
    """
    console = console or get_console()
    if not isinstance(console, TerminalConsole):
        raise SQLMeshError("Plan explanation is only supported in the terminal.")
    return RichExplainerConsole(
        environment_naming_info=environment_naming_info,
        dialect=console.dialect,
        default_catalog=default_catalog,
        verbosity=console.verbosity,
        console=console.console,
    )
class PlanExplainer(PlanEvaluator):
    """Evaluator whose ``evaluate`` builds a plan's stages and hands them to an explainer console."""

    def __init__(
        self,
        state_reader: StateReader,
        default_catalog: t.Optional[str],
        console: t.Optional[Console] = None,
    ):
        """Store the collaborators used while explaining a plan.

        Args:
            state_reader: Passed to the stage builder and to the restatement enrichment.
            default_catalog: Passed to the stage builder and to the explainer console.
            console: Console to explain to; ``get_console()`` is used when falsy.
        """
        self.console = console if console else get_console()
        self.default_catalog = default_catalog
        self.state_reader = state_reader

    def evaluate(
        self,
        plan: EvaluatablePlan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Build the stages of ``plan`` and print an explanation of them.

        Args:
            plan: The plan to explain.
            circuit_breaker: Accepted for interface compatibility; not used here.
        """
        reader = self.state_reader
        catalog = self.default_catalog

        built_stages = stages.build_plan_stages(plan, reader, catalog)
        target_console = _get_explainer_console(self.console, plan.environment, catalog)

        # Restatement stages are swapped for an enriched variant that carries extra
        # metadata only needed for better --explain output.
        explainable_stages: t.List[stages.PlanStage] = []
        for built_stage in built_stages:
            if isinstance(built_stage, stages.RestatementStage):
                built_stage = ExplainableRestatementStage.from_restatement_stage(
                    built_stage, reader, plan
                )
            explainable_stages.append(built_stage)

        target_console.explain(explainable_stages)
Helper class that provides a standard way to create an ABC using inheritance.
def evaluate(
    self,
    plan: EvaluatablePlan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Build the stages of ``plan`` and print an explanation of them.

    Args:
        plan: The plan to explain.
        circuit_breaker: Accepted for interface compatibility; not used here.
    """
    reader = self.state_reader
    catalog = self.default_catalog

    built_stages = stages.build_plan_stages(plan, reader, catalog)
    target_console = _get_explainer_console(self.console, plan.environment, catalog)

    # Restatement stages are swapped for an enriched variant that carries extra
    # metadata only needed for better --explain output.
    explainable_stages: t.List[stages.PlanStage] = []
    for built_stage in built_stages:
        if isinstance(built_stage, stages.RestatementStage):
            built_stage = ExplainableRestatementStage.from_restatement_stage(
                built_stage, reader, plan
            )
        explainable_stages.append(built_stage)

    target_console.explain(explainable_stages)
Evaluates a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state and then kicks off the backfill process for all affected snapshots. Once backfill is done, snapshots that are part of the plan are promoted in the environment targeted by this plan.
Arguments:
- plan: The plan to evaluate.
- circuit_breaker: The circuit breaker to use.
class ExplainerConsole(abc.ABC):
    """Interface for consoles that can present a list of plan stages."""

    @abc.abstractmethod
    def explain(self, stages: t.List[stages.PlanStage]) -> None:
        """Present the given plan stages to the user."""
Helper class that provides a standard way to create an ABC using inheritance.
@dataclass
class ExplainableRestatementStage(stages.RestatementStage):
    """
    This brings forward some calculations that would usually be done in the evaluator so the user can be given a better indication
    of what might happen when they ask for the plan to be explained.
    """

    snapshot_intervals_to_clear: t.Dict[str, t.List[SnapshotIntervalClearRequest]]
    """Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name."""

    @classmethod
    def from_restatement_stage(
        cls: t.Type[ExplainableRestatementStage],
        stage: stages.RestatementStage,
        state_reader: StateReader,
        plan: EvaluatablePlan,
    ) -> ExplainableRestatementStage:
        """Build an explainable stage from a plain restatement stage.

        Args:
            stage: The restatement stage whose ``all_snapshots`` are carried over.
            state_reader: State reader handed to the interval identification helper.
            plan: Supplies ``restatements`` and ``disabled_restatement_models``.

        Returns:
            A new stage whose ``snapshot_intervals_to_clear`` groups the identified
            clear requests by snapshot name.
        """
        all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
            state_reader=state_reader,
            prod_restatements=plan.restatements,
            disable_restatement_models=plan.disabled_restatement_models,
            loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
        )

        # Group the interval clear requests by snapshot name to make them easier to write to the console
        grouped: t.DefaultDict[str, t.List[SnapshotIntervalClearRequest]] = defaultdict(list)
        for clear_request in all_restatement_intervals.values():
            grouped[clear_request.snapshot.name].append(clear_request)

        return cls(
            # Store a plain dict so later lookups of unknown names do not insert empty entries.
            snapshot_intervals_to_clear=dict(grouped),
            all_snapshots=stage.all_snapshots,
        )
This brings forward some calculations that would usually be done in the evaluator so the user can be given a better indication of what might happen when they ask for the plan to be explained.
Which snapshots from other environments would have intervals cleared as part of restatement, grouped by name.
@classmethod
def from_restatement_stage(
    cls: t.Type[ExplainableRestatementStage],
    stage: stages.RestatementStage,
    state_reader: StateReader,
    plan: EvaluatablePlan,
) -> ExplainableRestatementStage:
    """Build an explainable stage from a plain restatement stage.

    Args:
        stage: The restatement stage whose ``all_snapshots`` are carried over.
        state_reader: State reader handed to the interval identification helper.
        plan: Supplies ``restatements`` and ``disabled_restatement_models``.

    Returns:
        A new stage whose ``snapshot_intervals_to_clear`` groups the identified
        clear requests by snapshot name.
    """
    all_restatement_intervals = identify_restatement_intervals_across_snapshot_versions(
        state_reader=state_reader,
        prod_restatements=plan.restatements,
        disable_restatement_models=plan.disabled_restatement_models,
        loaded_snapshots={s.snapshot_id: s for s in stage.all_snapshots.values()},
    )

    # Group the interval clear requests by snapshot name to make them easier to write to the console
    grouped: t.DefaultDict[str, t.List[SnapshotIntervalClearRequest]] = defaultdict(list)
    for clear_request in all_restatement_intervals.values():
        grouped[clear_request.snapshot.name].append(clear_request)

    return cls(
        # Store a plain dict so later lookups of unknown names do not insert empty entries.
        snapshot_intervals_to_clear=dict(grouped),
        all_snapshots=stage.all_snapshots,
    )
Inherited Members
class RichExplainerConsole(ExplainerConsole):
    """Explainer console that renders plan stages as a ``rich`` tree.

    Each stage is dispatched to a ``visit_<snake_case_stage_class_name>`` method; a visitor
    returns the subtree to show for that stage, or ``None`` to show nothing.
    """

    def __init__(
        self,
        environment_naming_info: EnvironmentNamingInfo,
        dialect: DialectType,
        default_catalog: t.Optional[str],
        verbosity: Verbosity = Verbosity.DEFAULT,
        console: t.Optional[RichConsole] = None,
    ):
        """Capture the naming and rendering settings used by the visitors.

        Args:
            environment_naming_info: Default naming info used when building display names.
            dialect: Dialect passed through when building display names.
            default_catalog: Catalog passed to display names below ``VERY_VERBOSE`` verbosity.
            verbosity: Controls catalog handling and whether long trees are collapsed.
            console: Rich console to print to; ``srich.console`` is used when falsy.
        """
        self.environment_naming_info = environment_naming_info
        self.dialect = dialect
        self.default_catalog = default_catalog
        self.verbosity = verbosity
        self.console: RichConsole = console or srich.console

    def explain(self, stages: t.List[stages.PlanStage]) -> None:
        """Print a tree describing every stage that has a visitor on this console.

        Stages without a matching ``visit_*`` method are logged as errors and skipped;
        visitors that return a falsy result contribute nothing to the output.
        """
        root = Tree("[bold]Explained plan[/bold]")
        for plan_stage in stages:
            stage_type = plan_stage.__class__.__name__
            visitor_name = f"visit_{to_snake_case(stage_type)}"
            try:
                visit = getattr(self, visitor_name)
            except AttributeError:
                logger.error("Unexpected stage: %s", stage_type)
                continue
            subtree = visit(plan_stage)
            if subtree:
                root.add(self._limit_tree(subtree))
        self.console.print(root)

    def visit_before_all_stage(self, stage: stages.BeforeAllStage) -> Tree:
        """Return the heading shown for the before-all stage."""
        return Tree("[bold]Execute before all statements[/bold]")

    def visit_after_all_stage(self, stage: stages.AfterAllStage) -> Tree:
        """Return the heading shown for the after-all stage."""
        return Tree("[bold]Execute after all statements[/bold]")

    def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
        """Describe the physical-layer work for snapshots listed as having missing intervals.

        Returns a ``SKIP`` heading when none of the stage's snapshots are listed in
        ``snapshots_with_missing_intervals``.
        """
        pending = [
            candidate
            for candidate in stage.snapshots
            if candidate.snapshot_id in stage.snapshots_with_missing_intervals
        ]
        if not pending:
            return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")

        tree = Tree(
            "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
        )
        for snapshot in pending:
            # Outside of prod the stage's index decides deployability; in prod it is always True.
            if self.environment_naming_info.name != c.PROD:
                is_deployable = stage.deployability_index.is_deployable(snapshot)
            else:
                is_deployable = True

            display_name = self._display_name(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            if snapshot.is_model:
                if snapshot.model.pre_statements:
                    model_tree.add("Run pre-statements")
                if snapshot.model.annotated:
                    model_tree.add("Dry run model query without inserting results")

            if snapshot.is_view:
                create_label = "Create view if it doesn't exist"
            elif (
                snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
            ):
                prod_table = snapshot.table_name(True)
                create_label = f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
            else:
                create_label = "Create table if it doesn't exist"
            create_tree = Tree(create_label)

            if not is_deployable:
                create_tree.add("[orange1]preview[/orange1]: data will NOT be reused in production")
            model_tree.add(create_tree)

            if snapshot.is_model and snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            tree.add(model_tree)
        return tree

    def visit_audit_only_run_stage(self, stage: stages.AuditOnlyRunStage) -> Tree:
        """List the display name of every snapshot in an audit-only run stage."""
        tree = Tree("[bold]Audit-only execution[/bold]")
        for snapshot in stage.snapshots:
            tree.add(self._display_name(snapshot))
        return tree

    def visit_explainable_restatement_stage(self, stage: ExplainableRestatementStage) -> Tree:
        """Delegate to ``visit_restatement_stage``; exists so name-based dispatch finds it."""
        return self.visit_restatement_stage(stage)

    def visit_restatement_stage(
        self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
    ) -> Tree:
        """Describe the state-only interval invalidation performed for a restatement.

        Per-model details are only listed for an ``ExplainableRestatementStage`` with a
        non-empty ``snapshot_intervals_to_clear``; otherwise just the heading is returned.
        """
        tree = Tree(
            "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
            "This only affects state and will not clear physical data from the tables until the next plan for each environment"
        )

        if not isinstance(stage, ExplainableRestatementStage):
            return tree
        requests_by_name = stage.snapshot_intervals_to_clear
        if not requests_by_name:
            return tree

        for name, clear_requests in requests_by_name.items():
            display_name = model_display_name(
                name, self.environment_naming_info, self.default_catalog, self.dialect
            )
            # Report one overall range per model: earliest start to latest end.
            interval_start = min(cr.interval[0] for cr in clear_requests)
            interval_end = max(cr.interval[1] for cr in clear_requests)
            if not (interval_start and interval_end):
                # Models with a falsy bound are left out of the listing.
                continue

            start_ts = to_ts(interval_start)
            end_ts = to_ts(interval_end)
            node = tree.add(f"{display_name} [{start_ts} - {end_ts}]")

            environment_names = {env for cr in clear_requests for env in cr.environment_names}
            node.add("in environments: " + ", ".join(sorted(environment_names)))

        return tree

    def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
        """Describe the backfill steps for every snapshot in ``snapshot_to_intervals``.

        Model snapshots get a subtree of signal checks, pre-statements, the refresh
        strategy, post-statements and audits; other snapshots are listed as standalone
        audits. Returns a ``SKIP`` heading when there is nothing to backfill.
        """
        if not stage.snapshot_to_intervals:
            return Tree("[bold]SKIP: No model batches to execute[/bold]")

        tree = Tree(
            "[bold]Backfill models by running their queries and run standalone audits[/bold]"
        )
        for snapshot, intervals in stage.snapshot_to_intervals.items():
            display_name = self._display_name(snapshot)
            if not snapshot.is_model:
                tree.add(f"{display_name} \\[standalone audit]")
                continue

            # Looked up once and reused for the table name, interval choice and preview note.
            is_deployable = stage.deployability_index.is_deployable(snapshot)
            table_name = snapshot.table_name(is_deployable)
            model_tree = Tree(f"{display_name} -> {table_name}")

            for signal_name, _ in snapshot.model.signals:
                model_tree.add(f"Check '{signal_name}' signal")

            if snapshot.model.pre_statements:
                model_tree.add("Run pre-statements")

            backfill_label = "Fully refresh table"
            if snapshot.is_incremental:
                current_intervals = snapshot.intervals if is_deployable else snapshot.dev_intervals
                # If there are no intervals, the table will be fully refreshed
                if current_intervals:
                    formatted_range = SnapshotIntervals(
                        snapshot_id=snapshot.snapshot_id, intervals=intervals
                    ).format_intervals(snapshot.node.interval_unit)
                    backfill_label = (
                        f"Incrementally insert records within the range [{formatted_range}]"
                    )
            elif snapshot.is_view:
                backfill_label = "Recreate view"
            backfill_tree = Tree(backfill_label)

            if not is_deployable:
                backfill_tree.add(
                    "[orange1]preview[/orange1]: data will NOT be reused in production"
                )

            model_tree.add(backfill_tree)

            if snapshot.model.post_statements:
                model_tree.add("Run post-statements")

            if snapshot.model.audits:
                for audit_name, _ in snapshot.model.audits:
                    model_tree.add(f"Run '{audit_name}' audit")

            tree.add(model_tree)
        return tree

    def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
        """List each snapshot of the stage alongside ``snapshot.table_name(True)``."""
        tree = Tree(
            "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
        )
        for snapshot in stage.snapshots:
            tree.add(f"{self._display_name(snapshot)} -> {snapshot.table_name(True)}")
        return tree

    def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
        """Describe which virtual-layer views are created/updated and which are deleted.

        The promote and demote subtrees are each collapsed via ``_limit_tree`` and are
        only attached when the corresponding snapshot collection is non-empty.
        """
        tree = Tree(
            f"[bold]Update the virtual layer for environment '{self.environment_naming_info.name}'[/bold]"
        )

        promote_tree = Tree(
            "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
        )
        for promoted in stage.promoted_snapshots:
            view_name = self._display_name(promoted)
            target = promoted.table_name(stage.deployability_index.is_representative(promoted))
            promote_tree.add(f"{view_name} -> {target}")

        demote_tree = Tree(
            "[bold]Delete views in the virtual layer for models that were removed[/bold]"
        )
        for demoted in stage.demoted_snapshots:
            # Demoted views are named using the stage's demoted naming info.
            demote_tree.add(self._display_name(demoted, stage.demoted_environment_naming_info))

        if stage.promoted_snapshots:
            tree.add(self._limit_tree(promote_tree))
        if stage.demoted_snapshots:
            tree.add(self._limit_tree(demote_tree))
        return tree

    def visit_create_snapshot_records_stage(
        self, stage: stages.CreateSnapshotRecordsStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_environment_record_update_stage(
        self, stage: stages.EnvironmentRecordUpdateStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_unpause_stage(self, stage: stages.UnpauseStage) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def visit_finalize_environment_stage(
        self, stage: stages.FinalizeEnvironmentStage
    ) -> t.Optional[Tree]:
        """Return ``None`` so this stage is omitted from the explanation."""
        return None

    def _display_name(
        self,
        snapshot: t.Union[SnapshotInfoMixin, SnapshotIdAndVersion],
        environment_naming_info: t.Optional[EnvironmentNamingInfo] = None,
    ) -> str:
        """Return the snapshot's display name using this console's naming settings.

        Args:
            snapshot: The snapshot to name.
            environment_naming_info: Overrides the console's naming info when truthy.
        """
        naming_info = environment_naming_info or self.environment_naming_info
        # At VERY_VERBOSE and above no default catalog is passed to ``display_name``.
        catalog = self.default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None
        return snapshot.display_name(
            environment_naming_info=naming_info,
            default_catalog=catalog,
            dialect=self.dialect,
        )

    def _limit_tree(self, tree: Tree) -> Tree:
        """Collapse a tree with more than ``MAX_TREE_LENGTH`` children, in place.

        Below ``VERY_VERBOSE`` verbosity the children are replaced by the first child,
        a ``.... N more ....`` marker and the last child. The same tree is returned.
        """
        tree_length = len(tree.children)
        if tree_length <= MAX_TREE_LENGTH:
            return tree
        if self.verbosity < Verbosity.VERY_VERBOSE:
            tree.children = [
                tree.children[0],
                Tree(f".... {tree_length - 2} more ...."),
                tree.children[-1],
            ]
        return tree
Helper class that provides a standard way to create an ABC using inheritance.
def __init__(
    self,
    environment_naming_info: EnvironmentNamingInfo,
    dialect: DialectType,
    default_catalog: t.Optional[str],
    verbosity: Verbosity = Verbosity.DEFAULT,
    console: t.Optional[RichConsole] = None,
):
    """Capture the naming and rendering settings used when explaining stages.

    Args:
        environment_naming_info: Stored as ``self.environment_naming_info``.
        dialect: Stored as ``self.dialect``.
        default_catalog: Stored as ``self.default_catalog``.
        verbosity: Stored as ``self.verbosity``; defaults to ``Verbosity.DEFAULT``.
        console: Rich console to print to; ``srich.console`` is used when falsy.
    """
    self.console: RichConsole = console if console else srich.console
    self.verbosity = verbosity
    self.default_catalog = default_catalog
    self.dialect = dialect
    self.environment_naming_info = environment_naming_info
def explain(self, stages: t.List[stages.PlanStage]) -> None:
    """Print a tree describing every stage that has a visitor on this console.

    Each stage is dispatched to ``visit_<snake_case_stage_class_name>``. Stages
    without such a method are logged as errors and skipped; visitors that return a
    falsy result contribute nothing to the output.
    """
    root = Tree("[bold]Explained plan[/bold]")
    for plan_stage in stages:
        stage_type = plan_stage.__class__.__name__
        visitor_name = f"visit_{to_snake_case(stage_type)}"
        try:
            visit = getattr(self, visitor_name)
        except AttributeError:
            logger.error("Unexpected stage: %s", stage_type)
            continue
        subtree = visit(plan_stage)
        if subtree:
            root.add(self._limit_tree(subtree))
    self.console.print(root)
def visit_physical_layer_update_stage(self, stage: stages.PhysicalLayerUpdateStage) -> Tree:
    """Describe the physical-layer work for snapshots listed as having missing intervals.

    Returns a ``SKIP`` heading when none of the stage's snapshots are listed in
    ``snapshots_with_missing_intervals``.
    """
    pending = [
        candidate
        for candidate in stage.snapshots
        if candidate.snapshot_id in stage.snapshots_with_missing_intervals
    ]
    if not pending:
        return Tree("[bold]SKIP: No physical layer updates to perform[/bold]")

    tree = Tree(
        "[bold]Validate SQL and create physical layer tables and views if they do not exist[/bold]"
    )
    for snapshot in pending:
        # Outside of prod the stage's index decides deployability; in prod it is always True.
        if self.environment_naming_info.name != c.PROD:
            is_deployable = stage.deployability_index.is_deployable(snapshot)
        else:
            is_deployable = True

        display_name = self._display_name(snapshot)
        table_name = snapshot.table_name(is_deployable)
        model_tree = Tree(f"{display_name} -> {table_name}")

        if snapshot.is_model:
            if snapshot.model.pre_statements:
                model_tree.add("Run pre-statements")
            if snapshot.model.annotated:
                model_tree.add("Dry run model query without inserting results")

        # Pick the creation step's label first, then build its node once.
        if snapshot.is_view:
            create_label = "Create view if it doesn't exist"
        elif (
            snapshot.is_forward_only and snapshot.previous_versions and not snapshot.is_managed
        ):
            prod_table = snapshot.table_name(True)
            create_label = f"Clone {prod_table} into {table_name} and then update its schema if it doesn't exist"
        else:
            create_label = "Create table if it doesn't exist"
        create_tree = Tree(create_label)

        if not is_deployable:
            create_tree.add("[orange1]preview[/orange1]: data will NOT be reused in production")
        model_tree.add(create_tree)

        if snapshot.is_model and snapshot.model.post_statements:
            model_tree.add("Run post-statements")

        tree.add(model_tree)
    return tree
def visit_restatement_stage(
    self, stage: t.Union[ExplainableRestatementStage, stages.RestatementStage]
) -> Tree:
    """Describe the state-only interval invalidation performed for a restatement.

    Per-model details are only listed for an ``ExplainableRestatementStage`` with a
    non-empty ``snapshot_intervals_to_clear``; otherwise just the heading is returned.
    """
    tree = Tree(
        "[bold]Invalidate data intervals in state for development environments to prevent old data from being promoted[/bold]\n"
        "This only affects state and will not clear physical data from the tables until the next plan for each environment"
    )

    if not isinstance(stage, ExplainableRestatementStage):
        return tree
    requests_by_name = stage.snapshot_intervals_to_clear
    if not requests_by_name:
        return tree

    for name, clear_requests in requests_by_name.items():
        display_name = model_display_name(
            name, self.environment_naming_info, self.default_catalog, self.dialect
        )
        # Report one overall range per model: earliest start to latest end.
        interval_start = min(cr.interval[0] for cr in clear_requests)
        interval_end = max(cr.interval[1] for cr in clear_requests)
        if not (interval_start and interval_end):
            # Models with a falsy bound are left out of the listing.
            continue

        start_ts = to_ts(interval_start)
        end_ts = to_ts(interval_end)
        node = tree.add(f"{display_name} [{start_ts} - {end_ts}]")

        environment_names = {env for cr in clear_requests for env in cr.environment_names}
        node.add("in environments: " + ", ".join(sorted(environment_names)))

    return tree
def visit_backfill_stage(self, stage: stages.BackfillStage) -> Tree:
    """Describe the backfill steps for every snapshot in ``snapshot_to_intervals``.

    Model snapshots get a subtree of signal checks, pre-statements, the refresh
    strategy, post-statements and audits; other snapshots are listed as standalone
    audits. Returns a ``SKIP`` heading when there is nothing to backfill.
    """
    if not stage.snapshot_to_intervals:
        return Tree("[bold]SKIP: No model batches to execute[/bold]")

    tree = Tree(
        "[bold]Backfill models by running their queries and run standalone audits[/bold]"
    )
    for snapshot, intervals in stage.snapshot_to_intervals.items():
        display_name = self._display_name(snapshot)
        if not snapshot.is_model:
            tree.add(f"{display_name} \\[standalone audit]")
            continue

        # Looked up once and reused for the table name, interval choice and preview note.
        is_deployable = stage.deployability_index.is_deployable(snapshot)
        table_name = snapshot.table_name(is_deployable)
        model_tree = Tree(f"{display_name} -> {table_name}")

        for signal_name, _ in snapshot.model.signals:
            model_tree.add(f"Check '{signal_name}' signal")

        if snapshot.model.pre_statements:
            model_tree.add("Run pre-statements")

        backfill_label = "Fully refresh table"
        if snapshot.is_incremental:
            current_intervals = snapshot.intervals if is_deployable else snapshot.dev_intervals
            # If there are no intervals, the table will be fully refreshed
            if current_intervals:
                formatted_range = SnapshotIntervals(
                    snapshot_id=snapshot.snapshot_id, intervals=intervals
                ).format_intervals(snapshot.node.interval_unit)
                backfill_label = (
                    f"Incrementally insert records within the range [{formatted_range}]"
                )
        elif snapshot.is_view:
            backfill_label = "Recreate view"
        backfill_tree = Tree(backfill_label)

        if not is_deployable:
            backfill_tree.add(
                "[orange1]preview[/orange1]: data will NOT be reused in production"
            )

        model_tree.add(backfill_tree)

        if snapshot.model.post_statements:
            model_tree.add("Run post-statements")

        if snapshot.model.audits:
            for audit_name, _ in snapshot.model.audits:
                model_tree.add(f"Run '{audit_name}' audit")

        tree.add(model_tree)
    return tree
def visit_migrate_schemas_stage(self, stage: stages.MigrateSchemasStage) -> Tree:
    """List each snapshot of the stage alongside ``snapshot.table_name(True)``."""
    heading = "[bold]Update schemas (add, drop, alter columns) of production physical tables to reflect forward-only changes[/bold]"
    migration_tree = Tree(heading)
    for migrated in stage.snapshots:
        migration_tree.add(f"{self._display_name(migrated)} -> {migrated.table_name(True)}")
    return migration_tree
def visit_virtual_layer_update_stage(self, stage: stages.VirtualLayerUpdateStage) -> Tree:
    """Describe which virtual-layer views are created/updated and which are deleted.

    The promote and demote subtrees are each collapsed via ``_limit_tree`` and are
    only attached when the corresponding snapshot collection is non-empty.
    """
    environment_name = self.environment_naming_info.name
    root = Tree(f"[bold]Update the virtual layer for environment '{environment_name}'[/bold]")

    promote_tree = Tree(
        "[bold]Create or update views in the virtual layer to point at new physical tables and views[/bold]"
    )
    for promoted in stage.promoted_snapshots:
        view_name = self._display_name(promoted)
        target = promoted.table_name(stage.deployability_index.is_representative(promoted))
        promote_tree.add(f"{view_name} -> {target}")

    demote_tree = Tree(
        "[bold]Delete views in the virtual layer for models that were removed[/bold]"
    )
    for demoted in stage.demoted_snapshots:
        # Demoted views are named using the stage's demoted naming info.
        demote_tree.add(self._display_name(demoted, stage.demoted_environment_naming_info))

    if stage.promoted_snapshots:
        root.add(self._limit_tree(promote_tree))
    if stage.demoted_snapshots:
        root.add(self._limit_tree(demote_tree))
    return root