sqlmesh.core.console
class Console(abc.ABC):
    """Abstract base class for defining classes used for displaying information to the user and also
    interacting with them when their input is needed.

    Every method below is abstract; a concrete console must implement all of them.
    """

    @abc.abstractmethod
    def start_plan_evaluation(self, plan: Plan) -> None:
        """Indicates that a new evaluation has begun."""

    @abc.abstractmethod
    def stop_plan_evaluation(self) -> None:
        """Indicates that the evaluation has ended."""

    @abc.abstractmethod
    def start_evaluation_progress(
        self,
        batches: t.Dict[Snapshot, int],
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot evaluation progress has begun."""

    @abc.abstractmethod
    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
        """Starts the snapshot evaluation progress."""

    @abc.abstractmethod
    def update_snapshot_evaluation_progress(
        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
    ) -> None:
        """Updates the snapshot evaluation progress."""

    @abc.abstractmethod
    def stop_evaluation_progress(self, success: bool = True) -> None:
        """Stops the snapshot evaluation progress."""

    @abc.abstractmethod
    def start_creation_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot creation progress has begun."""

    @abc.abstractmethod
    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
        """Update the snapshot creation progress."""

    @abc.abstractmethod
    def stop_creation_progress(self, success: bool = True) -> None:
        """Stop the snapshot creation progress."""

    @abc.abstractmethod
    def update_cleanup_progress(self, object_name: str) -> None:
        """Update the snapshot cleanup progress."""

    @abc.abstractmethod
    def start_promotion_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot promotion progress has begun."""

    @abc.abstractmethod
    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
        """Update the snapshot promotion progress."""

    @abc.abstractmethod
    def stop_promotion_progress(self, success: bool = True) -> None:
        """Stop the snapshot promotion progress."""

    @abc.abstractmethod
    def start_migration_progress(self, total_tasks: int) -> None:
        """Indicates that a new migration progress has begun."""

    @abc.abstractmethod
    def update_migration_progress(self, num_tasks: int) -> None:
        """Update the migration progress."""

    @abc.abstractmethod
    def stop_migration_progress(self, success: bool = True) -> None:
        """Stop the migration progress."""

    @abc.abstractmethod
    def show_model_difference_summary(
        self,
        context_diff: ContextDiff,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        no_diff: bool = True,
        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    ) -> None:
        """Displays a summary of differences for the given models."""

    @abc.abstractmethod
    def plan(
        self,
        plan_builder: PlanBuilder,
        auto_apply: bool,
        default_catalog: t.Optional[str],
        no_diff: bool = False,
        no_prompts: bool = False,
    ) -> None:
        """The main plan flow.

        The console should present the user with choices on how to backfill and version the snapshots
        of a plan.

        Args:
            plan_builder: The builder of the plan to make choices for.
            auto_apply: Whether to automatically apply the plan after all choices have been made.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
            no_diff: Hide text differences for changed models.
            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
                if this flag is set to true and there are uncategorized changes the plan creation will
                fail. Default: False
        """

    @abc.abstractmethod
    def log_test_results(
        self, result: unittest.result.TestResult, output: str, target_dialect: str
    ) -> None:
        """Display the test result and output.

        Args:
            result: The unittest test result that contains metrics like num success, fails, etc.
            output: The generated output from the unittest.
            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
        """

    @abc.abstractmethod
    def show_sql(self, sql: str) -> None:
        """Display SQL to the user."""

    @abc.abstractmethod
    def log_status_update(self, message: str) -> None:
        """Display general status update to the user."""

    @abc.abstractmethod
    def log_error(self, message: str) -> None:
        """Display error info to the user."""

    @abc.abstractmethod
    def log_success(self, message: str) -> None:
        """Display a general successful message to the user."""

    @abc.abstractmethod
    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
        """Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message."""

    @abc.abstractmethod
    def loading_stop(self, id: uuid.UUID) -> None:
        """Stop loading for the given id."""

    @abc.abstractmethod
    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
        """Show table schema diff."""

    @abc.abstractmethod
    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
        """Show table summary diff."""
def _print(self, value: t.Any, **kwargs: t.Any) -> None:
    """Print ``value`` on the wrapped console, forwarding any keyword options."""
    target = self.console
    target.print(value, **kwargs)


def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
    """Ask ``message`` through ``Prompt.ask`` on the wrapped console and return the answer."""
    answer = Prompt.ask(message, console=self.console, **kwargs)
    return answer


def _confirm(self, message: str, **kwargs: t.Any) -> bool:
    """Ask ``message`` through ``Confirm.ask`` on the wrapped console and return the answer."""
    answer = Confirm.ask(message, console=self.console, **kwargs)
    return answer


def start_plan_evaluation(self, plan: Plan) -> None:
    """Do nothing; this console shows no dedicated output when plan evaluation starts."""
    return None


def stop_plan_evaluation(self) -> None:
    """Do nothing; this console shows no dedicated output when plan evaluation stops."""
    return None
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
    """Register a per-model progress task for ``snapshot``.

    Does nothing when no model progress display is active or when a task is
    already registered under ``snapshot.name``.
    """
    if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
        display_name = snapshot.display_name(self.environment_naming_info, self.default_catalog)
        self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
            f"Evaluating {display_name}...",
            view_name=display_name,
            total=self.evaluation_model_batches[snapshot],
        )


def update_snapshot_evaluation_progress(
    self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
) -> None:
    """Advance the total and per-model progress by one batch of ``snapshot``.

    Args:
        snapshot: The snapshot whose batch finished; it must have been registered
            through ``start_snapshot_evaluation_progress`` (its name is looked up
            in ``evaluation_model_tasks``).
        batch_idx: Zero-based index of the finished batch, shown one-based.
        duration_ms: Batch duration in milliseconds, or None when unknown. A
            completion line is printed whenever a duration is given, including 0.
    """
    if not (
        self.evaluation_total_progress
        and self.evaluation_model_progress
        and self.evaluation_progress_live
    ):
        return

    total_batches = self.evaluation_model_batches[snapshot]

    # Compare against None: a batch that took 0 ms is still a reportable result.
    if duration_ms is not None:
        self.evaluation_progress_live.console.print(
            f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s"
        )

    # Task ids are integers and 0 is a valid id, so test for None explicitly.
    total_task = (
        self.evaluation_total_task if self.evaluation_total_task is not None else TaskID(0)
    )
    self.evaluation_total_progress.update(total_task, refresh=True, advance=1)

    model_task_id = self.evaluation_model_tasks[snapshot.name]
    self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
    # NOTE(review): reads the progress object's private ``_tasks`` mapping.
    if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
        self.evaluation_model_progress.remove_task(model_task_id)
def start_creation_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Create and start the physical-table creation progress bar if none is active."""
    # NOTE(review): indentation is lost in this listing; every statement is assumed
    # to sit under the ``is None`` check - confirm against the real file.
    if self.creation_progress is None:
        columns = (
            TextColumn("[bold blue]Creating physical tables", justify="right"),
            BarColumn(bar_width=40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            srich.BatchColumn(),
            "•",
            TimeElapsedColumn(),
        )
        progress = Progress(*columns, console=self.console)
        self.creation_progress = progress

        progress.start()
        self.creation_task = progress.add_task(
            "Creating physical tables...",
            total=total_tasks,
        )

        self.environment_naming_info = environment_naming_info
        self.default_catalog = default_catalog


def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Advance the creation progress by one; in verbose mode also print the created snapshot."""
    progress = self.creation_progress
    task_id = self.creation_task
    if progress is None or task_id is None:
        return

    if self.verbose:
        progress.live.console.print(
            f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]created[/green]"
        )
    progress.update(task_id, refresh=True, advance=1)
def update_cleanup_progress(self, object_name: str) -> None:
    """Print a line reporting that ``object_name`` was deleted."""
    message = f"Deleted object {object_name}"
    self._print(message)


def start_promotion_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Create and start the virtual-update (promotion) progress bar if none is active."""
    # NOTE(review): indentation is lost in this listing; every statement is assumed
    # to sit under the ``is None`` check - confirm against the real file.
    if self.promotion_progress is None:
        title = TextColumn(
            f"[bold blue]Virtually Updating '{environment_naming_info.name}'",
            justify="right",
        )
        progress = Progress(
            title,
            BarColumn(bar_width=40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            TimeElapsedColumn(),
            console=self.console,
        )
        self.promotion_progress = progress

        progress.start()
        self.promotion_task = progress.add_task(
            f"Virtually Updating {environment_naming_info.name}...",
            total=total_tasks,
        )

        self.environment_naming_info = environment_naming_info
        self.default_catalog = default_catalog


def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Advance the promotion progress by one; in verbose mode print whether the snapshot was promoted or demoted."""
    progress = self.promotion_progress
    task_id = self.promotion_task
    if progress is None or task_id is None:
        return

    if self.verbose:
        if promoted:
            action_str = "[green]promoted[/green]"
        else:
            action_str = "[yellow]demoted[/yellow]"
        progress.live.console.print(
            f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} {action_str}"
        )
    progress.update(task_id, refresh=True, advance=1)
def start_migration_progress(self, total_tasks: int) -> None:
    """Create and start the snapshot migration progress bar if none is active."""
    # NOTE(review): indentation is lost in this listing; start() and add_task() are
    # assumed to sit under the ``is None`` check - confirm against the real file.
    if self.migration_progress is None:
        columns = (
            TextColumn("[bold blue]Migrating snapshots", justify="right"),
            BarColumn(bar_width=40),
            "[progress.percentage]{task.percentage:>3.1f}%",
            "•",
            srich.BatchColumn(),
            "•",
            TimeElapsedColumn(),
        )
        progress = Progress(*columns, console=self.console)
        self.migration_progress = progress

        progress.start()
        self.migration_task = progress.add_task(
            "Migrating snapshots...",
            total=total_tasks,
        )


def update_migration_progress(self, num_tasks: int) -> None:
    """Advance the migration progress by ``num_tasks`` when a bar and task exist."""
    progress = self.migration_progress
    task_id = self.migration_task
    if progress is None or task_id is None:
        return
    progress.update(task_id, refresh=True, advance=num_tasks)


def stop_migration_progress(self, success: bool = True) -> None:
    """Stop and discard the migration progress bar, logging success when requested."""
    self.migration_task = None
    progress = self.migration_progress
    if progress is not None:
        progress.stop()
        self.migration_progress = None
        # NOTE(review): assumed to be nested under the check above - confirm.
        if success:
            self.log_success("The migration has been completed successfully")
506 507 Args: 508 context_diff: The context diff to use to print the summary 509 environment_naming_info: The environment naming info to reference when printing model names 510 default_catalog: The default catalog to reference when deciding to remove catalog from display names 511 no_diff: Hide the actual SQL differences. 512 ignored_snapshot_ids: A set of snapshot ids that are ignored 513 """ 514 ignored_snapshot_ids = ignored_snapshot_ids or set() 515 if context_diff.is_new_environment: 516 self._print( 517 Tree( 518 f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`" 519 ) 520 ) 521 if not context_diff.has_snapshot_changes: 522 return 523 524 if not context_diff.has_changes: 525 self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`")) 526 return 527 528 self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:")) 529 self._show_summary_tree_for( 530 context_diff, 531 "Models", 532 lambda x: x.is_model, 533 environment_naming_info, 534 default_catalog, 535 no_diff=no_diff, 536 ignored_snapshot_ids=ignored_snapshot_ids, 537 ) 538 self._show_summary_tree_for( 539 context_diff, 540 "Standalone Audits", 541 lambda x: x.is_audit, 542 environment_naming_info, 543 default_catalog, 544 no_diff=no_diff, 545 ignored_snapshot_ids=ignored_snapshot_ids, 546 ) 547 548 def plan( 549 self, 550 plan_builder: PlanBuilder, 551 auto_apply: bool, 552 default_catalog: t.Optional[str], 553 no_diff: bool = False, 554 no_prompts: bool = False, 555 ) -> None: 556 """The main plan flow. 557 558 The console should present the user with choices on how to backfill and version the snapshots 559 of a plan. 560 561 Args: 562 plan: The plan to make choices for. 563 auto_apply: Whether to automatically apply the plan after all choices have been made. 
def _get_ignored_tree(
    self,
    ignored_snapshot_ids: t.Set[SnapshotId],
    snapshots: t.Dict[SnapshotId, Snapshot],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> Tree:
    """Build the "Ignored Models" tree, one entry per ignored snapshot id.

    Each entry shows the snapshot's display name followed by the value of
    ``snapshot.get_latest(start_date(snapshot, snapshots.values()))``.
    """
    ignored = Tree("[bold][ignored]Ignored Models (Expected Plan Start):")
    # Sorted so repeated runs list the models in the same order.
    for s_id in sorted(ignored_snapshot_ids):
        snapshot = snapshots[s_id]
        ignored.add(
            f"[ignored]{snapshot.display_name(environment_naming_info, default_catalog)} ({snapshot.get_latest(start_date(snapshot, snapshots.values()))})"
        )
    return ignored


def _show_summary_tree_for(
    self,
    context_diff: ContextDiff,
    header: str,
    snapshot_selector: t.Callable[[SnapshotInfoLike], bool],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    no_diff: bool = True,
    ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
    """Print the added / removed / modified / ignored summary tree for one kind of snapshot.

    Args:
        context_diff: The context diff the summary is derived from.
        header: Title of the printed tree (":" is appended).
        snapshot_selector: Predicate choosing which snapshots belong in this tree.
        environment_naming_info: Naming info used to render display names.
        default_catalog: Default catalog used to render display names.
        no_diff: When True only names are listed; otherwise directly modified
            entries include ``context_diff.text_diff`` rendered as SQL.
        ignored_snapshot_ids: Snapshot ids reported under "Ignored" and left out
            of the other groups.

    Nothing is printed when all four groups are empty. Each group is iterated
    in sorted id order so the output is stable between runs.
    """
    ignored_snapshot_ids = ignored_snapshot_ids or set()
    selected_snapshots = {
        s_id: snapshot
        for s_id, snapshot in context_diff.snapshots.items()
        if snapshot_selector(snapshot)
    }
    selected_ignored_snapshot_ids = {
        s_id for s_id in selected_snapshots if s_id in ignored_snapshot_ids
    }
    added_snapshot_ids = {
        s_id for s_id in context_diff.added if snapshot_selector(context_diff.snapshots[s_id])
    } - selected_ignored_snapshot_ids
    removed_snapshot_ids = {
        s_id
        for s_id, snapshot in context_diff.removed_snapshots.items()
        if snapshot_selector(snapshot)
    } - selected_ignored_snapshot_ids
    modified_snapshot_ids = {
        current_snapshot.snapshot_id
        for current_snapshot, _ in context_diff.modified_snapshots.values()
        if snapshot_selector(current_snapshot)
    } - selected_ignored_snapshot_ids

    tree_sets = (
        added_snapshot_ids,
        removed_snapshot_ids,
        modified_snapshot_ids,
        selected_ignored_snapshot_ids,
    )
    if not any(tree_sets):
        return

    tree = Tree(f"[bold]{header}:")
    if added_snapshot_ids:
        added_tree = Tree("[bold][added]Added:")
        for s_id in sorted(added_snapshot_ids):
            snapshot = context_diff.snapshots[s_id]
            added_tree.add(
                f"[added]{snapshot.display_name(environment_naming_info, default_catalog)}"
            )
        tree.add(added_tree)
    if removed_snapshot_ids:
        removed_tree = Tree("[bold][removed]Removed:")
        for s_id in sorted(removed_snapshot_ids):
            snapshot_table_info = context_diff.removed_snapshots[s_id]
            removed_tree.add(
                f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog)}"
            )
        tree.add(removed_tree)
    if modified_snapshot_ids:
        direct = Tree("[bold][direct]Directly Modified:")
        indirect = Tree("[bold][indirect]Indirectly Modified:")
        metadata = Tree("[bold][metadata]Metadata Updated:")
        for s_id in sorted(modified_snapshot_ids):
            name = s_id.name
            display_name = context_diff.snapshots[s_id].display_name(
                environment_naming_info, default_catalog
            )
            if context_diff.directly_modified(name):
                direct.add(
                    f"[direct]{display_name}"
                    if no_diff
                    else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
                )
            elif context_diff.indirectly_modified(name):
                indirect.add(f"[indirect]{display_name}")
            elif context_diff.metadata_updated(name):
                metadata.add(
                    f"[metadata]{display_name}"
                    if no_diff
                    else Syntax(f"{display_name}", "sql", word_wrap=True)
                )
        # Only attach the sub-trees that actually received entries.
        if direct.children:
            tree.add(direct)
        if indirect.children:
            tree.add(indirect)
        if metadata.children:
            tree.add(metadata)
    if selected_ignored_snapshot_ids:
        tree.add(
            self._get_ignored_tree(
                selected_ignored_snapshot_ids,
                selected_snapshots,
                environment_naming_info,
                default_catalog,
            )
        )
    self._print(tree)


def _show_options_after_categorization(
    self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
) -> None:
    """Run the follow-up prompts once change categories are settled.

    Asks for an effective date when the built plan is forward-only and has new
    snapshots. Then either shows missing dates and prompts for the backfill, or
    (when not auto-applying) asks to confirm a virtual update.
    """
    plan = plan_builder.build()
    if plan.forward_only and plan.new_snapshots:
        self._prompt_effective_from(plan_builder, auto_apply, default_catalog)

    if plan.requires_backfill:
        # The plan is rebuilt here so the missing dates reflect any effective date just set.
        self._show_missing_dates(plan_builder.build(), default_catalog)
        self._prompt_backfill(plan_builder, auto_apply, default_catalog)
    elif plan.has_changes and not auto_apply:
        self._prompt_promote(plan_builder)
    elif plan.has_unmodified_unpromoted and not auto_apply:
        self.log_status_update("\n[bold]Virtually updating unmodified models\n")
        self._prompt_promote(plan_builder)
def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
    """Print the text diff and category tree of every categorized, directly modified snapshot.

    For each such snapshot the SQL diff is printed first, then a tree with the
    snapshot's change category and its indirectly modified children (each with
    its own category).
    """
    context_diff = plan.context_diff
    naming_info = plan.environment_naming_info

    for snapshot in plan.categorized:
        if not context_diff.directly_modified(snapshot.name):
            continue

        category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
        tree = Tree(
            f"[bold][direct]Directly Modified: {snapshot.display_name(naming_info, default_catalog)} ({category_str})"
        )

        child_ids = sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set()))
        if child_ids:
            # The children node is only attached when there is at least one child.
            indirect_tree = Tree("[indirect]Indirectly Modified Children:")
            tree.add(indirect_tree)
            for child_sid in child_ids:
                child_snapshot = context_diff.snapshots[child_sid]
                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
                indirect_tree.add(
                    f"[indirect]{child_snapshot.display_name(naming_info, default_catalog)} ({child_category_str})"
                )

        self._print(Syntax(context_diff.text_diff(snapshot.name), "sql", word_wrap=True))
        self._print(tree)
def _prompt_effective_from(
    self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
) -> None:
    """Ask for an effective date when the built plan has none and store a non-blank answer.

    ``auto_apply`` and ``default_catalog`` are accepted but not used here.
    """
    if not plan_builder.build().effective_from:
        effective_from = self._prompt(
            "Enter the effective date (eg. '1 year', '2020-01-01') to apply forward-only changes retroactively or blank to only apply them going forward once changes are deployed to prod"
        )
        # A blank answer leaves the builder untouched.
        if effective_from:
            plan_builder.set_effective_from(effective_from)


def _prompt_backfill(
    self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
) -> None:
    """Prompt for the backfill (or preview) date range and optionally apply the plan.

    When the builder allows start and end dates, the user is asked for each one
    that has not been overridden. Ignored models are then listed, and unless
    ``auto_apply`` is set the user is asked to confirm before applying.
    """
    plan = plan_builder.build()
    # Forward-only plans in a dev environment are worded as a "preview" in the prompts.
    is_forward_only_dev = plan.is_dev and plan.forward_only
    backfill_or_preview = "preview" if is_forward_only_dev else "backfill"

    if plan_builder.is_start_and_end_allowed:
        if not plan_builder.override_start:
            # Work out what a blank answer means and which start to fall back to.
            if is_forward_only_dev:
                if plan.effective_from:
                    blank_meaning = f"to preview starting from the effective date ('{time_like_to_str(plan.effective_from)}')"
                    default_start = plan.effective_from
                else:
                    blank_meaning = "to preview starting from yesterday"
                    default_start = yesterday_ds()
            else:
                if plan.provided_start:
                    blank_meaning = f"starting from '{time_like_to_str(plan.provided_start)}'"
                else:
                    blank_meaning = "from the beginning of history"
                # No fallback start outside the forward-only dev case.
                default_start = None

            start = self._prompt(
                f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank to backfill {blank_meaning}",
            )
            if start:
                plan_builder.set_start(start)
            elif default_start:
                plan_builder.set_start(default_start)

        if not plan_builder.override_end:
            if plan.provided_end:
                blank_meaning = f"'{time_like_to_str(plan.provided_end)}'"
            else:
                blank_meaning = "now"
            end = self._prompt(
                f"Enter the {backfill_or_preview} end date (eg. '1 month ago', '2020-01-01') or blank to {backfill_or_preview} up until {blank_meaning}",
            )
            if end:
                plan_builder.set_end(end)

        # Rebuild so the checks below see the start/end values chosen above.
        # NOTE(review): indentation is lost in this listing; the rebuild is assumed
        # to sit inside the ``is_start_and_end_allowed`` branch - confirm.
        plan = plan_builder.build()

    if plan.ignored:
        self._print(
            self._get_ignored_tree(
                plan.ignored,
                plan.context_diff.snapshots,
                plan.environment_naming_info,
                default_catalog,
            )
        )
    # With auto_apply set, no confirmation is asked and nothing is applied here.
    if not auto_apply and self._confirm(f"Apply - {backfill_or_preview.capitalize()} Tables"):
        plan_builder.apply()


def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
    """Ask the user to confirm a virtual update and apply the plan if they agree."""
    if self._confirm(
        "Apply - Virtual Update",
    ):
        plan_builder.apply()
def show_sql(self, sql: str) -> None:
    """Print ``sql`` with SQL syntax highlighting, word-wrapped and uncropped."""
    self._print(Syntax(sql, "sql", word_wrap=True), crop=False)


def log_status_update(self, message: str) -> None:
    """Print a general status message unchanged."""
    self._print(message)


def log_error(self, message: str) -> None:
    """Print ``message`` wrapped in red markup."""
    self._print(f"[red]{message}[/red]")


def log_success(self, message: str) -> None:
    """Print ``message`` wrapped in green markup, with a newline before and after."""
    self._print(f"\n[green]{message}[/green]\n")


def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
    """Start a loading spinner and return the id needed to stop it.

    Args:
        message: Optional text shown next to the spinner; empty when omitted.

    Returns:
        A freshly generated UUID under which the spinner is stored in ``loading_status``.
    """
    # Named ``loading_id`` so the ``id`` builtin is not shadowed.
    loading_id = uuid.uuid4()
    self.loading_status[loading_id] = Status(message or "", console=self.console, spinner="line")
    self.loading_status[loading_id].start()
    return loading_id


def loading_stop(self, id: uuid.UUID) -> None:
    """Stop and forget the spinner stored under ``id``.

    Raises:
        KeyError: If no spinner is stored under ``id``.
    """
    self.loading_status[id].stop()
    del self.loading_status[id]


def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
    """Print a tree of added, removed and modified columns, or a "Schemas match" note.

    Source and target are labelled with their upper-cased alias when one is
    set, otherwise with ``schema_diff.source`` / ``schema_diff.target``.
    """
    source_name = schema_diff.source
    if schema_diff.source_alias:
        source_name = schema_diff.source_alias.upper()
    target_name = schema_diff.target
    if schema_diff.target_alias:
        target_name = schema_diff.target_alias.upper()

    first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'"
    if schema_diff.model_name:
        first_line = (
            first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
        )

    tree = Tree(first_line + ":")

    if any([schema_diff.added, schema_diff.removed, schema_diff.modified]):
        # Loop names avoid ``t``, which is this module's alias for ``typing``.
        if schema_diff.added:
            added = Tree("[green]Added Columns:")
            for column, column_type in schema_diff.added:
                added.add(f"[green]{column} ({column_type})")
            tree.add(added)

        if schema_diff.removed:
            removed = Tree("[red]Removed Columns:")
            for column, column_type in schema_diff.removed:
                removed.add(f"[red]{column} ({column_type})")
            tree.add(removed)

        if schema_diff.modified:
            modified = Tree("[magenta]Modified Columns:")
            for column, (from_type, to_type) in schema_diff.modified.items():
                modified.add(f"[magenta]{column} ({from_type} -> {to_type})")
            tree.add(modified)
    else:
        tree.add("[b]Schemas match")

    self.console.print(tree)


def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
    """Print row counts, column comparison stats and, optionally, sample rows.

    Args:
        row_diff: The row diff to display.
        show_sample: Whether to also print the sample-data sections.
    """
    source_name = row_diff.source
    if row_diff.source_alias:
        source_name = row_diff.source_alias.upper()
    target_name = row_diff.target
    if row_diff.target_alias:
        target_name = row_diff.target_alias.upper()

    tree = Tree("[b]Row Counts:[/b]")
    tree.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows")
    tree.add(f" [b][yellow]{source_name} ONLY[/yellow]:[/b] {row_diff.s_only_count} rows")
    tree.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows")
    self.console.print("\n", tree)

    self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]")
    if row_diff.column_stats.shape[0] > 0:
        self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n")
    else:
        self.console.print(" No columns with same name and data type in both tables")

    if show_sample:
        self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]")
        if row_diff.joined_sample.shape[0] > 0:
            self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n")
        else:
            self.console.print(" All joined rows match")

        # NOTE(review): indentation is lost in this listing; the two "ONLY" sample
        # sections are assumed to belong to the ``show_sample`` branch - confirm.
        if row_diff.s_sample.shape[0] > 0:
            self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]")
            self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n")

        if row_diff.t_sample.shape[0] > 0:
            self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]")
            self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n")
plan_builder.environment_naming_info, default_catalog 1004 ) 1005 response = self._prompt( 1006 "\n".join([f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]), 1007 show_choices=False, 1008 choices=[f"{i+1}" for i in range(len(choices))], 1009 ) 1010 choice = list(choices)[int(response) - 1] 1011 plan_builder.set_choice(snapshot, choice) 1012 1013 def _snapshot_change_choices( 1014 self, 1015 snapshot: Snapshot, 1016 environment_naming_info: EnvironmentNamingInfo, 1017 default_catalog: t.Optional[str], 1018 use_rich_formatting: bool = True, 1019 ) -> t.Dict[SnapshotChangeCategory, str]: 1020 direct = snapshot.display_name(environment_naming_info, default_catalog) 1021 if use_rich_formatting: 1022 direct = f"[direct]{direct}[/direct]" 1023 indirect = "indirectly modified children" 1024 if use_rich_formatting: 1025 indirect = f"[indirect]{indirect}[/indirect]" 1026 if snapshot.is_view: 1027 choices = { 1028 SnapshotChangeCategory.BREAKING: f"Update {direct} and backfill {indirect}", 1029 SnapshotChangeCategory.NON_BREAKING: f"Update {direct} but don't backfill {indirect}", 1030 } 1031 elif snapshot.is_symbolic: 1032 choices = { 1033 SnapshotChangeCategory.BREAKING: f"Backfill {indirect}", 1034 SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}", 1035 } 1036 else: 1037 choices = { 1038 SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}", 1039 SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}", 1040 } 1041 labeled_choices = { 1042 k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items() 1043 } 1044 return labeled_choices 1045 1046 1047def add_to_layout_widget(target_widget: LayoutWidget, *widgets: widgets.Widget) -> LayoutWidget: 1048 """Helper function to add a widget to a layout widget. 1049 1050 Args: 1051 target_widget: The layout widget to add the other widget(s) to. 1052 *widgets: The widgets to add to the layout widget. 
1053 1054 Returns: 1055 The layout widget with the children added. 1056 """ 1057 target_widget.children += tuple(widgets) 1058 return target_widget 1059 1060 1061class NotebookMagicConsole(TerminalConsole): 1062 """ 1063 Console to be used when using the magic notebook interface (`%<command>`). 1064 Generally reuses the Terminal console when possible by either directly outputing what it provides 1065 or capturing it and converting it into a widget. 1066 """ 1067 1068 def __init__( 1069 self, 1070 display: t.Optional[t.Callable] = None, 1071 console: t.Optional[RichConsole] = None, 1072 **kwargs: t.Any, 1073 ) -> None: 1074 import ipywidgets as widgets 1075 from IPython import get_ipython 1076 from IPython.display import display as ipython_display 1077 1078 super().__init__(console, **kwargs) 1079 1080 self.display = display or get_ipython().user_ns.get("display", ipython_display) 1081 self.missing_dates_output = widgets.Output() 1082 self.dynamic_options_after_categorization_output = widgets.VBox() 1083 1084 def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: 1085 self._add_to_dynamic_options(self.missing_dates_output) 1086 self.missing_dates_output.outputs = () 1087 with self.missing_dates_output: 1088 super()._show_missing_dates(plan, default_catalog) 1089 1090 def _apply(self, button: widgets.Button) -> None: 1091 button.disabled = True 1092 with button.output: 1093 button.plan_builder.apply() 1094 1095 def _prompt_promote(self, plan_builder: PlanBuilder) -> None: 1096 import ipywidgets as widgets 1097 1098 button = widgets.Button( 1099 description="Apply - Virtual Update", 1100 disabled=False, 1101 button_style="success", 1102 # Auto will make the button really large. 
1103 # Likely changing this soon anyways to be just `Apply` with description above 1104 layout={"width": "10rem"}, 1105 ) 1106 self._add_to_dynamic_options(button) 1107 output = widgets.Output() 1108 self._add_to_dynamic_options(output) 1109 1110 button.plan_builder = plan_builder 1111 button.on_click(self._apply) 1112 button.output = output 1113 1114 def _prompt_effective_from( 1115 self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 1116 ) -> None: 1117 import ipywidgets as widgets 1118 1119 prompt = widgets.VBox() 1120 1121 def effective_from_change_callback(change: t.Dict[str, datetime.datetime]) -> None: 1122 plan_builder.set_effective_from(change["new"]) 1123 self._show_options_after_categorization(plan_builder, auto_apply, default_catalog) 1124 1125 def going_forward_change_callback(change: t.Dict[str, bool]) -> None: 1126 checked = change["new"] 1127 plan_builder.set_effective_from(None if checked else yesterday_ds()) 1128 self._show_options_after_categorization( 1129 plan_builder, auto_apply=auto_apply, default_catalog=default_catalog 1130 ) 1131 1132 date_picker = widgets.DatePicker( 1133 disabled=plan_builder.build().effective_from is None, 1134 value=to_date(plan_builder.build().effective_from or yesterday_ds()), 1135 layout={"width": "auto"}, 1136 ) 1137 date_picker.observe(effective_from_change_callback, "value") 1138 1139 going_forward_checkbox = widgets.Checkbox( 1140 value=plan_builder.build().effective_from is None, 1141 description="Apply Going Forward Once Deployed To Prod", 1142 disabled=False, 1143 indent=False, 1144 ) 1145 going_forward_checkbox.observe(going_forward_change_callback, "value") 1146 1147 add_to_layout_widget( 1148 prompt, 1149 widgets.HBox( 1150 [ 1151 widgets.Label("Effective From Date:", layout={"width": "8rem"}), 1152 date_picker, 1153 going_forward_checkbox, 1154 ] 1155 ), 1156 ) 1157 1158 self._add_to_dynamic_options(prompt) 1159 1160 def _prompt_backfill( 1161 self, plan_builder: 
PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 1162 ) -> None: 1163 import ipywidgets as widgets 1164 1165 prompt = widgets.VBox() 1166 1167 backfill_or_preview = ( 1168 "Preview" 1169 if plan_builder.build().is_dev and plan_builder.build().forward_only 1170 else "Backfill" 1171 ) 1172 1173 def _date_picker( 1174 plan_builder: PlanBuilder, value: t.Any, on_change: t.Callable, disabled: bool = False 1175 ) -> widgets.DatePicker: 1176 picker = widgets.DatePicker( 1177 disabled=disabled, 1178 value=value, 1179 layout={"width": "auto"}, 1180 ) 1181 1182 picker.observe(on_change, "value") 1183 return picker 1184 1185 def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None: 1186 plan_builder.set_start(change["new"]) 1187 self._show_options_after_categorization(plan_builder, auto_apply, default_catalog) 1188 1189 def end_change_callback(change: t.Dict[str, datetime.datetime]) -> None: 1190 plan_builder.set_end(change["new"]) 1191 self._show_options_after_categorization(plan_builder, auto_apply, default_catalog) 1192 1193 if plan_builder.is_start_and_end_allowed: 1194 add_to_layout_widget( 1195 prompt, 1196 widgets.HBox( 1197 [ 1198 widgets.Label( 1199 f"Start {backfill_or_preview} Date:", layout={"width": "8rem"} 1200 ), 1201 _date_picker( 1202 plan_builder, to_date(plan_builder.build().start), start_change_callback 1203 ), 1204 ] 1205 ), 1206 ) 1207 1208 add_to_layout_widget( 1209 prompt, 1210 widgets.HBox( 1211 [ 1212 widgets.Label(f"End {backfill_or_preview} Date:", layout={"width": "8rem"}), 1213 _date_picker( 1214 plan_builder, 1215 to_date(plan_builder.build().end), 1216 end_change_callback, 1217 ), 1218 ] 1219 ), 1220 ) 1221 1222 self._add_to_dynamic_options(prompt) 1223 1224 if not auto_apply: 1225 button = widgets.Button( 1226 description=f"Apply - {backfill_or_preview} Tables", 1227 disabled=False, 1228 button_style="success", 1229 ) 1230 self._add_to_dynamic_options(button) 1231 output = widgets.Output() 1232 
self._add_to_dynamic_options(output) 1233 1234 button.plan_builder = plan_builder 1235 button.on_click(self._apply) 1236 button.output = output 1237 1238 def _show_options_after_categorization( 1239 self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 1240 ) -> None: 1241 self.dynamic_options_after_categorization_output.children = () 1242 self.display(self.dynamic_options_after_categorization_output) 1243 super()._show_options_after_categorization(plan_builder, auto_apply, default_catalog) 1244 1245 def _add_to_dynamic_options(self, *widgets: widgets.Widget) -> None: 1246 add_to_layout_widget(self.dynamic_options_after_categorization_output, *widgets) 1247 1248 def _get_snapshot_change_category( 1249 self, 1250 snapshot: Snapshot, 1251 plan_builder: PlanBuilder, 1252 auto_apply: bool, 1253 default_catalog: t.Optional[str], 1254 ) -> None: 1255 import ipywidgets as widgets 1256 1257 choice_mapping = self._snapshot_change_choices( 1258 snapshot, 1259 plan_builder.environment_naming_info, 1260 default_catalog, 1261 use_rich_formatting=False, 1262 ) 1263 choices = list(choice_mapping) 1264 plan_builder.set_choice(snapshot, choices[0]) 1265 1266 def radio_button_selected(change: t.Dict[str, t.Any]) -> None: 1267 plan_builder.set_choice(snapshot, choices[change["owner"].index]) 1268 self._show_options_after_categorization(plan_builder, auto_apply, default_catalog) 1269 1270 radio = widgets.RadioButtons( 1271 options=choice_mapping.values(), 1272 layout={"width": "max-content"}, 1273 disabled=False, 1274 ) 1275 radio.observe( 1276 radio_button_selected, 1277 "value", 1278 ) 1279 self.display(radio) 1280 1281 def log_test_results( 1282 self, result: unittest.result.TestResult, output: str, target_dialect: str 1283 ) -> None: 1284 import ipywidgets as widgets 1285 1286 divider_length = 70 1287 shared_style = { 1288 "font-size": "11px", 1289 "font-weight": "bold", 1290 "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace", 
1291 } 1292 if result.wasSuccessful(): 1293 success_color = {"color": "#008000"} 1294 header = str(h("span", {"style": shared_style}, "-" * divider_length)) 1295 message = str( 1296 h( 1297 "span", 1298 {"style": {**shared_style, **success_color}}, 1299 f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}", 1300 ) 1301 ) 1302 footer = str(h("span", {"style": shared_style}, "=" * divider_length)) 1303 self.display(widgets.HTML("<br>".join([header, message, footer]))) 1304 else: 1305 fail_color = {"color": "#db3737"} 1306 fail_shared_style = {**shared_style, **fail_color} 1307 header = str(h("span", {"style": fail_shared_style}, "-" * divider_length)) 1308 message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary")) 1309 num_success = str( 1310 h( 1311 "span", 1312 {"style": fail_shared_style}, 1313 f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}", 1314 ) 1315 ) 1316 failure_tests = [] 1317 for test, _ in result.failures + result.errors: 1318 if isinstance(test, ModelTest): 1319 failure_tests.append( 1320 str( 1321 h( 1322 "span", 1323 {"style": fail_shared_style}, 1324 f"Failure Test: {test.model.name} {test.test_name}", 1325 ) 1326 ) 1327 ) 1328 failures = "<br>".join(failure_tests) 1329 footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length)) 1330 error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"}) 1331 test_info = widgets.HTML( 1332 "<br>".join([header, message, footer, num_success, failures, footer]) 1333 ) 1334 self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"})) 1335 1336 1337class CaptureTerminalConsole(TerminalConsole): 1338 """ 1339 Captures the output of the terminal console so that it can be extracted out and displayed within other interfaces. 1340 The captured output is cleared out after it is retrieved. 
1341 1342 Note: `_prompt` and `_confirm` need to also be overriden to work with the custom interface if you want to use 1343 this console interactively. 1344 """ 1345 1346 def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) -> None: 1347 super().__init__(console=console, **kwargs) 1348 self._captured_outputs: t.List[str] = [] 1349 self._errors: t.List[str] = [] 1350 1351 @property 1352 def captured_output(self) -> str: 1353 return "".join(self._captured_outputs) 1354 1355 @property 1356 def captured_errors(self) -> str: 1357 return "".join(self._errors) 1358 1359 def consume_captured_output(self) -> str: 1360 output = self.captured_output 1361 self.clear_captured_outputs() 1362 return output 1363 1364 def consume_captured_errors(self) -> str: 1365 errors = self.captured_errors 1366 self.clear_captured_errors() 1367 return errors 1368 1369 def clear_captured_outputs(self) -> None: 1370 self._captured_outputs = [] 1371 1372 def clear_captured_errors(self) -> None: 1373 self._errors = [] 1374 1375 def log_error(self, message: str) -> None: 1376 self._errors.append(message) 1377 super().log_error(message) 1378 1379 def _print(self, value: t.Any, **kwargs: t.Any) -> None: 1380 with self.console.capture() as capture: 1381 self.console.print(value, **kwargs) 1382 self._captured_outputs.append(capture.get()) 1383 1384 1385class MarkdownConsole(CaptureTerminalConsole): 1386 """ 1387 A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases 1388 where you want to display a plan or test results in markdown. 1389 """ 1390 1391 def show_model_difference_summary( 1392 self, 1393 context_diff: ContextDiff, 1394 environment_naming_info: EnvironmentNamingInfo, 1395 default_catalog: t.Optional[str], 1396 no_diff: bool = True, 1397 ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 1398 ) -> None: 1399 """Shows a summary of the differences. 
1400 1401 Args: 1402 context_diff: The context diff to use to print the summary. 1403 environment_naming_info: The environment naming info to reference when printing model names 1404 default_catalog: The default catalog to reference when deciding to remove catalog from display names 1405 no_diff: Hide the actual SQL differences. 1406 ignored_snapshot_ids: A set of snapshot names that are ignored 1407 """ 1408 ignored_snapshot_ids = ignored_snapshot_ids or set() 1409 if context_diff.is_new_environment: 1410 self._print( 1411 f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n" 1412 ) 1413 if not context_diff.has_snapshot_changes: 1414 return 1415 1416 if not context_diff.has_changes: 1417 self._print(f"**No differences when compared to `{context_diff.environment}`**\n") 1418 return 1419 1420 self._print(f"**Summary of differences against `{context_diff.environment}`:**\n") 1421 1422 added_snapshots = { 1423 context_diff.snapshots[s_id] 1424 for s_id in context_diff.added 1425 if s_id not in ignored_snapshot_ids 1426 } 1427 added_snapshot_models = {s for s in added_snapshots if s.is_model} 1428 if added_snapshot_models: 1429 self._print("\n**Added Models:**") 1430 for snapshot in sorted(added_snapshot_models): 1431 self._print( 1432 f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`" 1433 ) 1434 1435 added_snapshot_audits = {s for s in added_snapshots if s.is_audit} 1436 if added_snapshot_audits: 1437 self._print("\n**Added Standalone Audits:**") 1438 for snapshot in sorted(added_snapshot_audits): 1439 self._print( 1440 f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`" 1441 ) 1442 1443 removed_snapshot_table_infos = { 1444 snapshot_table_info 1445 for s_id, snapshot_table_info in context_diff.removed_snapshots.items() 1446 if s_id not in ignored_snapshot_ids 1447 } 1448 removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model} 1449 if 
removed_model_snapshot_table_infos: 1450 self._print("\n**Removed Models:**") 1451 for snapshot_table_info in sorted(removed_model_snapshot_table_infos): 1452 self._print( 1453 f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`" 1454 ) 1455 1456 removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit} 1457 if removed_audit_snapshot_table_infos: 1458 self._print("\n**Removed Standalone Audits:**") 1459 for snapshot_table_info in sorted(removed_audit_snapshot_table_infos): 1460 self._print( 1461 f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`" 1462 ) 1463 1464 modified_snapshots = { 1465 current_snapshot 1466 for current_snapshot, _ in context_diff.modified_snapshots.values() 1467 if current_snapshot.snapshot_id not in ignored_snapshot_ids 1468 } 1469 if modified_snapshots: 1470 directly_modified = [] 1471 indirectly_modified = [] 1472 metadata_modified = [] 1473 for snapshot in modified_snapshots: 1474 if context_diff.directly_modified(snapshot.name): 1475 directly_modified.append(snapshot) 1476 elif context_diff.indirectly_modified(snapshot.name): 1477 indirectly_modified.append(snapshot) 1478 elif context_diff.metadata_updated(snapshot.name): 1479 metadata_modified.append(snapshot) 1480 if directly_modified: 1481 self._print("\n**Directly Modified:**") 1482 for snapshot in sorted(directly_modified): 1483 self._print( 1484 f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`" 1485 ) 1486 if not no_diff: 1487 self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```") 1488 if indirectly_modified: 1489 self._print("\n**Indirectly Modified:**") 1490 for snapshot in sorted(indirectly_modified): 1491 self._print( 1492 f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`" 1493 ) 1494 if metadata_modified: 1495 self._print("\n**Metadata Updated:**") 1496 for snapshot in sorted(metadata_modified): 1497 self._print( 1498 f"- 
`{snapshot.display_name(environment_naming_info, default_catalog)}`" 1499 ) 1500 if ignored_snapshot_ids: 1501 self._print("\n**Ignored Models (Expected Plan Start):**") 1502 for s_id in sorted(ignored_snapshot_ids): 1503 snapshot = context_diff.snapshots[s_id] 1504 self._print( 1505 f"- `{snapshot.display_name(environment_naming_info, default_catalog)}` ({snapshot.get_latest(start_date(snapshot, context_diff.snapshots.values()))})" 1506 ) 1507 1508 def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: 1509 """Displays the models with missing dates.""" 1510 missing_intervals = plan.missing_intervals 1511 if not missing_intervals: 1512 return 1513 self._print("\n**Models needing backfill (missing dates):**") 1514 for missing in missing_intervals: 1515 snapshot = plan.context_diff.snapshots[missing.snapshot_id] 1516 if not snapshot.is_model: 1517 continue 1518 1519 preview_modifier = "" 1520 if not plan.deployability_index.is_deployable(snapshot): 1521 preview_modifier = " (**preview**)" 1522 1523 self._print( 1524 f"* `{snapshot.display_name(plan.environment_naming_info, default_catalog)}`: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}" 1525 ) 1526 1527 def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None: 1528 context_diff = plan.context_diff 1529 for snapshot in plan.categorized: 1530 if not context_diff.directly_modified(snapshot.name): 1531 continue 1532 1533 category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] 1534 tree = Tree( 1535 f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})" 1536 ) 1537 indirect_tree = None 1538 for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): 1539 child_snapshot = context_diff.snapshots[child_sid] 1540 if not indirect_tree: 1541 indirect_tree = Tree("[indirect]Indirectly Modified Children:") 1542 
tree.add(indirect_tree) 1543 child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category] 1544 indirect_tree.add( 1545 f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})" 1546 ) 1547 self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```\n") 1548 self._print("```\n") 1549 self._print(tree) 1550 self._print("\n```") 1551 1552 def log_test_results( 1553 self, result: unittest.result.TestResult, output: str, target_dialect: str 1554 ) -> None: 1555 # import ipywidgets as widgets 1556 if result.wasSuccessful(): 1557 self._print( 1558 f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n" 1559 ) 1560 else: 1561 self._print( 1562 f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n" 1563 ) 1564 for test, _ in result.failures + result.errors: 1565 if isinstance(test, ModelTest): 1566 self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n") 1567 self._print(f"```{output}```\n\n") 1568 1569 def log_error(self, message: str) -> None: 1570 super().log_error(f"```\n{message}```\n\n") 1571 1572 1573class DatabricksMagicConsole(CaptureTerminalConsole): 1574 """ 1575 Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The 1576 NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution 1577 and it makes it difficult to see the progress of the plan. 
1578 """ 1579 1580 def __init__(self, *args: t.Any, **kwargs: t.Any) -> None: 1581 super().__init__(*args, **kwargs) 1582 self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {} 1583 self.promotion_status: t.Tuple[int, int] = (0, 0) 1584 self.model_creation_status: t.Tuple[int, int] = (0, 0) 1585 self.migration_status: t.Tuple[int, int] = (0, 0) 1586 1587 def _print(self, value: t.Any, **kwargs: t.Any) -> None: 1588 super()._print(value, **kwargs) 1589 for captured_output in self._captured_outputs: 1590 print(captured_output) 1591 self.clear_captured_outputs() 1592 1593 def _prompt(self, message: str, **kwargs: t.Any) -> t.Any: 1594 self._print(message) 1595 return super()._prompt("", **kwargs) 1596 1597 def _confirm(self, message: str, **kwargs: t.Any) -> bool: 1598 message = f"{message} [y/n]" 1599 self._print(message) 1600 return super()._confirm("", **kwargs) 1601 1602 def start_evaluation_progress( 1603 self, 1604 batches: t.Dict[Snapshot, int], 1605 environment_naming_info: EnvironmentNamingInfo, 1606 default_catalog: t.Optional[str], 1607 ) -> None: 1608 self.evaluation_batches = batches 1609 self.evaluation_environment_naming_info = environment_naming_info 1610 self.default_catalog = default_catalog 1611 1612 def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: 1613 if not self.evaluation_batch_progress.get(snapshot.snapshot_id): 1614 display_name = snapshot.display_name( 1615 self.evaluation_environment_naming_info, self.default_catalog 1616 ) 1617 self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0) 1618 print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}") 1619 1620 def update_snapshot_evaluation_progress( 1621 self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] 1622 ) -> None: 1623 view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id] 1624 total_batches = self.evaluation_batches[snapshot] 1625 1626 loaded_batches += 
1 1627 self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches) 1628 1629 finished_loading = loaded_batches == total_batches 1630 status = "Loaded" if finished_loading else "Loading" 1631 print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}") 1632 if finished_loading: 1633 total_finished_loading = len( 1634 [ 1635 s 1636 for s, total in self.evaluation_batches.items() 1637 if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total 1638 ] 1639 ) 1640 total = len(self.evaluation_batch_progress) 1641 print(f"Completed Loading {total_finished_loading}/{total} Models") 1642 1643 def stop_evaluation_progress(self, success: bool = True) -> None: 1644 self.evaluation_batch_progress = {} 1645 super().stop_evaluation_progress(success) 1646 print(f"Loading {'succeeded' if success else 'failed'}") 1647 1648 def start_creation_progress( 1649 self, 1650 total_tasks: int, 1651 environment_naming_info: EnvironmentNamingInfo, 1652 default_catalog: t.Optional[str], 1653 ) -> None: 1654 """Indicates that a new creation progress has begun.""" 1655 self.model_creation_status = (0, total_tasks) 1656 print("Starting Creating New Model Versions") 1657 1658 def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: 1659 """Update the snapshot creation progress.""" 1660 num_creations, total_creations = self.model_creation_status 1661 num_creations += 1 1662 self.model_creation_status = (num_creations, total_creations) 1663 if num_creations % 5 == 0: 1664 print(f"Created New Model Versions: {num_creations}/{total_creations}") 1665 1666 def stop_creation_progress(self, success: bool = True) -> None: 1667 """Stop the snapshot creation progress.""" 1668 self.model_creation_status = (0, 0) 1669 print(f"New Model Creation {'succeeded' if success else 'failed'}") 1670 1671 def start_promotion_progress( 1672 self, 1673 total_tasks: int, 1674 environment_naming_info: EnvironmentNamingInfo, 1675 default_catalog: 
t.Optional[str], 1676 ) -> None: 1677 """Indicates that a new snapshot promotion progress has begun.""" 1678 self.promotion_status = (0, total_tasks) 1679 print(f"Virtually Updating '{environment_naming_info.name}'") 1680 1681 def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None: 1682 """Update the snapshot promotion progress.""" 1683 num_promotions, total_promotions = self.promotion_status 1684 num_promotions += 1 1685 self.promotion_status = (num_promotions, total_promotions) 1686 if num_promotions % 5 == 0: 1687 print(f"Virtually Updated {num_promotions}/{total_promotions}") 1688 1689 def stop_promotion_progress(self, success: bool = True) -> None: 1690 """Stop the snapshot promotion progress.""" 1691 self.promotion_status = (0, 0) 1692 print(f"Virtual Update {'succeeded' if success else 'failed'}") 1693 1694 def start_migration_progress(self, total_tasks: int) -> None: 1695 """Indicates that a new migration progress has begun.""" 1696 self.migration_status = (0, total_tasks) 1697 print("Starting Migration") 1698 1699 def update_migration_progress(self, num_tasks: int) -> None: 1700 """Update the migration progress.""" 1701 num_migrations, total_migrations = self.migration_status 1702 num_migrations += num_tasks 1703 self.migration_status = (num_migrations, total_migrations) 1704 if num_migrations % 5 == 0: 1705 print(f"Migration Updated {num_migrations}/{total_migrations}") 1706 1707 def stop_migration_progress(self, success: bool = True) -> None: 1708 """Stop the migration progress.""" 1709 self.migration_status = (0, 0) 1710 print(f"Migration {'succeeded' if success else 'failed'}") 1711 1712 1713class DebuggerTerminalConsole(TerminalConsole): 1714 """A terminal console to use while debugging with no fluff, progress bars, etc.""" 1715 1716 def __init__(self, console: t.Optional[RichConsole], *args: t.Any, **kwargs: t.Any) -> None: 1717 self.console: RichConsole = console or srich.console 1718 1719 def _write(self, msg: 
t.Any, *args: t.Any, **kwargs: t.Any) -> None: 1720 self.console.log(msg, *args, **kwargs) 1721 1722 def start_plan_evaluation(self, plan: Plan) -> None: 1723 self._write("Starting plan", plan.plan_id) 1724 1725 def stop_plan_evaluation(self) -> None: 1726 self._write("Stopping plan") 1727 1728 def start_evaluation_progress( 1729 self, 1730 batches: t.Dict[Snapshot, int], 1731 environment_naming_info: EnvironmentNamingInfo, 1732 default_catalog: t.Optional[str], 1733 ) -> None: 1734 self._write(f"Starting evaluation for {len(batches)} snapshots") 1735 1736 def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: 1737 self._write(f"Evaluating {snapshot.name}") 1738 1739 def update_snapshot_evaluation_progress( 1740 self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] 1741 ) -> None: 1742 self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms") 1743 1744 def stop_evaluation_progress(self, success: bool = True) -> None: 1745 self._write(f"Stopping evaluation with success={success}") 1746 1747 def start_creation_progress( 1748 self, 1749 total_tasks: int, 1750 environment_naming_info: EnvironmentNamingInfo, 1751 default_catalog: t.Optional[str], 1752 ) -> None: 1753 self._write(f"Starting creation for {total_tasks} snapshots") 1754 1755 def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: 1756 self._write(f"Creating {snapshot.name}") 1757 1758 def stop_creation_progress(self, success: bool = True) -> None: 1759 self._write(f"Stopping creation with success={success}") 1760 1761 def update_cleanup_progress(self, object_name: str) -> None: 1762 self._write(f"Cleaning up {object_name}") 1763 1764 def start_promotion_progress( 1765 self, 1766 total_tasks: int, 1767 environment_naming_info: EnvironmentNamingInfo, 1768 default_catalog: t.Optional[str], 1769 ) -> None: 1770 self._write(f"Starting promotion for {total_tasks} snapshots") 1771 1772 def update_promotion_progress(self, snapshot: 
SnapshotInfoLike, promoted: bool) -> None: 1773 self._write(f"Promoting {snapshot.name}") 1774 1775 def stop_promotion_progress(self, success: bool = True) -> None: 1776 self._write(f"Stopping promotion with success={success}") 1777 1778 def start_migration_progress(self, total_tasks: int) -> None: 1779 self._write(f"Starting migration for {total_tasks} snapshots") 1780 1781 def update_migration_progress(self, num_tasks: int) -> None: 1782 self._write(f"Migration {num_tasks}") 1783 1784 def stop_migration_progress(self, success: bool = True) -> None: 1785 self._write(f"Stopping migration with success={success}") 1786 1787 def show_model_difference_summary( 1788 self, 1789 context_diff: ContextDiff, 1790 environment_naming_info: EnvironmentNamingInfo, 1791 default_catalog: t.Optional[str], 1792 no_diff: bool = True, 1793 ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 1794 ) -> None: 1795 self._write("Model Difference Summary:") 1796 for added in context_diff.new_snapshots: 1797 self._write(f" Added: {added}") 1798 for removed in context_diff.removed_snapshots: 1799 self._write(f" Removed: {removed}") 1800 for modified in context_diff.modified_snapshots: 1801 self._write(f" Modified: {modified}") 1802 1803 def log_test_results( 1804 self, result: unittest.result.TestResult, output: str, target_dialect: str 1805 ) -> None: 1806 self._write("Test Results:", result) 1807 1808 def show_sql(self, sql: str) -> None: 1809 self._write(sql) 1810 1811 def log_status_update(self, message: str) -> None: 1812 self._write(message, style="bold blue") 1813 1814 def log_error(self, message: str) -> None: 1815 self._write(message, style="bold red") 1816 1817 def log_success(self, message: str) -> None: 1818 self._write(message, style="bold green") 1819 1820 def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID: 1821 self._write(message) 1822 return uuid.uuid4() 1823 1824 def loading_stop(self, id: uuid.UUID) -> None: 1825 self._write("Done") 1826 1827 def 
show_schema_diff(self, schema_diff: SchemaDiff) -> None: 1828 self._write(schema_diff) 1829 1830 def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None: 1831 self._write(row_diff) 1832 1833 1834def get_console(**kwargs: t.Any) -> TerminalConsole | DatabricksMagicConsole | NotebookMagicConsole: 1835 """ 1836 Returns the console that is appropriate for the current runtime environment. 1837 1838 Note: Google Colab environment is untested and currently assumes is compatible with the base 1839 NotebookMagicConsole. 1840 """ 1841 from sqlmesh import RuntimeEnv 1842 1843 runtime_env = RuntimeEnv.get() 1844 1845 runtime_env_mapping = { 1846 RuntimeEnv.DATABRICKS: DatabricksMagicConsole, 1847 RuntimeEnv.JUPYTER: NotebookMagicConsole, 1848 RuntimeEnv.TERMINAL: TerminalConsole, 1849 RuntimeEnv.GOOGLE_COLAB: NotebookMagicConsole, 1850 RuntimeEnv.DEBUGGER: DebuggerTerminalConsole, 1851 } 1852 rich_console_kwargs: t.Dict[str, t.Any] = {"theme": srich.theme} 1853 if runtime_env.is_jupyter or runtime_env.is_google_colab: 1854 rich_console_kwargs["force_jupyter"] = True 1855 return runtime_env_mapping[runtime_env]( 1856 **{**{"console": RichConsole(**rich_console_kwargs)}, **kwargs} 1857 )
class Console(abc.ABC):
    """Abstract base class for consoles that display information to the user and interact
    with them when their input is needed."""

    @abc.abstractmethod
    def start_plan_evaluation(self, plan: Plan) -> None:
        """Indicates that a new plan evaluation has begun.

        Args:
            plan: The plan whose evaluation is starting.
        """

    @abc.abstractmethod
    def stop_plan_evaluation(self) -> None:
        """Indicates that the plan evaluation has ended."""

    @abc.abstractmethod
    def start_evaluation_progress(
        self,
        batches: t.Dict[Snapshot, int],
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot evaluation progress has begun.

        Args:
            batches: Maps each snapshot to be evaluated to its number of batches.
            environment_naming_info: The environment naming info to reference when displaying model names.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
        """

    @abc.abstractmethod
    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
        """Starts the evaluation progress for a single snapshot.

        Args:
            snapshot: The snapshot whose evaluation is starting.
        """

    @abc.abstractmethod
    def update_snapshot_evaluation_progress(
        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
    ) -> None:
        """Updates the snapshot evaluation progress.

        Args:
            snapshot: The snapshot whose batch has been evaluated.
            batch_idx: The index of the evaluated batch. The terminal console renders it as
                ``batch_idx + 1``, so it appears to be zero-based.
            duration_ms: How long the batch evaluation took in milliseconds, if known.
        """

    @abc.abstractmethod
    def stop_evaluation_progress(self, success: bool = True) -> None:
        """Stops the snapshot evaluation progress.

        Args:
            success: Whether the evaluation finished successfully.
        """

    @abc.abstractmethod
    def start_creation_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot creation progress has begun.

        Args:
            total_tasks: The total number of creation tasks tracked by the progress.
            environment_naming_info: The environment naming info to reference when displaying model names.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
        """

    @abc.abstractmethod
    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
        """Update the snapshot creation progress.

        Args:
            snapshot: The snapshot that has been created.
        """

    @abc.abstractmethod
    def stop_creation_progress(self, success: bool = True) -> None:
        """Stop the snapshot creation progress.

        Args:
            success: Whether the creation finished successfully.
        """

    @abc.abstractmethod
    def update_cleanup_progress(self, object_name: str) -> None:
        """Update the snapshot cleanup progress.

        Args:
            object_name: The name of the object that has been cleaned up.
        """

    @abc.abstractmethod
    def start_promotion_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot promotion progress has begun.

        Args:
            total_tasks: The total number of promotion tasks tracked by the progress.
            environment_naming_info: The environment naming info to reference when displaying model names.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
        """

    @abc.abstractmethod
    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
        """Update the snapshot promotion progress.

        Args:
            snapshot: The snapshot that has been processed.
            promoted: True if the snapshot was promoted, False if it was demoted.
        """

    @abc.abstractmethod
    def stop_promotion_progress(self, success: bool = True) -> None:
        """Stop the snapshot promotion progress.

        Args:
            success: Whether the promotion finished successfully.
        """

    @abc.abstractmethod
    def start_migration_progress(self, total_tasks: int) -> None:
        """Indicates that a new migration progress has begun.

        Args:
            total_tasks: The total number of migration tasks tracked by the progress.
        """

    @abc.abstractmethod
    def update_migration_progress(self, num_tasks: int) -> None:
        """Update the migration progress.

        Args:
            num_tasks: The number of tasks to advance the progress by.
        """

    @abc.abstractmethod
    def stop_migration_progress(self, success: bool = True) -> None:
        """Stop the migration progress.

        Args:
            success: Whether the migration finished successfully.
        """

    @abc.abstractmethod
    def show_model_difference_summary(
        self,
        context_diff: ContextDiff,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        no_diff: bool = True,
        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    ) -> None:
        """Displays a summary of differences for the given models.

        Args:
            context_diff: The context diff to use to print the summary.
            environment_naming_info: The environment naming info to reference when printing model names.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
            no_diff: Hide the actual SQL differences.
            ignored_snapshot_ids: A set of snapshot ids that are ignored.
        """

    @abc.abstractmethod
    def plan(
        self,
        plan_builder: PlanBuilder,
        auto_apply: bool,
        default_catalog: t.Optional[str],
        no_diff: bool = False,
        no_prompts: bool = False,
    ) -> None:
        """The main plan flow.

        The console should present the user with choices on how to backfill and version the snapshots
        of a plan.

        Args:
            plan_builder: The builder of the plan to make choices for.
            auto_apply: Whether to automatically apply the plan after all choices have been made.
            default_catalog: The default catalog to reference when deciding to remove catalog from display names.
            no_diff: Hide text differences for changed models.
            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
                if this flag is set to true and there are uncategorized changes the plan creation will
                fail. Default: False
        """

    @abc.abstractmethod
    def log_test_results(
        self, result: unittest.result.TestResult, output: str, target_dialect: str
    ) -> None:
        """Display the test result and output.

        Args:
            result: The unittest test result that contains metrics like num success, fails, etc.
            output: The generated output from the unittest.
            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
        """

    @abc.abstractmethod
    def show_sql(self, sql: str) -> None:
        """Display SQL to the user.

        Args:
            sql: The SQL text to display.
        """

    @abc.abstractmethod
    def log_status_update(self, message: str) -> None:
        """Display general status update to the user.

        Args:
            message: The status message to display.
        """

    @abc.abstractmethod
    def log_error(self, message: str) -> None:
        """Display error info to the user.

        Args:
            message: The error message to display.
        """

    @abc.abstractmethod
    def log_success(self, message: str) -> None:
        """Display a general successful message to the user.

        Args:
            message: The success message to display.
        """

    @abc.abstractmethod
    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
        """Starts loading and returns a unique ID that can be used to stop the loading.

        Args:
            message: An optional message to display while loading.

        Returns:
            The unique ID to pass to `loading_stop`.
        """

    @abc.abstractmethod
    def loading_stop(self, id: uuid.UUID) -> None:
        """Stop loading for the given id.

        Args:
            id: The ID previously returned by `loading_start`.
        """

    @abc.abstractmethod
    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
        """Show table schema diff.

        Args:
            schema_diff: The schema diff to show.
        """

    @abc.abstractmethod
    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
        """Show table summary diff.

        Args:
            row_diff: The row diff to show.
            show_sample: Presumably whether to also show a sample of rows; the exact meaning is
                left to implementations -- confirm against them.
        """
Abstract base class for consoles that display information to the user and interact with them when their input is needed.
@abc.abstractmethod
def start_plan_evaluation(self, plan: Plan) -> None:
    """Indicates that a new plan evaluation has begun.

    Args:
        plan: The plan whose evaluation is starting.
    """
Indicates that a new evaluation has begun.
@abc.abstractmethod
def stop_plan_evaluation(self) -> None:
    """Indicates that the plan evaluation has ended."""
Indicates that the evaluation has ended.
@abc.abstractmethod
def start_evaluation_progress(
    self,
    batches: t.Dict[Snapshot, int],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Indicates that a new snapshot evaluation progress has begun.

    Args:
        batches: Maps each snapshot to be evaluated to its number of batches.
        environment_naming_info: The environment naming info to reference when displaying model names.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names.
    """
Indicates that a new snapshot evaluation progress has begun.
@abc.abstractmethod
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
    """Starts the evaluation progress for a single snapshot.

    Args:
        snapshot: The snapshot whose evaluation is starting.
    """
Starts the snapshot evaluation progress.
@abc.abstractmethod
def update_snapshot_evaluation_progress(
    self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
) -> None:
    """Updates the snapshot evaluation progress.

    Args:
        snapshot: The snapshot whose batch has been evaluated.
        batch_idx: The index of the evaluated batch. The terminal console renders it as
            ``batch_idx + 1``, so it appears to be zero-based.
        duration_ms: How long the batch evaluation took in milliseconds, if known.
    """
Updates the snapshot evaluation progress.
@abc.abstractmethod
def stop_evaluation_progress(self, success: bool = True) -> None:
    """Stops the snapshot evaluation progress.

    Args:
        success: Whether the evaluation finished successfully.
    """
Stops the snapshot evaluation progress.
@abc.abstractmethod
def start_creation_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Indicates that a new snapshot creation progress has begun.

    Args:
        total_tasks: The total number of creation tasks tracked by the progress.
        environment_naming_info: The environment naming info to reference when displaying model names.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names.
    """
Indicates that a new snapshot creation progress has begun.
@abc.abstractmethod
def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Update the snapshot creation progress.

    Args:
        snapshot: The snapshot that has been created.
    """
Update the snapshot creation progress.
@abc.abstractmethod
def stop_creation_progress(self, success: bool = True) -> None:
    """Stop the snapshot creation progress.

    Args:
        success: Whether the creation finished successfully.
    """
Stop the snapshot creation progress.
@abc.abstractmethod
def update_cleanup_progress(self, object_name: str) -> None:
    """Update the snapshot cleanup progress.

    Args:
        object_name: The name of the object that has been cleaned up.
    """
Update the snapshot cleanup progress.
@abc.abstractmethod
def start_promotion_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Indicates that a new snapshot promotion progress has begun.

    Args:
        total_tasks: The total number of promotion tasks tracked by the progress.
        environment_naming_info: The environment naming info to reference when displaying model names.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names.
    """
Indicates that a new snapshot promotion progress has begun.
@abc.abstractmethod
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Update the snapshot promotion progress.

    Args:
        snapshot: The snapshot that has been processed.
        promoted: True if the snapshot was promoted, False if it was demoted.
    """
Update the snapshot promotion progress.
@abc.abstractmethod
def stop_promotion_progress(self, success: bool = True) -> None:
    """Stop the snapshot promotion progress.

    Args:
        success: Whether the promotion finished successfully.
    """
Stop the snapshot promotion progress.
@abc.abstractmethod
def start_migration_progress(self, total_tasks: int) -> None:
    """Indicates that a new migration progress has begun.

    Args:
        total_tasks: The total number of migration tasks tracked by the progress.
    """
Indicates that a new migration progress has begun.
@abc.abstractmethod
def update_migration_progress(self, num_tasks: int) -> None:
    """Update the migration progress.

    Args:
        num_tasks: The number of tasks to advance the progress by.
    """
Update the migration progress.
@abc.abstractmethod
def stop_migration_progress(self, success: bool = True) -> None:
    """Stop the migration progress.

    Args:
        success: Whether the migration finished successfully.
    """
Stop the migration progress.
@abc.abstractmethod
def show_model_difference_summary(
    self,
    context_diff: ContextDiff,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    no_diff: bool = True,
    ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
    """Displays a summary of differences for the given models.

    Args:
        context_diff: The context diff to use to print the summary.
        environment_naming_info: The environment naming info to reference when printing model names.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names.
        no_diff: Hide the actual SQL differences.
        ignored_snapshot_ids: A set of snapshot ids that are ignored.
    """
Displays a summary of differences for the given models.
@abc.abstractmethod
def plan(
    self,
    plan_builder: PlanBuilder,
    auto_apply: bool,
    default_catalog: t.Optional[str],
    no_diff: bool = False,
    no_prompts: bool = False,
) -> None:
    """The main plan flow.

    The console should present the user with choices on how to backfill and version the snapshots
    of a plan.

    Args:
        plan_builder: The builder of the plan to make choices for.
        auto_apply: Whether to automatically apply the plan after all choices have been made.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names.
        no_diff: Hide text differences for changed models.
        no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
            if this flag is set to true and there are uncategorized changes the plan creation will
            fail. Default: False
    """
The main plan flow.
The console should present the user with choices on how to backfill and version the snapshots of a plan.
Arguments:
- plan_builder: The builder of the plan to make choices for.
- auto_apply: Whether to automatically apply the plan after all choices have been made.
- no_diff: Hide text differences for changed models.
- no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False
@abc.abstractmethod
def log_test_results(
    self, result: unittest.result.TestResult, output: str, target_dialect: str
) -> None:
    """Display the test result and output.

    Args:
        result: The unittest test result that contains metrics like num success, fails, etc.
        output: The generated output from the unittest.
        target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
    """
Display the test result and output.
Arguments:
- result: The unittest test result that contains metrics like num success, fails, etc.
- output: The generated output from the unittest.
- target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
@abc.abstractmethod
def show_sql(self, sql: str) -> None:
    """Display SQL to the user.

    Args:
        sql: The SQL text to display.
    """
Display SQL to the user.
@abc.abstractmethod
def log_status_update(self, message: str) -> None:
    """Display general status update to the user.

    Args:
        message: The status message to display.
    """
Display general status update to the user.
@abc.abstractmethod
def log_error(self, message: str) -> None:
    """Display error info to the user.

    Args:
        message: The error message to display.
    """
Display error info to the user.
@abc.abstractmethod
def log_success(self, message: str) -> None:
    """Display a general successful message to the user.

    Args:
        message: The success message to display.
    """
Display a general successful message to the user.
@abc.abstractmethod
def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
    """Starts loading and returns a unique ID that can be used to stop the loading.

    Args:
        message: An optional message to display while loading.

    Returns:
        The unique ID to pass to `loading_stop`.
    """
Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.
@abc.abstractmethod
def loading_stop(self, id: uuid.UUID) -> None:
    """Stop loading for the given id.

    Args:
        id: The ID previously returned by `loading_start`.
    """
Stop loading for the given id.
@abc.abstractmethod
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
    """Show table schema diff.

    Args:
        schema_diff: The schema diff to show.
    """
Show table schema diff.
@abc.abstractmethod
def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
    """Show table summary diff.

    Args:
        row_diff: The row diff to show.
        show_sample: Presumably whether to also show a sample of rows; the exact meaning is
            left to implementations -- confirm against them.
    """
Show table summary diff.
223class TerminalConsole(Console): 224 """A rich based implementation of the console.""" 225 226 def __init__( 227 self, console: t.Optional[RichConsole] = None, verbose: bool = False, **kwargs: t.Any 228 ) -> None: 229 self.console: RichConsole = console or srich.console 230 231 self.evaluation_progress_live: t.Optional[Live] = None 232 self.evaluation_total_progress: t.Optional[Progress] = None 233 self.evaluation_total_task: t.Optional[TaskID] = None 234 self.evaluation_model_progress: t.Optional[Progress] = None 235 self.evaluation_model_tasks: t.Dict[str, TaskID] = {} 236 self.evaluation_model_batches: t.Dict[Snapshot, int] = {} 237 238 # Put in temporary values that are replaced when evaluating 239 self.environment_naming_info = EnvironmentNamingInfo() 240 self.default_catalog: t.Optional[str] = None 241 242 self.creation_progress: t.Optional[Progress] = None 243 self.creation_task: t.Optional[TaskID] = None 244 245 self.promotion_progress: t.Optional[Progress] = None 246 self.promotion_task: t.Optional[TaskID] = None 247 248 self.migration_progress: t.Optional[Progress] = None 249 self.migration_task: t.Optional[TaskID] = None 250 251 self.loading_status: t.Dict[uuid.UUID, Status] = {} 252 253 self.verbose = verbose 254 255 def _print(self, value: t.Any, **kwargs: t.Any) -> None: 256 self.console.print(value, **kwargs) 257 258 def _prompt(self, message: str, **kwargs: t.Any) -> t.Any: 259 return Prompt.ask(message, console=self.console, **kwargs) 260 261 def _confirm(self, message: str, **kwargs: t.Any) -> bool: 262 return Confirm.ask(message, console=self.console, **kwargs) 263 264 def start_plan_evaluation(self, plan: Plan) -> None: 265 pass 266 267 def stop_plan_evaluation(self) -> None: 268 pass 269 270 def start_evaluation_progress( 271 self, 272 batches: t.Dict[Snapshot, int], 273 environment_naming_info: EnvironmentNamingInfo, 274 default_catalog: t.Optional[str], 275 ) -> None: 276 """Indicates that a new snapshot evaluation progress has begun.""" 
277 if not self.evaluation_progress_live: 278 self.evaluation_total_progress = Progress( 279 TextColumn("[bold blue]Evaluating models", justify="right"), 280 BarColumn(bar_width=40), 281 "[progress.percentage]{task.percentage:>3.1f}%", 282 "•", 283 srich.BatchColumn(), 284 "•", 285 TimeElapsedColumn(), 286 console=self.console, 287 ) 288 289 self.evaluation_model_progress = Progress( 290 TextColumn("{task.fields[view_name]}", justify="right"), 291 SpinnerColumn(spinner_name="simpleDots"), 292 console=self.console, 293 ) 294 295 progress_table = Table.grid() 296 progress_table.add_row(self.evaluation_total_progress) 297 progress_table.add_row(self.evaluation_model_progress) 298 299 self.evaluation_progress_live = Live(progress_table, refresh_per_second=10) 300 self.evaluation_progress_live.start() 301 302 self.evaluation_total_task = self.evaluation_total_progress.add_task( 303 "Evaluating models...", total=sum(batches.values()) 304 ) 305 306 self.evaluation_model_batches = batches 307 self.environment_naming_info = environment_naming_info 308 self.default_catalog = default_catalog 309 310 def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None: 311 if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks: 312 display_name = snapshot.display_name(self.environment_naming_info, self.default_catalog) 313 self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task( 314 f"Evaluating {display_name}...", 315 view_name=display_name, 316 total=self.evaluation_model_batches[snapshot], 317 ) 318 319 def update_snapshot_evaluation_progress( 320 self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int] 321 ) -> None: 322 """Update the snapshot evaluation progress.""" 323 if ( 324 self.evaluation_total_progress 325 and self.evaluation_model_progress 326 and self.evaluation_progress_live 327 ): 328 total_batches = self.evaluation_model_batches[snapshot] 329 330 if duration_ms: 331 
self.evaluation_progress_live.console.print( 332 f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s" 333 ) 334 335 self.evaluation_total_progress.update( 336 self.evaluation_total_task or TaskID(0), refresh=True, advance=1 337 ) 338 339 model_task_id = self.evaluation_model_tasks[snapshot.name] 340 self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1) 341 if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches: 342 self.evaluation_model_progress.remove_task(model_task_id) 343 344 def stop_evaluation_progress(self, success: bool = True) -> None: 345 """Stop the snapshot evaluation progress.""" 346 if self.evaluation_progress_live: 347 self.evaluation_progress_live.stop() 348 if success: 349 self.log_success("All model batches have been executed successfully") 350 351 self.evaluation_progress_live = None 352 self.evaluation_total_progress = None 353 self.evaluation_total_task = None 354 self.evaluation_model_progress = None 355 self.evaluation_model_tasks = {} 356 self.evaluation_model_batches = {} 357 self.environment_naming_info = EnvironmentNamingInfo() 358 self.default_catalog = None 359 360 def start_creation_progress( 361 self, 362 total_tasks: int, 363 environment_naming_info: EnvironmentNamingInfo, 364 default_catalog: t.Optional[str], 365 ) -> None: 366 """Indicates that a new creation progress has begun.""" 367 if self.creation_progress is None: 368 self.creation_progress = Progress( 369 TextColumn("[bold blue]Creating physical tables", justify="right"), 370 BarColumn(bar_width=40), 371 "[progress.percentage]{task.percentage:>3.1f}%", 372 "•", 373 srich.BatchColumn(), 374 "•", 375 TimeElapsedColumn(), 376 console=self.console, 377 ) 378 379 self.creation_progress.start() 380 self.creation_task = self.creation_progress.add_task( 381 "Creating physical tables...", 382 total=total_tasks, 383 ) 
384 385 self.environment_naming_info = environment_naming_info 386 self.default_catalog = default_catalog 387 388 def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None: 389 """Update the snapshot creation progress.""" 390 if self.creation_progress is not None and self.creation_task is not None: 391 if self.verbose: 392 self.creation_progress.live.console.print( 393 f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]created[/green]" 394 ) 395 self.creation_progress.update(self.creation_task, refresh=True, advance=1) 396 397 def stop_creation_progress(self, success: bool = True) -> None: 398 """Stop the snapshot creation progress.""" 399 self.creation_task = None 400 if self.creation_progress is not None: 401 self.creation_progress.stop() 402 self.creation_progress = None 403 if success: 404 self.log_success("All model versions have been created successfully") 405 406 self.environment_naming_info = EnvironmentNamingInfo() 407 self.default_catalog = None 408 409 def update_cleanup_progress(self, object_name: str) -> None: 410 """Update the snapshot cleanup progress.""" 411 self._print(f"Deleted object {object_name}") 412 413 def start_promotion_progress( 414 self, 415 total_tasks: int, 416 environment_naming_info: EnvironmentNamingInfo, 417 default_catalog: t.Optional[str], 418 ) -> None: 419 """Indicates that a new snapshot promotion progress has begun.""" 420 if self.promotion_progress is None: 421 self.promotion_progress = Progress( 422 TextColumn( 423 f"[bold blue]Virtually Updating '{environment_naming_info.name}'", 424 justify="right", 425 ), 426 BarColumn(bar_width=40), 427 "[progress.percentage]{task.percentage:>3.1f}%", 428 "•", 429 TimeElapsedColumn(), 430 console=self.console, 431 ) 432 433 self.promotion_progress.start() 434 self.promotion_task = self.promotion_progress.add_task( 435 f"Virtually Updating {environment_naming_info.name}...", 436 total=total_tasks, 437 ) 438 439 self.environment_naming_info = 
environment_naming_info 440 self.default_catalog = default_catalog 441 442 def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None: 443 """Update the snapshot promotion progress.""" 444 if self.promotion_progress is not None and self.promotion_task is not None: 445 if self.verbose: 446 action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]" 447 self.promotion_progress.live.console.print( 448 f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} {action_str}" 449 ) 450 self.promotion_progress.update(self.promotion_task, refresh=True, advance=1) 451 452 def stop_promotion_progress(self, success: bool = True) -> None: 453 """Stop the snapshot promotion progress.""" 454 self.promotion_task = None 455 if self.promotion_progress is not None: 456 self.promotion_progress.stop() 457 self.promotion_progress = None 458 if success: 459 self.log_success("The target environment has been updated successfully") 460 461 self.environment_naming_info = EnvironmentNamingInfo() 462 self.default_catalog = None 463 464 def start_migration_progress(self, total_tasks: int) -> None: 465 """Indicates that a new migration progress has begun.""" 466 if self.migration_progress is None: 467 self.migration_progress = Progress( 468 TextColumn("[bold blue]Migrating snapshots", justify="right"), 469 BarColumn(bar_width=40), 470 "[progress.percentage]{task.percentage:>3.1f}%", 471 "•", 472 srich.BatchColumn(), 473 "•", 474 TimeElapsedColumn(), 475 console=self.console, 476 ) 477 478 self.migration_progress.start() 479 self.migration_task = self.migration_progress.add_task( 480 "Migrating snapshots...", 481 total=total_tasks, 482 ) 483 484 def update_migration_progress(self, num_tasks: int) -> None: 485 """Update the migration progress.""" 486 if self.migration_progress is not None and self.migration_task is not None: 487 self.migration_progress.update(self.migration_task, refresh=True, advance=num_tasks) 488 489 def 
stop_migration_progress(self, success: bool = True) -> None: 490 """Stop the migration progress.""" 491 self.migration_task = None 492 if self.migration_progress is not None: 493 self.migration_progress.stop() 494 self.migration_progress = None 495 if success: 496 self.log_success("The migration has been completed successfully") 497 498 def show_model_difference_summary( 499 self, 500 context_diff: ContextDiff, 501 environment_naming_info: EnvironmentNamingInfo, 502 default_catalog: t.Optional[str], 503 no_diff: bool = True, 504 ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 505 ) -> None: 506 """Shows a summary of the differences. 507 508 Args: 509 context_diff: The context diff to use to print the summary 510 environment_naming_info: The environment naming info to reference when printing model names 511 default_catalog: The default catalog to reference when deciding to remove catalog from display names 512 no_diff: Hide the actual SQL differences. 513 ignored_snapshot_ids: A set of snapshot ids that are ignored 514 """ 515 ignored_snapshot_ids = ignored_snapshot_ids or set() 516 if context_diff.is_new_environment: 517 self._print( 518 Tree( 519 f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`" 520 ) 521 ) 522 if not context_diff.has_snapshot_changes: 523 return 524 525 if not context_diff.has_changes: 526 self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`")) 527 return 528 529 self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:")) 530 self._show_summary_tree_for( 531 context_diff, 532 "Models", 533 lambda x: x.is_model, 534 environment_naming_info, 535 default_catalog, 536 no_diff=no_diff, 537 ignored_snapshot_ids=ignored_snapshot_ids, 538 ) 539 self._show_summary_tree_for( 540 context_diff, 541 "Standalone Audits", 542 lambda x: x.is_audit, 543 environment_naming_info, 544 default_catalog, 545 no_diff=no_diff, 546 
ignored_snapshot_ids=ignored_snapshot_ids, 547 ) 548 549 def plan( 550 self, 551 plan_builder: PlanBuilder, 552 auto_apply: bool, 553 default_catalog: t.Optional[str], 554 no_diff: bool = False, 555 no_prompts: bool = False, 556 ) -> None: 557 """The main plan flow. 558 559 The console should present the user with choices on how to backfill and version the snapshots 560 of a plan. 561 562 Args: 563 plan: The plan to make choices for. 564 auto_apply: Whether to automatically apply the plan after all choices have been made. 565 default_catalog: The default catalog to reference when deciding to remove catalog from display names 566 no_diff: Hide text differences for changed models. 567 no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that 568 if this flag is set to true and there are uncategorized changes the plan creation will 569 fail. Default: False 570 """ 571 self._prompt_categorize( 572 plan_builder, 573 auto_apply, 574 no_diff=no_diff, 575 no_prompts=no_prompts, 576 default_catalog=default_catalog, 577 ) 578 579 if not no_prompts: 580 self._show_options_after_categorization( 581 plan_builder, auto_apply, default_catalog=default_catalog 582 ) 583 584 if auto_apply: 585 plan_builder.apply() 586 587 def _get_ignored_tree( 588 self, 589 ignored_snapshot_ids: t.Set[SnapshotId], 590 snapshots: t.Dict[SnapshotId, Snapshot], 591 environment_naming_info: EnvironmentNamingInfo, 592 default_catalog: t.Optional[str], 593 ) -> Tree: 594 ignored = Tree("[bold][ignored]Ignored Models (Expected Plan Start):") 595 for s_id in ignored_snapshot_ids: 596 snapshot = snapshots[s_id] 597 ignored.add( 598 f"[ignored]{snapshot.display_name(environment_naming_info, default_catalog)} ({snapshot.get_latest(start_date(snapshot, snapshots.values()))})" 599 ) 600 return ignored 601 602 def _show_summary_tree_for( 603 self, 604 context_diff: ContextDiff, 605 header: str, 606 snapshot_selector: t.Callable[[SnapshotInfoLike], bool], 607 
environment_naming_info: EnvironmentNamingInfo, 608 default_catalog: t.Optional[str], 609 no_diff: bool = True, 610 ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None, 611 ) -> None: 612 ignored_snapshot_ids = ignored_snapshot_ids or set() 613 selected_snapshots = { 614 s_id: snapshot 615 for s_id, snapshot in context_diff.snapshots.items() 616 if snapshot_selector(snapshot) 617 } 618 selected_ignored_snapshot_ids = { 619 s_id for s_id in selected_snapshots if s_id in ignored_snapshot_ids 620 } 621 added_snapshot_ids = { 622 s_id for s_id in context_diff.added if snapshot_selector(context_diff.snapshots[s_id]) 623 } - selected_ignored_snapshot_ids 624 removed_snapshot_ids = { 625 s_id 626 for s_id, snapshot in context_diff.removed_snapshots.items() 627 if snapshot_selector(snapshot) 628 } - selected_ignored_snapshot_ids 629 modified_snapshot_ids = { 630 current_snapshot.snapshot_id 631 for _, (current_snapshot, _) in context_diff.modified_snapshots.items() 632 if snapshot_selector(current_snapshot) 633 } - selected_ignored_snapshot_ids 634 635 tree_sets = ( 636 added_snapshot_ids, 637 removed_snapshot_ids, 638 modified_snapshot_ids, 639 selected_ignored_snapshot_ids, 640 ) 641 if all(not s_ids for s_ids in tree_sets): 642 return 643 644 tree = Tree(f"[bold]{header}:") 645 if added_snapshot_ids: 646 added_tree = Tree("[bold][added]Added:") 647 for s_id in added_snapshot_ids: 648 snapshot = context_diff.snapshots[s_id] 649 added_tree.add( 650 f"[added]{snapshot.display_name(environment_naming_info, default_catalog)}" 651 ) 652 tree.add(added_tree) 653 if removed_snapshot_ids: 654 removed_tree = Tree("[bold][removed]Removed:") 655 for s_id in removed_snapshot_ids: 656 snapshot_table_info = context_diff.removed_snapshots[s_id] 657 removed_tree.add( 658 f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog)}" 659 ) 660 tree.add(removed_tree) 661 if modified_snapshot_ids: 662 direct = Tree("[bold][direct]Directly Modified:") 663 
indirect = Tree("[bold][indirect]Indirectly Modified:") 664 metadata = Tree("[bold][metadata]Metadata Updated:") 665 for s_id in modified_snapshot_ids: 666 name = s_id.name 667 display_name = context_diff.snapshots[s_id].display_name( 668 environment_naming_info, default_catalog 669 ) 670 if context_diff.directly_modified(name): 671 direct.add( 672 f"[direct]{display_name}" 673 if no_diff 674 else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql") 675 ) 676 elif context_diff.indirectly_modified(name): 677 indirect.add(f"[indirect]{display_name}") 678 elif context_diff.metadata_updated(name): 679 metadata.add( 680 f"[metadata]{display_name}" 681 if no_diff 682 else Syntax(f"{display_name}", "sql", word_wrap=True) 683 ) 684 if direct.children: 685 tree.add(direct) 686 if indirect.children: 687 tree.add(indirect) 688 if metadata.children: 689 tree.add(metadata) 690 if selected_ignored_snapshot_ids: 691 tree.add( 692 self._get_ignored_tree( 693 selected_ignored_snapshot_ids, 694 selected_snapshots, 695 environment_naming_info, 696 default_catalog, 697 ) 698 ) 699 self._print(tree) 700 701 def _show_options_after_categorization( 702 self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 703 ) -> None: 704 plan = plan_builder.build() 705 if plan.forward_only and plan.new_snapshots: 706 self._prompt_effective_from(plan_builder, auto_apply, default_catalog) 707 708 if plan.requires_backfill: 709 self._show_missing_dates(plan_builder.build(), default_catalog) 710 self._prompt_backfill(plan_builder, auto_apply, default_catalog) 711 elif plan.has_changes and not auto_apply: 712 self._prompt_promote(plan_builder) 713 elif plan.has_unmodified_unpromoted and not auto_apply: 714 self.log_status_update("\n[bold]Virtually updating unmodified models\n") 715 self._prompt_promote(plan_builder) 716 717 def _prompt_categorize( 718 self, 719 plan_builder: PlanBuilder, 720 auto_apply: bool, 721 no_diff: bool, 722 no_prompts: bool, 723 
default_catalog: t.Optional[str], 724 ) -> None: 725 """Get the user's change category for the directly modified models.""" 726 plan = plan_builder.build() 727 728 self.show_model_difference_summary( 729 plan.context_diff, 730 plan.environment_naming_info, 731 default_catalog=default_catalog, 732 ignored_snapshot_ids=plan.ignored, 733 ) 734 735 if not no_diff: 736 self._show_categorized_snapshots(plan, default_catalog) 737 738 for snapshot in plan.uncategorized: 739 if not no_diff: 740 self.show_sql(plan.context_diff.text_diff(snapshot.name)) 741 tree = Tree( 742 f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)}" 743 ) 744 indirect_tree = None 745 746 for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): 747 child_snapshot = plan.context_diff.snapshots[child_sid] 748 if not indirect_tree: 749 indirect_tree = Tree("[indirect]Indirectly Modified Children:") 750 tree.add(indirect_tree) 751 indirect_tree.add( 752 f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)}" 753 ) 754 self._print(tree) 755 if not no_prompts: 756 self._get_snapshot_change_category( 757 snapshot, plan_builder, auto_apply, default_catalog 758 ) 759 760 def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None: 761 context_diff = plan.context_diff 762 763 for snapshot in plan.categorized: 764 if not context_diff.directly_modified(snapshot.name): 765 continue 766 767 category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category] 768 tree = Tree( 769 f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})" 770 ) 771 indirect_tree = None 772 for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())): 773 child_snapshot = context_diff.snapshots[child_sid] 774 if not indirect_tree: 775 indirect_tree = Tree("[indirect]Indirectly Modified Children:") 776 
tree.add(indirect_tree) 777 child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category] 778 indirect_tree.add( 779 f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})" 780 ) 781 self._print(Syntax(context_diff.text_diff(snapshot.name), "sql", word_wrap=True)) 782 self._print(tree) 783 784 def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None: 785 """Displays the models with missing dates.""" 786 missing_intervals = plan.missing_intervals 787 if not missing_intervals: 788 return 789 backfill = Tree("[bold]Models needing backfill (missing dates):") 790 for missing in missing_intervals: 791 snapshot = plan.context_diff.snapshots[missing.snapshot_id] 792 if not snapshot.is_model: 793 continue 794 795 preview_modifier = "" 796 if not plan.deployability_index.is_deployable(snapshot): 797 preview_modifier = " ([orange1]preview[/orange1])" 798 799 backfill.add( 800 f"{snapshot.display_name(plan.environment_naming_info, default_catalog)}: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}" 801 ) 802 self._print(backfill) 803 804 def _prompt_effective_from( 805 self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 806 ) -> None: 807 if not plan_builder.build().effective_from: 808 effective_from = self._prompt( 809 "Enter the effective date (eg. 
'1 year', '2020-01-01') to apply forward-only changes retroactively or blank to only apply them going forward once changes are deployed to prod" 810 ) 811 if effective_from: 812 plan_builder.set_effective_from(effective_from) 813 814 def _prompt_backfill( 815 self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str] 816 ) -> None: 817 plan = plan_builder.build() 818 is_forward_only_dev = plan.is_dev and plan.forward_only 819 backfill_or_preview = "preview" if is_forward_only_dev else "backfill" 820 821 if plan_builder.is_start_and_end_allowed: 822 if not plan_builder.override_start: 823 if is_forward_only_dev: 824 if plan.effective_from: 825 blank_meaning = f"to preview starting from the effective date ('{time_like_to_str(plan.effective_from)}')" 826 default_start = plan.effective_from 827 else: 828 blank_meaning = "to preview starting from yesterday" 829 default_start = yesterday_ds() 830 else: 831 if plan.provided_start: 832 blank_meaning = f"starting from '{time_like_to_str(plan.provided_start)}'" 833 else: 834 blank_meaning = "from the beginning of history" 835 default_start = None 836 837 start = self._prompt( 838 f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank to backfill {blank_meaning}", 839 ) 840 if start: 841 plan_builder.set_start(start) 842 elif default_start: 843 plan_builder.set_start(default_start) 844 845 if not plan_builder.override_end: 846 if plan.provided_end: 847 blank_meaning = f"'{time_like_to_str(plan.provided_end)}'" 848 else: 849 blank_meaning = "now" 850 end = self._prompt( 851 f"Enter the {backfill_or_preview} end date (eg. 
'1 month ago', '2020-01-01') or blank to {backfill_or_preview} up until {blank_meaning}", 852 ) 853 if end: 854 plan_builder.set_end(end) 855 856 plan = plan_builder.build() 857 858 if plan.ignored: 859 self._print( 860 self._get_ignored_tree( 861 plan.ignored, 862 plan.context_diff.snapshots, 863 plan.environment_naming_info, 864 default_catalog, 865 ) 866 ) 867 if not auto_apply and self._confirm(f"Apply - {backfill_or_preview.capitalize()} Tables"): 868 plan_builder.apply() 869 870 def _prompt_promote(self, plan_builder: PlanBuilder) -> None: 871 if self._confirm( 872 "Apply - Virtual Update", 873 ): 874 plan_builder.apply() 875 876 def log_test_results( 877 self, result: unittest.result.TestResult, output: str, target_dialect: str 878 ) -> None: 879 divider_length = 70 880 if result.wasSuccessful(): 881 self._print("=" * divider_length) 882 self._print( 883 f"Successfully Ran {str(result.testsRun)} tests against {target_dialect}", 884 style="green", 885 ) 886 self._print("-" * divider_length) 887 else: 888 self._print("-" * divider_length) 889 self._print("Test Failure Summary") 890 self._print("=" * divider_length) 891 self._print( 892 f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}" 893 ) 894 for test, _ in result.failures + result.errors: 895 if isinstance(test, ModelTest): 896 self._print(f"Failure Test: {test.model.name} {test.test_name}") 897 self._print("=" * divider_length) 898 self._print(output) 899 900 def show_sql(self, sql: str) -> None: 901 self._print(Syntax(sql, "sql", word_wrap=True), crop=False) 902 903 def log_status_update(self, message: str) -> None: 904 self._print(message) 905 906 def log_error(self, message: str) -> None: 907 self._print(f"[red]{message}[/red]") 908 909 def log_success(self, message: str) -> None: 910 self._print(f"\n[green]{message}[/green]\n") 911 912 def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID: 913 id = uuid.uuid4() 914 self.loading_status[id] = 
Status(message or "", console=self.console, spinner="line") 915 self.loading_status[id].start() 916 return id 917 918 def loading_stop(self, id: uuid.UUID) -> None: 919 self.loading_status[id].stop() 920 del self.loading_status[id] 921 922 def show_schema_diff(self, schema_diff: SchemaDiff) -> None: 923 source_name = schema_diff.source 924 if schema_diff.source_alias: 925 source_name = schema_diff.source_alias.upper() 926 target_name = schema_diff.target 927 if schema_diff.target_alias: 928 target_name = schema_diff.target_alias.upper() 929 930 first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'" 931 if schema_diff.model_name: 932 first_line = ( 933 first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'" 934 ) 935 936 tree = Tree(first_line + ":") 937 938 if any([schema_diff.added, schema_diff.removed, schema_diff.modified]): 939 if schema_diff.added: 940 added = Tree("[green]Added Columns:") 941 for c, t in schema_diff.added: 942 added.add(f"[green]{c} ({t})") 943 tree.add(added) 944 945 if schema_diff.removed: 946 removed = Tree("[red]Removed Columns:") 947 for c, t in schema_diff.removed: 948 removed.add(f"[red]{c} ({t})") 949 tree.add(removed) 950 951 if schema_diff.modified: 952 modified = Tree("[magenta]Modified Columns:") 953 for c, (ft, tt) in schema_diff.modified.items(): 954 modified.add(f"[magenta]{c} ({ft} -> {tt})") 955 tree.add(modified) 956 else: 957 tree.add("[b]Schemas match") 958 959 self.console.print(tree) 960 961 def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None: 962 source_name = row_diff.source 963 if row_diff.source_alias: 964 source_name = row_diff.source_alias.upper() 965 target_name = row_diff.target 966 if row_diff.target_alias: 967 target_name = row_diff.target_alias.upper() 968 969 tree = Tree("[b]Row Counts:[/b]") 970 tree.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows") 971 tree.add(f" [b][yellow]{source_name} 
ONLY[/yellow]:[/b] {row_diff.s_only_count} rows") 972 tree.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows") 973 self.console.print("\n", tree) 974 975 self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]") 976 if row_diff.column_stats.shape[0] > 0: 977 self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n") 978 else: 979 self.console.print(" No columns with same name and data type in both tables") 980 981 if show_sample: 982 self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]") 983 if row_diff.joined_sample.shape[0] > 0: 984 self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n") 985 else: 986 self.console.print(" All joined rows match") 987 988 if row_diff.s_sample.shape[0] > 0: 989 self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]") 990 self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n") 991 992 if row_diff.t_sample.shape[0] > 0: 993 self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]") 994 self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n") 995 996 def _get_snapshot_change_category( 997 self, 998 snapshot: Snapshot, 999 plan_builder: PlanBuilder, 1000 auto_apply: bool, 1001 default_catalog: t.Optional[str], 1002 ) -> None: 1003 choices = self._snapshot_change_choices( 1004 snapshot, plan_builder.environment_naming_info, default_catalog 1005 ) 1006 response = self._prompt( 1007 "\n".join([f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]), 1008 show_choices=False, 1009 choices=[f"{i+1}" for i in range(len(choices))], 1010 ) 1011 choice = list(choices)[int(response) - 1] 1012 plan_builder.set_choice(snapshot, choice) 1013 1014 def _snapshot_change_choices( 1015 self, 1016 snapshot: Snapshot, 1017 environment_naming_info: EnvironmentNamingInfo, 1018 default_catalog: t.Optional[str], 1019 use_rich_formatting: bool = True, 1020 ) -> 
t.Dict[SnapshotChangeCategory, str]: 1021 direct = snapshot.display_name(environment_naming_info, default_catalog) 1022 if use_rich_formatting: 1023 direct = f"[direct]{direct}[/direct]" 1024 indirect = "indirectly modified children" 1025 if use_rich_formatting: 1026 indirect = f"[indirect]{indirect}[/indirect]" 1027 if snapshot.is_view: 1028 choices = { 1029 SnapshotChangeCategory.BREAKING: f"Update {direct} and backfill {indirect}", 1030 SnapshotChangeCategory.NON_BREAKING: f"Update {direct} but don't backfill {indirect}", 1031 } 1032 elif snapshot.is_symbolic: 1033 choices = { 1034 SnapshotChangeCategory.BREAKING: f"Backfill {indirect}", 1035 SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}", 1036 } 1037 else: 1038 choices = { 1039 SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}", 1040 SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}", 1041 } 1042 labeled_choices = { 1043 k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items() 1044 } 1045 return labeled_choices
A rich based implementation of the console.
def __init__(
    self, console: t.Optional[RichConsole] = None, verbose: bool = False, **kwargs: t.Any
) -> None:
    """Create the console with every progress tracker in its idle state.

    Args:
        console: Rich console to print to; `srich.console` is used when omitted.
        verbose: Stored as `self.verbose`.
        **kwargs: Accepted but not read by this initializer.
    """
    self.verbose = verbose
    self.console: RichConsole = console or srich.console

    # Active loading spinners, keyed by the id handed back to the caller.
    self.loading_status: t.Dict[uuid.UUID, Status] = {}

    # Put in temporary values that are replaced when evaluating
    self.environment_naming_info = EnvironmentNamingInfo()
    self.default_catalog: t.Optional[str] = None

    # Snapshot evaluation display state.
    self.evaluation_progress_live: t.Optional[Live] = None
    self.evaluation_total_progress: t.Optional[Progress] = None
    self.evaluation_total_task: t.Optional[TaskID] = None
    self.evaluation_model_progress: t.Optional[Progress] = None
    self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
    self.evaluation_model_batches: t.Dict[Snapshot, int] = {}

    # One (progress bar, task) pair per remaining kind of run.
    self.creation_progress: t.Optional[Progress] = None
    self.creation_task: t.Optional[TaskID] = None

    self.promotion_progress: t.Optional[Progress] = None
    self.promotion_task: t.Optional[TaskID] = None

    self.migration_progress: t.Optional[Progress] = None
    self.migration_task: t.Optional[TaskID] = None
def start_evaluation_progress(
    self,
    batches: t.Dict[Snapshot, int],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Open the live evaluation display unless one is already running.

    Args:
        batches: Batch count per snapshot; the sum of the values is the overall total.
        environment_naming_info: Stored on the instance for later display-name rendering.
        default_catalog: Stored on the instance for later display-name rendering.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # trailing attribute assignments belong under this guard as well.
    if self.evaluation_progress_live:
        return

    # Overall bar: one step per batch across all snapshots.
    overall = Progress(
        TextColumn("[bold blue]Evaluating models", justify="right"),
        BarColumn(bar_width=40),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        srich.BatchColumn(),
        "•",
        TimeElapsedColumn(),
        console=self.console,
    )
    self.evaluation_total_progress = overall

    # Per-model rows: a name plus a spinner while the model is being evaluated.
    per_model = Progress(
        TextColumn("{task.fields[view_name]}", justify="right"),
        SpinnerColumn(spinner_name="simpleDots"),
        console=self.console,
    )
    self.evaluation_model_progress = per_model

    layout = Table.grid()
    layout.add_row(overall)
    layout.add_row(per_model)

    self.evaluation_progress_live = Live(layout, refresh_per_second=10)
    self.evaluation_progress_live.start()

    self.evaluation_total_task = overall.add_task(
        "Evaluating models...", total=sum(batches.values())
    )

    self.evaluation_model_batches = batches
    self.environment_naming_info = environment_naming_info
    self.default_catalog = default_catalog
Indicates that a new snapshot evaluation progress has begun.
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
    """Add a per-snapshot task to the model progress display.

    Nothing happens when the model progress display is not set or when a task is
    already registered under the snapshot's name.
    """
    model_progress = self.evaluation_model_progress
    if not model_progress or snapshot.name in self.evaluation_model_tasks:
        return

    label = snapshot.display_name(self.environment_naming_info, self.default_catalog)
    task_id = model_progress.add_task(
        f"Evaluating {label}...",
        view_name=label,
        total=self.evaluation_model_batches[snapshot],
    )
    self.evaluation_model_tasks[snapshot.name] = task_id
Starts the snapshot evaluation progress.
def update_snapshot_evaluation_progress(
    self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
) -> None:
    """Update the snapshot evaluation progress.

    Does nothing unless the total progress, the model progress and the live display
    are all set.

    Args:
        snapshot: The snapshot whose batch has just been evaluated.
        batch_idx: Index of the evaluated batch; it is printed as `batch_idx + 1`.
        duration_ms: Duration of the batch in milliseconds, or None when there is no
            duration to report. A value of 0 is still reported.
    """
    if not (
        self.evaluation_total_progress
        and self.evaluation_model_progress
        and self.evaluation_progress_live
    ):
        return

    total_batches = self.evaluation_model_batches[snapshot]

    # Compare with None instead of relying on truthiness, so that a batch measured at
    # 0 ms is logged like any other batch.
    if duration_ms is not None:
        self.evaluation_progress_live.console.print(
            f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s"
        )

    self.evaluation_total_progress.update(
        self.evaluation_total_task or TaskID(0), refresh=True, advance=1
    )

    model_task_id = self.evaluation_model_tasks[snapshot.name]
    self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
    # Drop the per-model row once all of its batches have been reported.
    if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
        self.evaluation_model_progress.remove_task(model_task_id)
Update the snapshot evaluation progress.
def stop_evaluation_progress(self, success: bool = True) -> None:
    """Stop the live evaluation display, if any, and reset all evaluation state.

    Args:
        success: When true and a live display was running, a success message is logged.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # success message is only logged when a live display existed and that the resets
    # below run unconditionally.
    live = self.evaluation_progress_live
    if live:
        live.stop()
        if success:
            self.log_success("All model batches have been executed successfully")

    self.evaluation_progress_live = None
    self.evaluation_total_progress = None
    self.evaluation_total_task = None
    self.evaluation_model_progress = None
    self.evaluation_model_tasks = {}
    self.evaluation_model_batches = {}
    self.environment_naming_info = EnvironmentNamingInfo()
    self.default_catalog = None
Stop the snapshot evaluation progress.
def start_creation_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Start the physical table creation progress bar unless one already exists.

    Args:
        total_tasks: Total number of steps the bar is created with.
        environment_naming_info: Stored on the instance for display-name rendering.
        default_catalog: Stored on the instance for display-name rendering.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # trailing attribute assignments belong under this guard as well.
    if self.creation_progress is not None:
        return

    progress = Progress(
        TextColumn("[bold blue]Creating physical tables", justify="right"),
        BarColumn(bar_width=40),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        srich.BatchColumn(),
        "•",
        TimeElapsedColumn(),
        console=self.console,
    )
    self.creation_progress = progress

    progress.start()
    self.creation_task = progress.add_task(
        "Creating physical tables...",
        total=total_tasks,
    )

    self.environment_naming_info = environment_naming_info
    self.default_catalog = default_catalog
Indicates that a new creation progress has begun.
def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Advance the creation progress bar by one step.

    Does nothing when the creation bar or its task is missing. In verbose mode a
    "created" line naming the snapshot is printed before the bar is advanced.
    """
    progress = self.creation_progress
    task = self.creation_task
    if progress is None or task is None:
        return

    if self.verbose:
        label = snapshot.display_name(self.environment_naming_info, self.default_catalog)
        progress.live.console.print(f"{label} [green]created[/green]")
    progress.update(task, refresh=True, advance=1)
Update the snapshot creation progress.
def stop_creation_progress(self, success: bool = True) -> None:
    """Stop the creation progress bar, if any, and reset the naming placeholders.

    Args:
        success: When true and a bar was running, a success message is logged.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # success message is only logged when a bar existed.
    progress = self.creation_progress
    self.creation_task = None
    if progress is not None:
        progress.stop()
        self.creation_progress = None
        if success:
            self.log_success("All model versions have been created successfully")

    self.environment_naming_info = EnvironmentNamingInfo()
    self.default_catalog = None
Stop the snapshot creation progress.
def update_cleanup_progress(self, object_name: str) -> None:
    """Print a line reporting that `object_name` has been deleted."""
    message = f"Deleted object {object_name}"
    self._print(message)
Update the snapshot cleanup progress.
def start_promotion_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Start the virtual update (promotion) progress bar unless one already exists.

    Args:
        total_tasks: Total number of steps the bar is created with.
        environment_naming_info: Supplies the environment name shown in the bar and is
            stored on the instance for display-name rendering.
        default_catalog: Stored on the instance for display-name rendering.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # trailing attribute assignments belong under this guard as well.
    if self.promotion_progress is not None:
        return

    progress = Progress(
        TextColumn(
            f"[bold blue]Virtually Updating '{environment_naming_info.name}'",
            justify="right",
        ),
        BarColumn(bar_width=40),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        TimeElapsedColumn(),
        console=self.console,
    )
    self.promotion_progress = progress

    progress.start()
    self.promotion_task = progress.add_task(
        f"Virtually Updating {environment_naming_info.name}...",
        total=total_tasks,
    )

    self.environment_naming_info = environment_naming_info
    self.default_catalog = default_catalog
Indicates that a new snapshot promotion progress has begun.
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Advance the promotion progress bar by one step.

    Does nothing when the promotion bar or its task is missing. In verbose mode a line
    naming the snapshot is printed first, marked "promoted" or "demoted" according to
    `promoted`.
    """
    progress = self.promotion_progress
    task = self.promotion_task
    if progress is None or task is None:
        return

    if self.verbose:
        if promoted:
            action_str = "[green]promoted[/green]"
        else:
            action_str = "[yellow]demoted[/yellow]"
        label = snapshot.display_name(self.environment_naming_info, self.default_catalog)
        progress.live.console.print(f"{label} {action_str}")
    progress.update(task, refresh=True, advance=1)
Update the snapshot promotion progress.
def stop_promotion_progress(self, success: bool = True) -> None:
    """Stop the promotion progress bar, if any, and reset the naming placeholders.

    Args:
        success: When true and a bar was running, a success message is logged.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # success message is only logged when a bar existed.
    progress = self.promotion_progress
    self.promotion_task = None
    if progress is not None:
        progress.stop()
        self.promotion_progress = None
        if success:
            self.log_success("The target environment has been updated successfully")

    self.environment_naming_info = EnvironmentNamingInfo()
    self.default_catalog = None
Stop the snapshot promotion progress.
def start_migration_progress(self, total_tasks: int) -> None:
    """Start the snapshot migration progress bar unless one already exists.

    Args:
        total_tasks: Total number of steps the bar is created with.
    """
    if self.migration_progress is not None:
        return

    progress = Progress(
        TextColumn("[bold blue]Migrating snapshots", justify="right"),
        BarColumn(bar_width=40),
        "[progress.percentage]{task.percentage:>3.1f}%",
        "•",
        srich.BatchColumn(),
        "•",
        TimeElapsedColumn(),
        console=self.console,
    )
    self.migration_progress = progress

    progress.start()
    self.migration_task = progress.add_task(
        "Migrating snapshots...",
        total=total_tasks,
    )
Indicates that a new migration progress has begun.
def update_migration_progress(self, num_tasks: int) -> None:
    """Advance the migration progress bar by `num_tasks` steps.

    Does nothing when the migration bar or its task is missing.
    """
    progress = self.migration_progress
    task = self.migration_task
    if progress is None or task is None:
        return
    progress.update(task, refresh=True, advance=num_tasks)
Update the migration progress.
def stop_migration_progress(self, success: bool = True) -> None:
    """Stop the migration progress bar, if any, and clear the migration task.

    Args:
        success: When true and a bar was running, a success message is logged.
    """
    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # success message is only logged when a bar existed.
    progress = self.migration_progress
    self.migration_task = None
    if progress is None:
        return

    progress.stop()
    self.migration_progress = None
    if success:
        self.log_success("The migration has been completed successfully")
Stop the migration progress.
def show_model_difference_summary(
    self,
    context_diff: ContextDiff,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    no_diff: bool = True,
    ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
    """Print a summary of the differences described by `context_diff`.

    Args:
        context_diff: The context diff to summarize.
        environment_naming_info: The environment naming info to reference when printing model names
        default_catalog: The default catalog to reference when deciding to remove catalog from display names
        no_diff: Hide the actual SQL differences.
        ignored_snapshot_ids: A set of snapshot ids that are ignored; None is treated as empty.
    """
    ignored_snapshot_ids = ignored_snapshot_ids or set()

    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # has_snapshot_changes early return only applies to new environments.
    if context_diff.is_new_environment:
        banner = Tree(
            f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`"
        )
        self._print(banner)
        if not context_diff.has_snapshot_changes:
            return

    if not context_diff.has_changes:
        self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`"))
        return

    self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:"))

    # One summary tree per kind of snapshot, in a fixed order.
    sections = (
        ("Models", lambda x: x.is_model),
        ("Standalone Audits", lambda x: x.is_audit),
    )
    for header, selector in sections:
        self._show_summary_tree_for(
            context_diff,
            header,
            selector,
            environment_naming_info,
            default_catalog,
            no_diff=no_diff,
            ignored_snapshot_ids=ignored_snapshot_ids,
        )
Shows a summary of the differences.
Arguments:
- context_diff: The context diff to use to print the summary
- environment_naming_info: The environment naming info to reference when printing model names
- default_catalog: The default catalog to reference when deciding to remove catalog from display names
- no_diff: Hide the actual SQL differences.
- ignored_snapshot_ids: A set of snapshot ids that are ignored
def plan(
    self,
    plan_builder: PlanBuilder,
    auto_apply: bool,
    default_catalog: t.Optional[str],
    no_diff: bool = False,
    no_prompts: bool = False,
) -> None:
    """The main plan flow.

    The console should present the user with choices on how to backfill and version the snapshots
    of a plan.

    Args:
        plan_builder: The plan builder to make choices for.
        auto_apply: Whether to automatically apply the plan after all choices have been made.
        default_catalog: The default catalog to reference when deciding to remove catalog from display names
        no_diff: Hide text differences for changed models.
        no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
            if this flag is set to true and there are uncategorized changes the plan creation will
            fail. Default: False
    """
    self._prompt_categorize(
        plan_builder,
        auto_apply,
        no_diff=no_diff,
        no_prompts=no_prompts,
        default_catalog=default_catalog,
    )

    prompts_enabled = not no_prompts
    if prompts_enabled:
        self._show_options_after_categorization(
            plan_builder, auto_apply, default_catalog=default_catalog
        )

    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # auto-apply step is independent of `no_prompts`.
    if auto_apply:
        plan_builder.apply()
The main plan flow.
The console should present the user with choices on how to backfill and version the snapshots of a plan.
Arguments:
- plan_builder: The plan builder to make choices for.
- auto_apply: Whether to automatically apply the plan after all choices have been made.
- default_catalog: The default catalog to reference when deciding to remove catalog from display names
- no_diff: Hide text differences for changed models.
- no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False
def log_test_results(
    self, result: unittest.result.TestResult, output: str, target_dialect: str
) -> None:
    """Print a summary of a test run.

    Args:
        result: The test result; its run count, failures and errors are reported.
        output: The generated test output, printed after the failure summary.
        target_dialect: The dialect named in the success message.
    """
    divider_length = 70
    heavy_rule = "=" * divider_length
    light_rule = "-" * divider_length

    if result.wasSuccessful():
        self._print(heavy_rule)
        self._print(
            f"Successfully Ran {str(result.testsRun)} tests against {target_dialect}",
            style="green",
        )
        self._print(light_rule)
        return

    # NOTE(review): nesting was reconstructed from a flattened listing; confirm that the
    # closing rule and the raw output are only printed on failure.
    self._print(light_rule)
    self._print("Test Failure Summary")
    self._print(heavy_rule)
    self._print(
        f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}"
    )
    for test, _ in result.failures + result.errors:
        # Only model tests expose the model and test names used in this line.
        if isinstance(test, ModelTest):
            self._print(f"Failure Test: {test.model.name} {test.test_name}")
    self._print(heavy_rule)
    self._print(output)
Display the test result and output.
Arguments:
- result: The unittest test result that contains metrics like num success, fails, etc.
- output: The generated output from the unittest.
- target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
def show_sql(self, sql: str) -> None:
    """Print `sql` with SQL syntax highlighting, word-wrapped and without cropping."""
    highlighted = Syntax(sql, "sql", word_wrap=True)
    self._print(highlighted, crop=False)
Display SQL to the user.
def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
    """Start a loading spinner and return the id under which it is tracked.

    Args:
        message: Optional text shown next to the spinner; an empty string is used when omitted.

    Returns:
        A freshly generated UUID that keys the spinner in `self.loading_status`.
    """
    loading_id = uuid.uuid4()
    status = Status(message or "", console=self.console, spinner="line")
    self.loading_status[loading_id] = status
    status.start()
    return loading_id
Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.
def loading_stop(self, id: uuid.UUID) -> None:
    """Stop the loading spinner tracked under `id` and forget it.

    Raises:
        KeyError: If no spinner is tracked under `id`.
    """
    status = self.loading_status[id]
    status.stop()
    # Remove the entry only after the spinner has been stopped.
    del self.loading_status[id]
Stop loading for the given id.
def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
    """Print a tree describing the column differences in `schema_diff`.

    The source and target are labelled with their upper-cased aliases when aliases are
    set, and with `schema_diff.source` / `schema_diff.target` otherwise. Added, removed
    and modified columns each get their own branch; when there are none, a single
    "Schemas match" entry is shown instead.
    """
    source_name = schema_diff.source
    if schema_diff.source_alias:
        source_name = schema_diff.source_alias.upper()
    target_name = schema_diff.target
    if schema_diff.target_alias:
        target_name = schema_diff.target_alias.upper()

    first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'"
    if schema_diff.model_name:
        first_line = (
            first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
        )

    tree = Tree(first_line + ":")

    if any((schema_diff.added, schema_diff.removed, schema_diff.modified)):
        # The loop variables deliberately avoid the name `t`, which is this module's
        # alias for `typing`.
        if schema_diff.added:
            added = Tree("[green]Added Columns:")
            for column, column_type in schema_diff.added:
                added.add(f"[green]{column} ({column_type})")
            tree.add(added)

        if schema_diff.removed:
            removed = Tree("[red]Removed Columns:")
            for column, column_type in schema_diff.removed:
                removed.add(f"[red]{column} ({column_type})")
            tree.add(removed)

        if schema_diff.modified:
            modified = Tree("[magenta]Modified Columns:")
            for column, (from_type, to_type) in schema_diff.modified.items():
                modified.add(f"[magenta]{column} ({from_type} -> {to_type})")
            tree.add(modified)
    else:
        tree.add("[b]Schemas match")

    self.console.print(tree)
Show table schema diff.
def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
    """Print row counts, column comparison stats and, optionally, sample rows.

    Args:
        row_diff: The row diff to display.
        show_sample: Whether to print the sample sections.
    """
    # NOTE(review): this body was reconstructed from a flattened listing. Confirm that
    # all three sample sections sit under `show_sample`, and that the spacing inside
    # the string literals matches the original file.
    source_name = row_diff.source_alias.upper() if row_diff.source_alias else row_diff.source
    target_name = row_diff.target_alias.upper() if row_diff.target_alias else row_diff.target

    counts = Tree("[b]Row Counts:[/b]")
    counts.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows")
    counts.add(f" [b][yellow]{source_name} ONLY[/yellow]:[/b] {row_diff.s_only_count} rows")
    counts.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows")
    self.console.print("\n", counts)

    self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]")
    if row_diff.column_stats.shape[0] > 0:
        self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n")
    else:
        self.console.print(" No columns with same name and data type in both tables")

    if not show_sample:
        return

    self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]")
    if row_diff.joined_sample.shape[0] > 0:
        self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n")
    else:
        self.console.print(" All joined rows match")

    if row_diff.s_sample.shape[0] > 0:
        self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]")
        self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n")

    if row_diff.t_sample.shape[0] > 0:
        self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]")
        self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n")
Show table summary diff.
def add_to_layout_widget(target_widget: LayoutWidget, *widgets: widgets.Widget) -> LayoutWidget:
    """Append the given widgets to a layout widget's children.

    Args:
        target_widget: The layout widget to add the other widget(s) to.
        *widgets: The widgets to add to the layout widget.

    Returns:
        The same `target_widget`, with the widgets appended to its `children`.
    """
    # `widgets` is already a tuple because it is collected from the positional arguments.
    target_widget.children += widgets
    return target_widget
Helper function to add a widget to a layout widget.
Arguments:
- target_widget: The layout widget to add the other widget(s) to.
- *widgets: The widgets to add to the layout widget.
Returns:
The layout widget with the children added.
class NotebookMagicConsole(TerminalConsole):
    """
    Console to be used when using the magic notebook interface (`%<command>`).
    Generally reuses the Terminal console when possible by either directly outputting what it provides
    or capturing it and converting it into a widget.
    """

    def __init__(
        self,
        display: t.Optional[t.Callable] = None,
        console: t.Optional[RichConsole] = None,
        **kwargs: t.Any,
    ) -> None:
        """Creates the console and the widgets it renders plan options into.

        Args:
            display: Callable used to render widgets. When falsy, the `display` entry of the
                IPython user namespace is used, falling back to `IPython.display.display`.
            console: The rich console forwarded to the parent constructor.
            **kwargs: Additional keyword arguments forwarded to the parent constructor.
        """
        # Imported lazily so the notebook dependencies are only needed when this console is used.
        import ipywidgets as widgets
        from IPython import get_ipython
        from IPython.display import display as ipython_display

        super().__init__(console, **kwargs)

        self.display = display or get_ipython().user_ns.get("display", ipython_display)
        self.missing_dates_output = widgets.Output()
        self.dynamic_options_after_categorization_output = widgets.VBox()

    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
        """Runs the parent's missing dates display inside the dedicated output widget.

        The output widget is appended to the dynamic options container and its previous
        outputs are reset before the parent implementation runs within its context.
        """
        self._add_to_dynamic_options(self.missing_dates_output)
        self.missing_dates_output.outputs = ()
        with self.missing_dates_output:
            super()._show_missing_dates(plan, default_catalog)

    def _apply(self, button: widgets.Button) -> None:
        """Click handler that disables the button and applies the plan builder attached to it.

        The apply call runs within the context of the output widget attached to the button.
        """
        button.disabled = True
        with button.output:
            button.plan_builder.apply()

    def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
        """Adds an "Apply - Virtual Update" button and its output widget to the dynamic options."""
        import ipywidgets as widgets

        button = widgets.Button(
            description="Apply - Virtual Update",
            disabled=False,
            button_style="success",
            # Auto will make the button really large.
            # Likely changing this soon anyways to be just `Apply` with description above
            layout={"width": "10rem"},
        )
        self._add_to_dynamic_options(button)
        output = widgets.Output()
        self._add_to_dynamic_options(output)

        # `_apply` reads the plan builder and the output widget back off the button.
        button.plan_builder = plan_builder
        button.on_click(self._apply)
        button.output = output

    def _prompt_effective_from(
        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
    ) -> None:
        """Adds a date picker and a "going forward" checkbox for the plan's effective from date.

        Changing either widget updates the effective from value on the plan builder and
        re-renders the options shown after categorization.
        """
        import ipywidgets as widgets

        prompt = widgets.VBox()

        def effective_from_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
            plan_builder.set_effective_from(change["new"])
            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)

        def going_forward_change_callback(change: t.Dict[str, bool]) -> None:
            checked = change["new"]
            # A checked box clears the effective from date; unchecking it resets it to yesterday.
            plan_builder.set_effective_from(None if checked else yesterday_ds())
            self._show_options_after_categorization(
                plan_builder, auto_apply=auto_apply, default_catalog=default_catalog
            )

        date_picker = widgets.DatePicker(
            disabled=plan_builder.build().effective_from is None,
            value=to_date(plan_builder.build().effective_from or yesterday_ds()),
            layout={"width": "auto"},
        )
        date_picker.observe(effective_from_change_callback, "value")

        going_forward_checkbox = widgets.Checkbox(
            value=plan_builder.build().effective_from is None,
            description="Apply Going Forward Once Deployed To Prod",
            disabled=False,
            indent=False,
        )
        going_forward_checkbox.observe(going_forward_change_callback, "value")

        add_to_layout_widget(
            prompt,
            widgets.HBox(
                [
                    widgets.Label("Effective From Date:", layout={"width": "8rem"}),
                    date_picker,
                    going_forward_checkbox,
                ]
            ),
        )

        self._add_to_dynamic_options(prompt)

    def _prompt_backfill(
        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
    ) -> None:
        """Adds start / end date pickers and, unless auto applying, an apply button.

        The labels read "Preview" when the built plan is both a dev and a forward-only plan,
        and "Backfill" otherwise. The date pickers are only added when the plan builder
        allows a start and end to be set.
        """
        import ipywidgets as widgets

        prompt = widgets.VBox()

        backfill_or_preview = (
            "Preview"
            if plan_builder.build().is_dev and plan_builder.build().forward_only
            else "Backfill"
        )

        # NOTE(review): the `plan_builder` parameter of this helper is not used in its body.
        def _date_picker(
            plan_builder: PlanBuilder, value: t.Any, on_change: t.Callable, disabled: bool = False
        ) -> widgets.DatePicker:
            picker = widgets.DatePicker(
                disabled=disabled,
                value=value,
                layout={"width": "auto"},
            )

            picker.observe(on_change, "value")
            return picker

        def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
            plan_builder.set_start(change["new"])
            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)

        def end_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
            plan_builder.set_end(change["new"])
            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)

        if plan_builder.is_start_and_end_allowed:
            add_to_layout_widget(
                prompt,
                widgets.HBox(
                    [
                        widgets.Label(
                            f"Start {backfill_or_preview} Date:", layout={"width": "8rem"}
                        ),
                        _date_picker(
                            plan_builder, to_date(plan_builder.build().start), start_change_callback
                        ),
                    ]
                ),
            )

            add_to_layout_widget(
                prompt,
                widgets.HBox(
                    [
                        widgets.Label(f"End {backfill_or_preview} Date:", layout={"width": "8rem"}),
                        _date_picker(
                            plan_builder,
                            to_date(plan_builder.build().end),
                            end_change_callback,
                        ),
                    ]
                ),
            )

        self._add_to_dynamic_options(prompt)

        if not auto_apply:
            button = widgets.Button(
                description=f"Apply - {backfill_or_preview} Tables",
                disabled=False,
                button_style="success",
            )
            self._add_to_dynamic_options(button)
            output = widgets.Output()
            self._add_to_dynamic_options(output)

            # `_apply` reads the plan builder and the output widget back off the button.
            button.plan_builder = plan_builder
            button.on_click(self._apply)
            button.output = output

    def _show_options_after_categorization(
        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
    ) -> None:
        """Empties and displays the dynamic options container, then defers to the parent."""
        self.dynamic_options_after_categorization_output.children = ()
        self.display(self.dynamic_options_after_categorization_output)
        super()._show_options_after_categorization(plan_builder, auto_apply, default_catalog)

    def _add_to_dynamic_options(self, *widgets: widgets.Widget) -> None:
        """Appends the given widgets to the dynamic options container."""
        add_to_layout_widget(self.dynamic_options_after_categorization_output, *widgets)

    def _get_snapshot_change_category(
        self,
        snapshot: Snapshot,
        plan_builder: PlanBuilder,
        auto_apply: bool,
        default_catalog: t.Optional[str],
    ) -> None:
        """Displays radio buttons for choosing the change category of a snapshot.

        The first available choice is applied to the plan builder immediately. Selecting a
        different radio button applies that choice and re-renders the options shown after
        categorization.
        """
        import ipywidgets as widgets

        choice_mapping = self._snapshot_change_choices(
            snapshot,
            plan_builder.environment_naming_info,
            default_catalog,
            use_rich_formatting=False,
        )
        choices = list(choice_mapping)
        plan_builder.set_choice(snapshot, choices[0])

        def radio_button_selected(change: t.Dict[str, t.Any]) -> None:
            # The index of the selected radio button lines up with the keys of `choice_mapping`.
            plan_builder.set_choice(snapshot, choices[change["owner"].index])
            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)

        radio = widgets.RadioButtons(
            options=choice_mapping.values(),
            layout={"width": "max-content"},
            disabled=False,
        )
        radio.observe(
            radio_button_selected,
            "value",
        )
        self.display(radio)

    def log_test_results(
        self, result: unittest.result.TestResult, output: str, target_dialect: str
    ) -> None:
        """Display the test result and output.

        Args:
            result: The unittest test result that contains metrics like num success, fails, etc.
            output: The generated output from the unittest.
            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
        """
        import ipywidgets as widgets

        divider_length = 70
        shared_style = {
            "font-size": "11px",
            "font-weight": "bold",
            "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace",
        }
        if result.wasSuccessful():
            success_color = {"color": "#008000"}
            header = str(h("span", {"style": shared_style}, "-" * divider_length))
            message = str(
                h(
                    "span",
                    {"style": {**shared_style, **success_color}},
                    f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}",
                )
            )
            footer = str(h("span", {"style": shared_style}, "=" * divider_length))
            self.display(widgets.HTML("<br>".join([header, message, footer])))
        else:
            fail_color = {"color": "#db3737"}
            fail_shared_style = {**shared_style, **fail_color}
            header = str(h("span", {"style": fail_shared_style}, "-" * divider_length))
            message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary"))
            num_success = str(
                h(
                    "span",
                    {"style": fail_shared_style},
                    f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}",
                )
            )
            failure_tests = []
            # Only failures and errors that belong to a ModelTest are listed by name.
            for test, _ in result.failures + result.errors:
                if isinstance(test, ModelTest):
                    failure_tests.append(
                        str(
                            h(
                                "span",
                                {"style": fail_shared_style},
                                f"Failure Test: {test.model.name} {test.test_name}",
                            )
                        )
                    )
            failures = "<br>".join(failure_tests)
            footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length))
            # The raw unittest output is shown in a text area below the summary.
            error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"})
            test_info = widgets.HTML(
                "<br>".join([header, message, footer, num_success, failures, footer])
            )
            self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"}))
Console to be used when using the magic notebook interface (%<command>
).
Generally reuses the Terminal console when possible by either directly outputting what it provides
or capturing it and converting it into a widget.
def __init__(
    self,
    display: t.Optional[t.Callable] = None,
    console: t.Optional[RichConsole] = None,
    **kwargs: t.Any,
) -> None:
    """Creates the console and the widgets it renders plan options into.

    Args:
        display: Callable used to render widgets. When falsy, the `display` entry of the
            IPython user namespace is used, falling back to `IPython.display.display`.
        console: The rich console forwarded to the parent constructor.
        **kwargs: Additional keyword arguments forwarded to the parent constructor.
    """
    import ipywidgets as widgets
    from IPython import get_ipython
    from IPython.display import display as ipython_display

    super().__init__(console, **kwargs)

    if not display:
        # Prefer a `display` defined in the notebook's namespace over IPython's default one.
        display = get_ipython().user_ns.get("display", ipython_display)

    self.display = display
    self.missing_dates_output = widgets.Output()
    self.dynamic_options_after_categorization_output = widgets.VBox()
def log_test_results(
    self, result: unittest.result.TestResult, output: str, target_dialect: str
) -> None:
    """Display the test result and output.

    Args:
        result: The unittest test result that contains metrics like num success, fails, etc.
        output: The generated output from the unittest.
        target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
    """
    import ipywidgets as widgets

    divider_length = 70
    base_style = {
        "font-size": "11px",
        "font-weight": "bold",
        "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace",
    }

    def styled_span(style: t.Dict[str, str], text: str) -> str:
        # Renders a single <span> with the given inline style.
        return str(h("span", {"style": style}, text))

    if result.wasSuccessful():
        success_style = {**base_style, "color": "#008000"}
        success_lines = [
            styled_span(base_style, "-" * divider_length),
            styled_span(
                success_style,
                f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}",
            ),
            styled_span(base_style, "=" * divider_length),
        ]
        self.display(widgets.HTML("<br>".join(success_lines)))
        return

    failure_style = {**base_style, "color": "#db3737"}
    header = styled_span(failure_style, "-" * divider_length)
    message = styled_span(failure_style, "Test Failure Summary")
    num_success = styled_span(
        failure_style,
        f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}",
    )
    # Only failures and errors that belong to a ModelTest are listed by name.
    failures = "<br>".join(
        styled_span(failure_style, f"Failure Test: {test.model.name} {test.test_name}")
        for test, _ in result.failures + result.errors
        if isinstance(test, ModelTest)
    )
    footer = styled_span(failure_style, "=" * divider_length)

    # The raw unittest output is shown in a text area below the summary.
    error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"})
    test_info = widgets.HTML(
        "<br>".join([header, message, footer, num_success, failures, footer])
    )
    self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"}))
Display the test result and output.
Arguments:
- result: The unittest test result that contains metrics like num success, fails, etc.
- output: The generated output from the unittest.
- target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
Inherited Members
- TerminalConsole
- start_plan_evaluation
- stop_plan_evaluation
- start_evaluation_progress
- start_snapshot_evaluation_progress
- update_snapshot_evaluation_progress
- stop_evaluation_progress
- start_creation_progress
- update_creation_progress
- stop_creation_progress
- update_cleanup_progress
- start_promotion_progress
- update_promotion_progress
- stop_promotion_progress
- start_migration_progress
- update_migration_progress
- stop_migration_progress
- show_model_difference_summary
- plan
- show_sql
- log_status_update
- log_error
- log_success
- loading_start
- loading_stop
- show_schema_diff
- show_row_diff
class CaptureTerminalConsole(TerminalConsole):
    """
    Terminal console that records everything it prints so that the text can be pulled out and shown
    within other interfaces. Consuming the captured output (or errors) also clears it.

    Note: `_prompt` and `_confirm` need to also be overridden to work with the custom interface if you
    want to use this console interactively.
    """

    def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) -> None:
        super().__init__(console=console, **kwargs)
        # Chunks of rendered output, in the order they were printed.
        self._captured_outputs: t.List[str] = []
        # Error messages, in the order they were logged.
        self._errors: t.List[str] = []

    @property
    def captured_output(self) -> str:
        """All captured output chunks joined into a single string."""
        return "".join(self._captured_outputs)

    @property
    def captured_errors(self) -> str:
        """All captured error messages joined into a single string."""
        return "".join(self._errors)

    def consume_captured_output(self) -> str:
        """Returns the captured output and clears it."""
        drained = self.captured_output
        self.clear_captured_outputs()
        return drained

    def consume_captured_errors(self) -> str:
        """Returns the captured errors and clears them."""
        drained = self.captured_errors
        self.clear_captured_errors()
        return drained

    def clear_captured_outputs(self) -> None:
        """Discards all captured output."""
        self._captured_outputs = []

    def clear_captured_errors(self) -> None:
        """Discards all captured errors."""
        self._errors = []

    def log_error(self, message: str) -> None:
        """Records the error message and then displays it through the parent console."""
        self._errors.append(message)
        super().log_error(message)

    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
        """Prints through the rich console in capture mode and stores the rendered text."""
        with self.console.capture() as recorder:
            self.console.print(value, **kwargs)
        self._captured_outputs.append(recorder.get())
Captures the output of the terminal console so that it can be extracted out and displayed within other interfaces. The captured output is cleared out after it is retrieved.
Note: _prompt
and _confirm
need to also be overridden to work with the custom interface if you want to use
this console interactively.
def log_error(self, message: str) -> None:
    """Display error info to the user.

    The message is appended to the list of captured errors before it is handed to the
    parent console.
    """
    self._errors.append(message)
    super().log_error(message)
Display error info to the user.
Inherited Members
- TerminalConsole
- start_plan_evaluation
- stop_plan_evaluation
- start_evaluation_progress
- start_snapshot_evaluation_progress
- update_snapshot_evaluation_progress
- stop_evaluation_progress
- start_creation_progress
- update_creation_progress
- stop_creation_progress
- update_cleanup_progress
- start_promotion_progress
- update_promotion_progress
- stop_promotion_progress
- start_migration_progress
- update_migration_progress
- stop_migration_progress
- show_model_difference_summary
- plan
- log_test_results
- show_sql
- log_status_update
- log_success
- loading_start
- loading_stop
- show_schema_diff
- show_row_diff
class MarkdownConsole(CaptureTerminalConsole):
    """
    A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases
    where you want to display a plan or test results in markdown.
    """

    def show_model_difference_summary(
        self,
        context_diff: ContextDiff,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        no_diff: bool = True,
        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    ) -> None:
        """Shows a summary of the differences.

        Args:
            context_diff: The context diff to use to print the summary.
            environment_naming_info: The environment naming info to reference when printing model names
            default_catalog: The default catalog to reference when deciding to remove catalog from display names
            no_diff: Hide the actual SQL differences.
            ignored_snapshot_ids: A set of snapshot ids that are ignored
        """
        ignored_snapshot_ids = ignored_snapshot_ids or set()

        def bullet(node: t.Any) -> str:
            # One markdown list item holding the display name of a snapshot.
            return f"- `{node.display_name(environment_naming_info, default_catalog)}`"

        def print_section(title: str, nodes: t.Iterable[t.Any]) -> None:
            # A bold section title followed by one sorted bullet per snapshot.
            self._print(title)
            for node in sorted(nodes):
                self._print(bullet(node))

        if context_diff.is_new_environment:
            self._print(
                f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n"
            )
            if not context_diff.has_snapshot_changes:
                return

        if not context_diff.has_changes:
            self._print(f"**No differences when compared to `{context_diff.environment}`**\n")
            return

        self._print(f"**Summary of differences against `{context_diff.environment}`:**\n")

        added_snapshots = {
            context_diff.snapshots[s_id]
            for s_id in context_diff.added
            if s_id not in ignored_snapshot_ids
        }
        added_models = {s for s in added_snapshots if s.is_model}
        if added_models:
            print_section("\n**Added Models:**", added_models)

        added_audits = {s for s in added_snapshots if s.is_audit}
        if added_audits:
            print_section("\n**Added Standalone Audits:**", added_audits)

        removed_table_infos = {
            table_info
            for s_id, table_info in context_diff.removed_snapshots.items()
            if s_id not in ignored_snapshot_ids
        }
        removed_models = {s for s in removed_table_infos if s.is_model}
        if removed_models:
            print_section("\n**Removed Models:**", removed_models)

        removed_audits = {s for s in removed_table_infos if s.is_audit}
        if removed_audits:
            print_section("\n**Removed Standalone Audits:**", removed_audits)

        modified_snapshots = {
            current_snapshot
            for current_snapshot, _ in context_diff.modified_snapshots.values()
            if current_snapshot.snapshot_id not in ignored_snapshot_ids
        }
        if modified_snapshots:
            directly_modified = []
            indirectly_modified = []
            metadata_modified = []
            # Each modified snapshot lands in at most one bucket; the first matching check wins.
            for snapshot in modified_snapshots:
                if context_diff.directly_modified(snapshot.name):
                    directly_modified.append(snapshot)
                elif context_diff.indirectly_modified(snapshot.name):
                    indirectly_modified.append(snapshot)
                elif context_diff.metadata_updated(snapshot.name):
                    metadata_modified.append(snapshot)

            if directly_modified:
                self._print("\n**Directly Modified:**")
                for snapshot in sorted(directly_modified):
                    self._print(bullet(snapshot))
                    if not no_diff:
                        self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
            if indirectly_modified:
                print_section("\n**Indirectly Modified:**", indirectly_modified)
            if metadata_modified:
                print_section("\n**Metadata Updated:**", metadata_modified)

        if ignored_snapshot_ids:
            self._print("\n**Ignored Models (Expected Plan Start):**")
            for s_id in sorted(ignored_snapshot_ids):
                snapshot = context_diff.snapshots[s_id]
                name_item = bullet(snapshot)
                expected_start = snapshot.get_latest(
                    start_date(snapshot, context_diff.snapshots.values())
                )
                self._print(f"{name_item} ({expected_start})")

    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
        """Displays the models with missing dates."""
        missing_intervals = plan.missing_intervals
        if not missing_intervals:
            return
        self._print("\n**Models needing backfill (missing dates):**")
        for missing in missing_intervals:
            snapshot = plan.context_diff.snapshots[missing.snapshot_id]
            if not snapshot.is_model:
                continue

            # Snapshots that are not deployable are flagged as previews.
            preview_modifier = ""
            if not plan.deployability_index.is_deployable(snapshot):
                preview_modifier = " (**preview**)"

            self._print(
                f"* `{snapshot.display_name(plan.environment_naming_info, default_catalog)}`: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}"
            )

    def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
        """Prints the diff and the tree of indirectly modified children for each directly modified snapshot."""
        context_diff = plan.context_diff
        for snapshot in plan.categorized:
            if not context_diff.directly_modified(snapshot.name):
                continue

            category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
            tree = Tree(
                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})"
            )
            # The children branch is only created once the first child is found.
            indirect_tree = None
            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
                child_snapshot = context_diff.snapshots[child_sid]
                if not indirect_tree:
                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
                    tree.add(indirect_tree)
                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
                indirect_tree.add(
                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})"
                )
            self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```\n")
            # The tree is wrapped in its own fenced block.
            self._print("```\n")
            self._print(tree)
            self._print("\n```")

    def log_test_results(
        self, result: unittest.result.TestResult, output: str, target_dialect: str
    ) -> None:
        """Display the test result and output as markdown.

        Args:
            result: The unittest test result that contains metrics like num success, fails, etc.
            output: The generated output from the unittest.
            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
        """
        if result.wasSuccessful():
            self._print(
                f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n"
            )
            return

        self._print(
            f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n"
        )
        # Only failures and errors that belong to a ModelTest are listed by name.
        for test, _ in result.failures + result.errors:
            if isinstance(test, ModelTest):
                self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n")
        # The opening fence gets its own line (as in `log_error`); otherwise the first line of
        # the output would be parsed as the fence's info string instead of as content.
        self._print(f"```\n{output}```\n\n")

    def log_error(self, message: str) -> None:
        """Logs the error message wrapped in a fenced code block."""
        super().log_error(f"```\n{message}```\n\n")
A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases where you want to display a plan or test results in markdown.
def show_model_difference_summary(
    self,
    context_diff: ContextDiff,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    no_diff: bool = True,
    ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
    """Shows a summary of the differences.

    Args:
        context_diff: The context diff to use to print the summary.
        environment_naming_info: The environment naming info to reference when printing model names
        default_catalog: The default catalog to reference when deciding to remove catalog from display names
        no_diff: Hide the actual SQL differences.
        ignored_snapshot_ids: A set of snapshot ids that are ignored
    """
    ignored_snapshot_ids = ignored_snapshot_ids or set()

    def bullet(node: t.Any) -> str:
        # One markdown list item holding the display name of a snapshot.
        return f"- `{node.display_name(environment_naming_info, default_catalog)}`"

    def print_section(title: str, nodes: t.Iterable[t.Any]) -> None:
        # A bold section title followed by one sorted bullet per snapshot.
        self._print(title)
        for node in sorted(nodes):
            self._print(bullet(node))

    if context_diff.is_new_environment:
        self._print(
            f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n"
        )
        if not context_diff.has_snapshot_changes:
            return

    if not context_diff.has_changes:
        self._print(f"**No differences when compared to `{context_diff.environment}`**\n")
        return

    self._print(f"**Summary of differences against `{context_diff.environment}`:**\n")

    added_snapshots = {
        context_diff.snapshots[s_id]
        for s_id in context_diff.added
        if s_id not in ignored_snapshot_ids
    }
    added_models = {s for s in added_snapshots if s.is_model}
    if added_models:
        print_section("\n**Added Models:**", added_models)

    added_audits = {s for s in added_snapshots if s.is_audit}
    if added_audits:
        print_section("\n**Added Standalone Audits:**", added_audits)

    removed_table_infos = {
        table_info
        for s_id, table_info in context_diff.removed_snapshots.items()
        if s_id not in ignored_snapshot_ids
    }
    removed_models = {s for s in removed_table_infos if s.is_model}
    if removed_models:
        print_section("\n**Removed Models:**", removed_models)

    removed_audits = {s for s in removed_table_infos if s.is_audit}
    if removed_audits:
        print_section("\n**Removed Standalone Audits:**", removed_audits)

    modified_snapshots = {
        current_snapshot
        for current_snapshot, _ in context_diff.modified_snapshots.values()
        if current_snapshot.snapshot_id not in ignored_snapshot_ids
    }
    if modified_snapshots:
        directly_modified = []
        indirectly_modified = []
        metadata_modified = []
        # Each modified snapshot lands in at most one bucket; the first matching check wins.
        for snapshot in modified_snapshots:
            if context_diff.directly_modified(snapshot.name):
                directly_modified.append(snapshot)
            elif context_diff.indirectly_modified(snapshot.name):
                indirectly_modified.append(snapshot)
            elif context_diff.metadata_updated(snapshot.name):
                metadata_modified.append(snapshot)

        if directly_modified:
            self._print("\n**Directly Modified:**")
            for snapshot in sorted(directly_modified):
                self._print(bullet(snapshot))
                if not no_diff:
                    self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
        if indirectly_modified:
            print_section("\n**Indirectly Modified:**", indirectly_modified)
        if metadata_modified:
            print_section("\n**Metadata Updated:**", metadata_modified)

    if ignored_snapshot_ids:
        self._print("\n**Ignored Models (Expected Plan Start):**")
        for s_id in sorted(ignored_snapshot_ids):
            snapshot = context_diff.snapshots[s_id]
            name_item = bullet(snapshot)
            expected_start = snapshot.get_latest(
                start_date(snapshot, context_diff.snapshots.values())
            )
            self._print(f"{name_item} ({expected_start})")
Shows a summary of the differences.
Arguments:
- context_diff: The context diff to use to print the summary.
- environment_naming_info: The environment naming info to reference when printing model names
- default_catalog: The default catalog to reference when deciding to remove catalog from display names
- no_diff: Hide the actual SQL differences.
- ignored_snapshot_ids: A set of snapshot names that are ignored
def log_test_results(
    self, result: unittest.result.TestResult, output: str, target_dialect: str
) -> None:
    """Display the test result and output as markdown.

    Args:
        result: The unittest test result that contains metrics like num success, fails, etc.
        output: The generated output from the unittest.
        target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
    """
    if result.wasSuccessful():
        self._print(
            f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n"
        )
        return

    self._print(
        f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n"
    )
    # Only failures and errors that belong to a ModelTest are listed by name.
    for test, _ in result.failures + result.errors:
        if isinstance(test, ModelTest):
            self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n")
    # The opening fence gets its own line; otherwise the first line of the output would be
    # parsed as the fence's info string instead of as content.
    self._print(f"```\n{output}```\n\n")
Display the test result and output.
Arguments:
- result: The unittest test result that contains metrics like num success, fails, etc.
- output: The generated output from the unittest.
- target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
Inherited Members
- CaptureTerminalConsole
- CaptureTerminalConsole
- consume_captured_output
- consume_captured_errors
- clear_captured_outputs
- clear_captured_errors
- TerminalConsole
- start_plan_evaluation
- stop_plan_evaluation
- start_evaluation_progress
- start_snapshot_evaluation_progress
- update_snapshot_evaluation_progress
- stop_evaluation_progress
- start_creation_progress
- update_creation_progress
- stop_creation_progress
- update_cleanup_progress
- start_promotion_progress
- update_promotion_progress
- stop_promotion_progress
- start_migration_progress
- update_migration_progress
- stop_migration_progress
- plan
- show_sql
- log_status_update
- log_success
- loading_start
- loading_stop
- show_schema_diff
- show_row_diff
class DatabricksMagicConsole(CaptureTerminalConsole):
    """
    Console that reports progress with plain `print` statements.

    Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The
    NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution
    and it makes it difficult to see the progress of the plan.
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        # Snapshot id -> (display name, number of batches loaded so far).
        self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {}
        # Each status below is a (completed, total) pair.
        self.promotion_status: t.Tuple[int, int] = (0, 0)
        self.model_creation_status: t.Tuple[int, int] = (0, 0)
        self.migration_status: t.Tuple[int, int] = (0, 0)

    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
        """Captures the value via the parent and immediately prints and clears what was captured."""
        super()._print(value, **kwargs)
        for chunk in self._captured_outputs:
            print(chunk)
        self.clear_captured_outputs()

    def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
        """Prints the message and then prompts through the parent with an empty message."""
        self._print(message)
        return super()._prompt("", **kwargs)

    def _confirm(self, message: str, **kwargs: t.Any) -> bool:
        """Prints the message with a `[y/n]` suffix and then confirms through the parent."""
        self._print(f"{message} [y/n]")
        return super()._confirm("", **kwargs)

    def start_evaluation_progress(
        self,
        batches: t.Dict[Snapshot, int],
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Stores the batch counts and naming information used by the evaluation progress updates."""
        self.evaluation_batches = batches
        self.evaluation_environment_naming_info = environment_naming_info
        self.default_catalog = default_catalog

    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
        """Registers the snapshot and announces it, unless it has already been registered."""
        if self.evaluation_batch_progress.get(snapshot.snapshot_id):
            return

        display_name = snapshot.display_name(
            self.evaluation_environment_naming_info, self.default_catalog
        )
        self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0)
        print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}")

    def update_snapshot_evaluation_progress(
        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
    ) -> None:
        """Counts one more loaded batch for the snapshot and prints its progress."""
        view_name, previously_loaded = self.evaluation_batch_progress[snapshot.snapshot_id]
        total_batches = self.evaluation_batches[snapshot]

        loaded_batches = previously_loaded + 1
        self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches)

        finished_loading = loaded_batches == total_batches
        status = "Loaded" if finished_loading else "Loading"
        print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}")
        if not finished_loading:
            return

        # A snapshot counts as finished once its loaded batches match its expected batches.
        finished_models = sum(
            1
            for s, expected in self.evaluation_batches.items()
            if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == expected
        )
        tracked_models = len(self.evaluation_batch_progress)
        print(f"Completed Loading {finished_models}/{tracked_models} Models")

    def stop_evaluation_progress(self, success: bool = True) -> None:
        """Resets the tracked progress, stops the parent's progress and prints the outcome."""
        self.evaluation_batch_progress = {}
        super().stop_evaluation_progress(success)
        outcome = "succeeded" if success else "failed"
        print(f"Loading {outcome}")

    def start_creation_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new creation progress has begun."""
        self.model_creation_status = (0, total_tasks)
        print("Starting Creating New Model Versions")

    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
        """Update the snapshot creation progress."""
        completed, total_creations = self.model_creation_status
        completed += 1
        self.model_creation_status = (completed, total_creations)
        # Progress is only printed on every fifth creation.
        if completed % 5 == 0:
            print(f"Created New Model Versions: {completed}/{total_creations}")

    def stop_creation_progress(self, success: bool = True) -> None:
        """Stop the snapshot creation progress."""
        self.model_creation_status = (0, 0)
        outcome = "succeeded" if success else "failed"
        print(f"New Model Creation {outcome}")

    def start_promotion_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot promotion progress has begun."""
        self.promotion_status = (0, total_tasks)
        print(f"Virtually Updating '{environment_naming_info.name}'")

    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
        """Update the snapshot promotion progress."""
        completed, total_promotions = self.promotion_status
        completed += 1
        self.promotion_status = (completed, total_promotions)
        # Progress is only printed on every fifth promotion.
        if completed % 5 == 0:
            print(f"Virtually Updated {completed}/{total_promotions}")

    def stop_promotion_progress(self, success: bool = True) -> None:
        """Stop the snapshot promotion progress."""
        self.promotion_status = (0, 0)
        outcome = "succeeded" if success else "failed"
        print(f"Virtual Update {outcome}")

    def start_migration_progress(self, total_tasks: int) -> None:
        """Indicates that a new migration progress has begun."""
        self.migration_status = (0, total_tasks)
        print("Starting Migration")

    def update_migration_progress(self, num_tasks: int) -> None:
        """Update the migration progress."""
        completed, total_migrations = self.migration_status
        completed += num_tasks
        self.migration_status = (completed, total_migrations)
        # Progress is only printed when the running count is a multiple of five.
        if completed % 5 == 0:
            print(f"Migration Updated {completed}/{total_migrations}")

    def stop_migration_progress(self, success: bool = True) -> None:
        """Stop the migration progress."""
        self.migration_status = (0, 0)
        outcome = "succeeded" if success else "failed"
        print(f"Migration {outcome}")
Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution and it makes it difficult to see the progress of the plan.
def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
    """Initialize the parent console, then reset every progress tracker."""
    super().__init__(*args, **kwargs)
    # Each counter is a (completed, total) pair.
    self.migration_status: t.Tuple[int, int] = (0, 0)
    self.model_creation_status: t.Tuple[int, int] = (0, 0)
    self.promotion_status: t.Tuple[int, int] = (0, 0)
    # Snapshot id -> (display name, batches reported so far).
    self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {}
def start_evaluation_progress(
    self,
    batches: t.Dict[Snapshot, int],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Remember the batch counts and naming context for the evaluation that is starting.

    Args:
        batches: Expected number of batches per snapshot.
        environment_naming_info: Naming info stored for later display-name lookups.
        default_catalog: Default catalog stored for later display-name lookups.
    """
    self.default_catalog = default_catalog
    self.evaluation_environment_naming_info = environment_naming_info
    self.evaluation_batches = batches
Indicates that a new snapshot evaluation progress has begun.
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
    """Register a snapshot with zero loaded batches and announce it once.

    A snapshot that already has a progress entry is left untouched.
    """
    if self.evaluation_batch_progress.get(snapshot.snapshot_id):
        return

    name = snapshot.display_name(self.evaluation_environment_naming_info, self.default_catalog)
    self.evaluation_batch_progress[snapshot.snapshot_id] = (name, 0)
    print(f"Starting '{name}', Total batches: {self.evaluation_batches[snapshot]}")
Starts the snapshot evaluation progress.
def update_snapshot_evaluation_progress(
    self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
) -> None:
    """Record one more loaded batch for ``snapshot`` and print its progress.

    When the snapshot's last batch arrives, a summary line is also printed with
    the number of fully loaded models out of all scheduled models.

    Args:
        snapshot: The snapshot whose batch just finished.
        batch_idx: Unused here; accepted to match the console interface.
        duration_ms: Unused here; accepted to match the console interface.

    Raises:
        KeyError: If the snapshot has no entry in ``evaluation_batch_progress``
            or ``evaluation_batches``.
    """
    view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
    total_batches = self.evaluation_batches[snapshot]

    loaded_batches += 1
    self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches)

    finished_loading = loaded_batches == total_batches
    status = "Loaded" if finished_loading else "Loading"
    print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}")
    if finished_loading:
        # Snapshots that have not started default to -1 batches, so they never count as done.
        total_finished_loading = sum(
            1
            for s, total in self.evaluation_batches.items()
            if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total
        )
        # Use every scheduled snapshot as the denominator. Counting only the snapshots
        # that have started so far made the total grow during the run (1/1, 2/2, ...).
        total_models = len(self.evaluation_batches)
        print(f"Completed Loading {total_finished_loading}/{total_models} Models")
Update the snapshot evaluation progress.
def stop_evaluation_progress(self, success: bool = True) -> None:
    """Clear per-snapshot progress, stop the parent's progress, and print the outcome."""
    self.evaluation_batch_progress = {}
    super().stop_evaluation_progress(success)
    outcome = "succeeded" if success else "failed"
    print(f"Loading {outcome}")
Stop the snapshot evaluation progress.
def start_creation_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Reset the creation counter to zero of ``total_tasks`` and announce the start."""
    initial_status = (0, total_tasks)
    self.model_creation_status = initial_status
    print("Starting Creating New Model Versions")
Indicates that a new creation progress has begun.
def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Count one more created model version, printing progress on every fifth one."""
    done, expected = self.model_creation_status
    done += 1
    self.model_creation_status = (done, expected)
    if done % 5:
        return
    print(f"Created New Model Versions: {done}/{expected}")
Update the snapshot creation progress.
def stop_creation_progress(self, success: bool = True) -> None:
    """Reset the creation counter and print whether creation succeeded."""
    self.model_creation_status = (0, 0)
    outcome = "succeeded" if success else "failed"
    print(f"New Model Creation {outcome}")
Stop the snapshot creation progress.
def start_promotion_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Reset the promotion counter to zero of ``total_tasks`` and name the environment."""
    initial_status = (0, total_tasks)
    self.promotion_status = initial_status
    environment_name = environment_naming_info.name
    print(f"Virtually Updating '{environment_name}'")
Indicates that a new snapshot promotion progress has begun.
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Count one more promotion, printing progress on every fifth one."""
    done, expected = self.promotion_status
    done += 1
    self.promotion_status = (done, expected)
    if done % 5:
        return
    print(f"Virtually Updated {done}/{expected}")
Update the snapshot promotion progress.
def stop_promotion_progress(self, success: bool = True) -> None:
    """Reset the promotion counter and print whether the virtual update succeeded."""
    self.promotion_status = (0, 0)
    outcome = "succeeded" if success else "failed"
    print(f"Virtual Update {outcome}")
Stop the snapshot promotion progress.
def start_migration_progress(self, total_tasks: int) -> None:
    """Reset the migration counter to zero of ``total_tasks`` and announce the start."""
    initial_status = (0, total_tasks)
    self.migration_status = initial_status
    print("Starting Migration")
Indicates that a new migration progress has begun.
def update_migration_progress(self, num_tasks: int) -> None:
    """Add ``num_tasks`` to the migration counter.

    Progress is printed only when the running count is a multiple of five.
    """
    done, expected = self.migration_status
    done += num_tasks
    self.migration_status = (done, expected)
    if done % 5:
        return
    print(f"Migration Updated {done}/{expected}")
Update the migration progress.
def stop_migration_progress(self, success: bool = True) -> None:
    """Reset the migration counter and print whether the migration succeeded."""
    self.migration_status = (0, 0)
    outcome = "succeeded" if success else "failed"
    print(f"Migration {outcome}")
Stop the migration progress.
Inherited Members
class DebuggerTerminalConsole(TerminalConsole):
    """A terminal console to use while debugging with no fluff, progress bars, etc.

    Every event is written as a single log entry through the Rich console's ``log`` method.
    """

    def __init__(self, console: t.Optional[RichConsole], *args: t.Any, **kwargs: t.Any) -> None:
        # Only the Rich console is stored; the extra arguments are accepted and ignored,
        # and the parent initializer is not invoked here.
        self.console: RichConsole = console or srich.console

    def _write(self, msg: t.Any, *args: t.Any, **kwargs: t.Any) -> None:
        """Forward the message and any extra arguments to ``self.console.log``."""
        self.console.log(msg, *args, **kwargs)

    def start_plan_evaluation(self, plan: Plan) -> None:
        """Indicates that a new evaluation has begun."""
        self._write("Starting plan", plan.plan_id)

    def stop_plan_evaluation(self) -> None:
        """Indicates that the evaluation has ended."""
        self._write("Stopping plan")

    def start_evaluation_progress(
        self,
        batches: t.Dict[Snapshot, int],
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot evaluation progress has begun."""
        self._write(f"Starting evaluation for {len(batches)} snapshots")

    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
        """Starts the snapshot evaluation progress."""
        self._write(f"Evaluating {snapshot.name}")

    def update_snapshot_evaluation_progress(
        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
    ) -> None:
        """Update the snapshot evaluation progress."""
        self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms")

    def stop_evaluation_progress(self, success: bool = True) -> None:
        """Stop the snapshot evaluation progress."""
        self._write(f"Stopping evaluation with success={success}")

    def start_creation_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new creation progress has begun."""
        self._write(f"Starting creation for {total_tasks} snapshots")

    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
        """Update the snapshot creation progress."""
        self._write(f"Creating {snapshot.name}")

    def stop_creation_progress(self, success: bool = True) -> None:
        """Stop the snapshot creation progress."""
        self._write(f"Stopping creation with success={success}")

    def update_cleanup_progress(self, object_name: str) -> None:
        """Update the snapshot cleanup progress."""
        self._write(f"Cleaning up {object_name}")

    def start_promotion_progress(
        self,
        total_tasks: int,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
    ) -> None:
        """Indicates that a new snapshot promotion progress has begun."""
        self._write(f"Starting promotion for {total_tasks} snapshots")

    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
        """Update the snapshot promotion progress."""
        self._write(f"Promoting {snapshot.name}")

    def stop_promotion_progress(self, success: bool = True) -> None:
        """Stop the snapshot promotion progress."""
        self._write(f"Stopping promotion with success={success}")

    def start_migration_progress(self, total_tasks: int) -> None:
        """Indicates that a new migration progress has begun."""
        self._write(f"Starting migration for {total_tasks} snapshots")

    def update_migration_progress(self, num_tasks: int) -> None:
        """Update the migration progress."""
        self._write(f"Migration {num_tasks}")

    def stop_migration_progress(self, success: bool = True) -> None:
        """Stop the migration progress."""
        self._write(f"Stopping migration with success={success}")

    def show_model_difference_summary(
        self,
        context_diff: ContextDiff,
        environment_naming_info: EnvironmentNamingInfo,
        default_catalog: t.Optional[str],
        no_diff: bool = True,
        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
    ) -> None:
        """Shows a summary of the differences.

        Only ``context_diff`` is used; the other arguments are accepted for
        interface compatibility and ignored by this console.
        """
        self._write("Model Difference Summary:")
        for added in context_diff.new_snapshots:
            self._write(f" Added: {added}")
        for removed in context_diff.removed_snapshots:
            self._write(f" Removed: {removed}")
        for modified in context_diff.modified_snapshots:
            self._write(f" Modified: {modified}")

    def log_test_results(
        self, result: unittest.result.TestResult, output: str, target_dialect: str
    ) -> None:
        """Display the test result; ``output`` and ``target_dialect`` are not logged."""
        self._write("Test Results:", result)

    def show_sql(self, sql: str) -> None:
        """Log the SQL text as-is."""
        self._write(sql)

    def log_status_update(self, message: str) -> None:
        """Display general status update to the user."""
        self._write(message, style="bold blue")

    def log_error(self, message: str) -> None:
        """Log an error message in bold red."""
        self._write(message, style="bold red")

    def log_success(self, message: str) -> None:
        """Log a success message in bold green."""
        self._write(message, style="bold green")

    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
        """Starts loading and returns a unique ID that can be used to stop the loading.

        The message is optional, so nothing is logged when it is omitted;
        previously the literal ``None`` was written to the log.
        """
        if message is not None:
            self._write(message)
        return uuid.uuid4()

    def loading_stop(self, id: uuid.UUID) -> None:
        """Log that loading is done; the id is not inspected."""
        self._write("Done")

    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
        """Log the schema diff object directly."""
        self._write(schema_diff)

    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
        """Show table summary diff; ``show_sample`` is ignored by this console."""
        self._write(row_diff)
A terminal console to use while debugging with no fluff, progress bars, etc.
def start_plan_evaluation(self, plan: Plan) -> None:
    """Log that evaluation of the plan is starting, together with its plan id."""
    plan_id = plan.plan_id
    self._write("Starting plan", plan_id)
Indicates that a new evaluation has begun.
def start_evaluation_progress(
    self,
    batches: t.Dict[Snapshot, int],
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Log how many snapshots are about to be evaluated."""
    snapshot_count = len(batches)
    self._write(f"Starting evaluation for {snapshot_count} snapshots")
Indicates that a new snapshot evaluation progress has begun.
def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
    """Log the name of the snapshot whose evaluation is starting."""
    line = f"Evaluating {snapshot.name}"
    self._write(line)
Starts the snapshot evaluation progress.
def update_snapshot_evaluation_progress(
    self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
) -> None:
    """Log the snapshot name, batch index and duration of a finished batch."""
    line = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms"
    self._write(line)
Update the snapshot evaluation progress.
def stop_evaluation_progress(self, success: bool = True) -> None:
    """Log that evaluation stopped, including the success flag."""
    line = f"Stopping evaluation with success={success}"
    self._write(line)
Stop the snapshot evaluation progress.
def start_creation_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Log how many snapshots are about to be created."""
    line = f"Starting creation for {total_tasks} snapshots"
    self._write(line)
Indicates that a new creation progress has begun.
def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
    """Log the name of the snapshot being created."""
    line = f"Creating {snapshot.name}"
    self._write(line)
Update the snapshot creation progress.
def stop_creation_progress(self, success: bool = True) -> None:
    """Log that creation stopped, including the success flag."""
    line = f"Stopping creation with success={success}"
    self._write(line)
Stop the snapshot creation progress.
def update_cleanup_progress(self, object_name: str) -> None:
    """Log the name of the object being cleaned up."""
    line = f"Cleaning up {object_name}"
    self._write(line)
Update the snapshot cleanup progress.
def start_promotion_progress(
    self,
    total_tasks: int,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
) -> None:
    """Log how many snapshots are about to be promoted."""
    line = f"Starting promotion for {total_tasks} snapshots"
    self._write(line)
Indicates that a new snapshot promotion progress has begun.
def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
    """Log the name of the snapshot being promoted; ``promoted`` is not logged."""
    line = f"Promoting {snapshot.name}"
    self._write(line)
Update the snapshot promotion progress.
def stop_promotion_progress(self, success: bool = True) -> None:
    """Log that promotion stopped, including the success flag."""
    line = f"Stopping promotion with success={success}"
    self._write(line)
Stop the snapshot promotion progress.
def start_migration_progress(self, total_tasks: int) -> None:
    """Log how many snapshots are about to be migrated."""
    line = f"Starting migration for {total_tasks} snapshots"
    self._write(line)
Indicates that a new migration progress has begun.
def update_migration_progress(self, num_tasks: int) -> None:
    """Log the number of tasks reported by this migration update."""
    line = f"Migration {num_tasks}"
    self._write(line)
Update the migration progress.
def stop_migration_progress(self, success: bool = True) -> None:
    """Log that migration stopped, including the success flag."""
    line = f"Stopping migration with success={success}"
    self._write(line)
Stop the migration progress.
def show_model_difference_summary(
    self,
    context_diff: ContextDiff,
    environment_naming_info: EnvironmentNamingInfo,
    default_catalog: t.Optional[str],
    no_diff: bool = True,
    ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
) -> None:
    """Log a header followed by one line per added, removed and modified snapshot.

    Only ``context_diff`` is read; the remaining arguments are accepted but unused.
    """
    self._write("Model Difference Summary:")

    for new_entry in context_diff.new_snapshots:
        self._write(f" Added: {new_entry}")

    for removed_entry in context_diff.removed_snapshots:
        self._write(f" Removed: {removed_entry}")

    for modified_entry in context_diff.modified_snapshots:
        self._write(f" Modified: {modified_entry}")
Shows a summary of the differences.
Arguments:
- context_diff: The context diff to use to print the summary
- environment_naming_info: The environment naming info to reference when printing model names
- default_catalog: The default catalog to reference when deciding to remove catalog from display names
- no_diff: Hide the actual SQL differences.
- ignored_snapshot_ids: A set of snapshot ids that are ignored
def log_test_results(
    self, result: unittest.result.TestResult, output: str, target_dialect: str
) -> None:
    """Log the test result object; ``output`` and ``target_dialect`` are not logged."""
    label = "Test Results:"
    self._write(label, result)
Display the test result and output.
Arguments:
- result: The unittest test result that contains metrics like num success, fails, etc.
- output: The generated output from the unittest.
- target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
def log_status_update(self, message: str) -> None:
    """Log a general status message using the bold blue style."""
    status_style = "bold blue"
    self._write(message, style=status_style)
Display general status update to the user.
def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
    """Start loading and return a unique id that can later be used to stop it.

    Args:
        message: Optional text to log. When omitted nothing is logged;
            previously the literal ``None`` was written to the log.

    Returns:
        A freshly generated random UUID.
    """
    if message is not None:
        self._write(message)
    return uuid.uuid4()
Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.
def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
    """Log the row diff object directly; ``show_sample`` is ignored by this console."""
    payload = row_diff
    self._write(payload)
Show table summary diff.
Inherited Members
def get_console(**kwargs: t.Any) -> TerminalConsole | DatabricksMagicConsole | NotebookMagicConsole:
    """
    Build the console implementation that matches the detected runtime environment.

    A Rich console using the project theme is passed as the ``console`` keyword
    unless the caller supplies its own ``console`` in ``kwargs``.

    Note: The Google Colab environment is untested and is currently assumed to be
    compatible with the base NotebookMagicConsole.
    """
    from sqlmesh import RuntimeEnv

    runtime_env = RuntimeEnv.get()

    console_classes = {
        RuntimeEnv.DATABRICKS: DatabricksMagicConsole,
        RuntimeEnv.JUPYTER: NotebookMagicConsole,
        RuntimeEnv.TERMINAL: TerminalConsole,
        RuntimeEnv.GOOGLE_COLAB: NotebookMagicConsole,
        RuntimeEnv.DEBUGGER: DebuggerTerminalConsole,
    }

    rich_options: t.Dict[str, t.Any] = {"theme": srich.theme}
    # Notebook front ends get Rich's Jupyter rendering forced on.
    if runtime_env.is_jupyter or runtime_env.is_google_colab:
        rich_options["force_jupyter"] = True

    console_cls = console_classes[runtime_env]
    # Caller-supplied kwargs come last so they override the default "console" entry.
    return console_cls(**{"console": RichConsole(**rich_options), **kwargs})
Returns the console that is appropriate for the current runtime environment.
Note: The Google Colab environment is untested and is currently assumed to be compatible with the base NotebookMagicConsole.