Edit on GitHub

sqlmesh.core.console

   1from __future__ import annotations
   2
   3import abc
   4import datetime
   5import typing as t
   6import unittest
   7import uuid
   8
   9from hyperscript import h
  10from rich.console import Console as RichConsole
  11from rich.live import Live
  12from rich.progress import (
  13    BarColumn,
  14    Progress,
  15    SpinnerColumn,
  16    TaskID,
  17    TextColumn,
  18    TimeElapsedColumn,
  19)
  20from rich.prompt import Confirm, Prompt
  21from rich.status import Status
  22from rich.syntax import Syntax
  23from rich.table import Table
  24from rich.tree import Tree
  25
  26from sqlmesh.core.environment import EnvironmentNamingInfo
  27from sqlmesh.core.snapshot import (
  28    Snapshot,
  29    SnapshotChangeCategory,
  30    SnapshotId,
  31    SnapshotInfoLike,
  32    start_date,
  33)
  34from sqlmesh.core.test import ModelTest
  35from sqlmesh.utils import rich as srich
  36from sqlmesh.utils.date import time_like_to_str, to_date, yesterday_ds
  37
  38if t.TYPE_CHECKING:
  39    import ipywidgets as widgets
  40
  41    from sqlmesh.core.context_diff import ContextDiff
  42    from sqlmesh.core.plan import Plan, PlanBuilder
  43    from sqlmesh.core.table_diff import RowDiff, SchemaDiff
  44
  45    LayoutWidget = t.TypeVar("LayoutWidget", bound=t.Union[widgets.VBox, widgets.HBox])
  46
  47
  48SNAPSHOT_CHANGE_CATEGORY_STR = {
  49    None: "Unknown",
  50    SnapshotChangeCategory.BREAKING: "Breaking",
  51    SnapshotChangeCategory.NON_BREAKING: "Non-breaking",
  52    SnapshotChangeCategory.FORWARD_ONLY: "Forward-only",
  53    SnapshotChangeCategory.INDIRECT_BREAKING: "Indirect Breaking",
  54    SnapshotChangeCategory.INDIRECT_NON_BREAKING: "Indirect Non-breaking",
  55}
  56
  57
  58class Console(abc.ABC):
  59    """Abstract base class for defining classes used for displaying information to the user and also interact
  60    with them when their input is needed."""
  61
  62    @abc.abstractmethod
  63    def start_plan_evaluation(self, plan: Plan) -> None:
  64        """Indicates that a new evaluation has begun."""
  65
  66    @abc.abstractmethod
  67    def stop_plan_evaluation(self) -> None:
  68        """Indicates that the evaluation has ended."""
  69
  70    @abc.abstractmethod
  71    def start_evaluation_progress(
  72        self,
  73        batches: t.Dict[Snapshot, int],
  74        environment_naming_info: EnvironmentNamingInfo,
  75        default_catalog: t.Optional[str],
  76    ) -> None:
  77        """Indicates that a new snapshot evaluation progress has begun."""
  78
  79    @abc.abstractmethod
  80    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
  81        """Starts the snapshot evaluation progress."""
  82
  83    @abc.abstractmethod
  84    def update_snapshot_evaluation_progress(
  85        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
  86    ) -> None:
  87        """Updates the snapshot evaluation progress."""
  88
  89    @abc.abstractmethod
  90    def stop_evaluation_progress(self, success: bool = True) -> None:
  91        """Stops the snapshot evaluation progress."""
  92
  93    @abc.abstractmethod
  94    def start_creation_progress(
  95        self,
  96        total_tasks: int,
  97        environment_naming_info: EnvironmentNamingInfo,
  98        default_catalog: t.Optional[str],
  99    ) -> None:
 100        """Indicates that a new snapshot creation progress has begun."""
 101
 102    @abc.abstractmethod
 103    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
 104        """Update the snapshot creation progress."""
 105
 106    @abc.abstractmethod
 107    def stop_creation_progress(self, success: bool = True) -> None:
 108        """Stop the snapshot creation progress."""
 109
 110    @abc.abstractmethod
 111    def update_cleanup_progress(self, object_name: str) -> None:
 112        """Update the snapshot cleanup progress."""
 113
 114    @abc.abstractmethod
 115    def start_promotion_progress(
 116        self,
 117        total_tasks: int,
 118        environment_naming_info: EnvironmentNamingInfo,
 119        default_catalog: t.Optional[str],
 120    ) -> None:
 121        """Indicates that a new snapshot promotion progress has begun."""
 122
 123    @abc.abstractmethod
 124    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
 125        """Update the snapshot promotion progress."""
 126
 127    @abc.abstractmethod
 128    def stop_promotion_progress(self, success: bool = True) -> None:
 129        """Stop the snapshot promotion progress."""
 130
 131    @abc.abstractmethod
 132    def start_migration_progress(self, total_tasks: int) -> None:
 133        """Indicates that a new migration progress has begun."""
 134
 135    @abc.abstractmethod
 136    def update_migration_progress(self, num_tasks: int) -> None:
 137        """Update the migration progress."""
 138
 139    @abc.abstractmethod
 140    def stop_migration_progress(self, success: bool = True) -> None:
 141        """Stop the migration progress."""
 142
 143    @abc.abstractmethod
 144    def show_model_difference_summary(
 145        self,
 146        context_diff: ContextDiff,
 147        environment_naming_info: EnvironmentNamingInfo,
 148        default_catalog: t.Optional[str],
 149        no_diff: bool = True,
 150        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 151    ) -> None:
 152        """Displays a summary of differences for the given models."""
 153
 154    @abc.abstractmethod
 155    def plan(
 156        self,
 157        plan_builder: PlanBuilder,
 158        auto_apply: bool,
 159        default_catalog: t.Optional[str],
 160        no_diff: bool = False,
 161        no_prompts: bool = False,
 162    ) -> None:
 163        """The main plan flow.
 164
 165        The console should present the user with choices on how to backfill and version the snapshots
 166        of a plan.
 167
 168        Args:
 169            plan: The plan to make choices for.
 170            auto_apply: Whether to automatically apply the plan after all choices have been made.
 171            no_diff: Hide text differences for changed models.
 172            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 173                if this flag is set to true and there are uncategorized changes the plan creation will
 174                fail. Default: False
 175        """
 176
 177    @abc.abstractmethod
 178    def log_test_results(
 179        self, result: unittest.result.TestResult, output: str, target_dialect: str
 180    ) -> None:
 181        """Display the test result and output.
 182
 183        Args:
 184            result: The unittest test result that contains metrics like num success, fails, ect.
 185            output: The generated output from the unittest.
 186            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
 187        """
 188
 189    @abc.abstractmethod
 190    def show_sql(self, sql: str) -> None:
 191        """Display to the user SQL."""
 192
 193    @abc.abstractmethod
 194    def log_status_update(self, message: str) -> None:
 195        """Display general status update to the user."""
 196
 197    @abc.abstractmethod
 198    def log_error(self, message: str) -> None:
 199        """Display error info to the user."""
 200
 201    @abc.abstractmethod
 202    def log_success(self, message: str) -> None:
 203        """Display a general successful message to the user."""
 204
 205    @abc.abstractmethod
 206    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
 207        """Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message."""
 208
 209    @abc.abstractmethod
 210    def loading_stop(self, id: uuid.UUID) -> None:
 211        """Stop loading for the given id."""
 212
 213    @abc.abstractmethod
 214    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
 215        """Show table schema diff."""
 216
 217    @abc.abstractmethod
 218    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
 219        """Show table summary diff."""
 220
 221
 222class TerminalConsole(Console):
 223    """A rich based implementation of the console."""
 224
 225    def __init__(
 226        self, console: t.Optional[RichConsole] = None, verbose: bool = False, **kwargs: t.Any
 227    ) -> None:
 228        self.console: RichConsole = console or srich.console
 229
 230        self.evaluation_progress_live: t.Optional[Live] = None
 231        self.evaluation_total_progress: t.Optional[Progress] = None
 232        self.evaluation_total_task: t.Optional[TaskID] = None
 233        self.evaluation_model_progress: t.Optional[Progress] = None
 234        self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
 235        self.evaluation_model_batches: t.Dict[Snapshot, int] = {}
 236
 237        # Put in temporary values that are replaced when evaluating
 238        self.environment_naming_info = EnvironmentNamingInfo()
 239        self.default_catalog: t.Optional[str] = None
 240
 241        self.creation_progress: t.Optional[Progress] = None
 242        self.creation_task: t.Optional[TaskID] = None
 243
 244        self.promotion_progress: t.Optional[Progress] = None
 245        self.promotion_task: t.Optional[TaskID] = None
 246
 247        self.migration_progress: t.Optional[Progress] = None
 248        self.migration_task: t.Optional[TaskID] = None
 249
 250        self.loading_status: t.Dict[uuid.UUID, Status] = {}
 251
 252        self.verbose = verbose
 253
 254    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
 255        self.console.print(value, **kwargs)
 256
 257    def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
 258        return Prompt.ask(message, console=self.console, **kwargs)
 259
 260    def _confirm(self, message: str, **kwargs: t.Any) -> bool:
 261        return Confirm.ask(message, console=self.console, **kwargs)
 262
 263    def start_plan_evaluation(self, plan: Plan) -> None:
 264        pass
 265
 266    def stop_plan_evaluation(self) -> None:
 267        pass
 268
 269    def start_evaluation_progress(
 270        self,
 271        batches: t.Dict[Snapshot, int],
 272        environment_naming_info: EnvironmentNamingInfo,
 273        default_catalog: t.Optional[str],
 274    ) -> None:
 275        """Indicates that a new snapshot evaluation progress has begun."""
 276        if not self.evaluation_progress_live:
 277            self.evaluation_total_progress = Progress(
 278                TextColumn("[bold blue]Evaluating models", justify="right"),
 279                BarColumn(bar_width=40),
 280                "[progress.percentage]{task.percentage:>3.1f}%",
 281                "•",
 282                srich.BatchColumn(),
 283                "•",
 284                TimeElapsedColumn(),
 285                console=self.console,
 286            )
 287
 288            self.evaluation_model_progress = Progress(
 289                TextColumn("{task.fields[view_name]}", justify="right"),
 290                SpinnerColumn(spinner_name="simpleDots"),
 291                console=self.console,
 292            )
 293
 294            progress_table = Table.grid()
 295            progress_table.add_row(self.evaluation_total_progress)
 296            progress_table.add_row(self.evaluation_model_progress)
 297
 298            self.evaluation_progress_live = Live(progress_table, refresh_per_second=10)
 299            self.evaluation_progress_live.start()
 300
 301            self.evaluation_total_task = self.evaluation_total_progress.add_task(
 302                "Evaluating models...", total=sum(batches.values())
 303            )
 304
 305            self.evaluation_model_batches = batches
 306            self.environment_naming_info = environment_naming_info
 307            self.default_catalog = default_catalog
 308
 309    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
 310        if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
 311            display_name = snapshot.display_name(self.environment_naming_info, self.default_catalog)
 312            self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
 313                f"Evaluating {display_name}...",
 314                view_name=display_name,
 315                total=self.evaluation_model_batches[snapshot],
 316            )
 317
 318    def update_snapshot_evaluation_progress(
 319        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
 320    ) -> None:
 321        """Update the snapshot evaluation progress."""
 322        if (
 323            self.evaluation_total_progress
 324            and self.evaluation_model_progress
 325            and self.evaluation_progress_live
 326        ):
 327            total_batches = self.evaluation_model_batches[snapshot]
 328
 329            if duration_ms:
 330                self.evaluation_progress_live.console.print(
 331                    f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s"
 332                )
 333
 334            self.evaluation_total_progress.update(
 335                self.evaluation_total_task or TaskID(0), refresh=True, advance=1
 336            )
 337
 338            model_task_id = self.evaluation_model_tasks[snapshot.name]
 339            self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
 340            if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
 341                self.evaluation_model_progress.remove_task(model_task_id)
 342
 343    def stop_evaluation_progress(self, success: bool = True) -> None:
 344        """Stop the snapshot evaluation progress."""
 345        if self.evaluation_progress_live:
 346            self.evaluation_progress_live.stop()
 347            if success:
 348                self.log_success("All model batches have been executed successfully")
 349
 350        self.evaluation_progress_live = None
 351        self.evaluation_total_progress = None
 352        self.evaluation_total_task = None
 353        self.evaluation_model_progress = None
 354        self.evaluation_model_tasks = {}
 355        self.evaluation_model_batches = {}
 356        self.environment_naming_info = EnvironmentNamingInfo()
 357        self.default_catalog = None
 358
 359    def start_creation_progress(
 360        self,
 361        total_tasks: int,
 362        environment_naming_info: EnvironmentNamingInfo,
 363        default_catalog: t.Optional[str],
 364    ) -> None:
 365        """Indicates that a new creation progress has begun."""
 366        if self.creation_progress is None:
 367            self.creation_progress = Progress(
 368                TextColumn("[bold blue]Creating physical tables", justify="right"),
 369                BarColumn(bar_width=40),
 370                "[progress.percentage]{task.percentage:>3.1f}%",
 371                "•",
 372                srich.BatchColumn(),
 373                "•",
 374                TimeElapsedColumn(),
 375                console=self.console,
 376            )
 377
 378            self.creation_progress.start()
 379            self.creation_task = self.creation_progress.add_task(
 380                "Creating physical tables...",
 381                total=total_tasks,
 382            )
 383
 384            self.environment_naming_info = environment_naming_info
 385            self.default_catalog = default_catalog
 386
 387    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
 388        """Update the snapshot creation progress."""
 389        if self.creation_progress is not None and self.creation_task is not None:
 390            if self.verbose:
 391                self.creation_progress.live.console.print(
 392                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]created[/green]"
 393                )
 394            self.creation_progress.update(self.creation_task, refresh=True, advance=1)
 395
 396    def stop_creation_progress(self, success: bool = True) -> None:
 397        """Stop the snapshot creation progress."""
 398        self.creation_task = None
 399        if self.creation_progress is not None:
 400            self.creation_progress.stop()
 401            self.creation_progress = None
 402            if success:
 403                self.log_success("All model versions have been created successfully")
 404
 405        self.environment_naming_info = EnvironmentNamingInfo()
 406        self.default_catalog = None
 407
 408    def update_cleanup_progress(self, object_name: str) -> None:
 409        """Update the snapshot cleanup progress."""
 410        self._print(f"Deleted object {object_name}")
 411
 412    def start_promotion_progress(
 413        self,
 414        total_tasks: int,
 415        environment_naming_info: EnvironmentNamingInfo,
 416        default_catalog: t.Optional[str],
 417    ) -> None:
 418        """Indicates that a new snapshot promotion progress has begun."""
 419        if self.promotion_progress is None:
 420            self.promotion_progress = Progress(
 421                TextColumn(
 422                    f"[bold blue]Virtually Updating '{environment_naming_info.name}'",
 423                    justify="right",
 424                ),
 425                BarColumn(bar_width=40),
 426                "[progress.percentage]{task.percentage:>3.1f}%",
 427                "•",
 428                TimeElapsedColumn(),
 429                console=self.console,
 430            )
 431
 432            self.promotion_progress.start()
 433            self.promotion_task = self.promotion_progress.add_task(
 434                f"Virtually Updating {environment_naming_info.name}...",
 435                total=total_tasks,
 436            )
 437
 438            self.environment_naming_info = environment_naming_info
 439            self.default_catalog = default_catalog
 440
 441    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
 442        """Update the snapshot promotion progress."""
 443        if self.promotion_progress is not None and self.promotion_task is not None:
 444            if self.verbose:
 445                action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]"
 446                self.promotion_progress.live.console.print(
 447                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} {action_str}"
 448                )
 449            self.promotion_progress.update(self.promotion_task, refresh=True, advance=1)
 450
 451    def stop_promotion_progress(self, success: bool = True) -> None:
 452        """Stop the snapshot promotion progress."""
 453        self.promotion_task = None
 454        if self.promotion_progress is not None:
 455            self.promotion_progress.stop()
 456            self.promotion_progress = None
 457            if success:
 458                self.log_success("The target environment has been updated successfully")
 459
 460        self.environment_naming_info = EnvironmentNamingInfo()
 461        self.default_catalog = None
 462
 463    def start_migration_progress(self, total_tasks: int) -> None:
 464        """Indicates that a new migration progress has begun."""
 465        if self.migration_progress is None:
 466            self.migration_progress = Progress(
 467                TextColumn("[bold blue]Migrating snapshots", justify="right"),
 468                BarColumn(bar_width=40),
 469                "[progress.percentage]{task.percentage:>3.1f}%",
 470                "•",
 471                srich.BatchColumn(),
 472                "•",
 473                TimeElapsedColumn(),
 474                console=self.console,
 475            )
 476
 477            self.migration_progress.start()
 478            self.migration_task = self.migration_progress.add_task(
 479                "Migrating snapshots...",
 480                total=total_tasks,
 481            )
 482
 483    def update_migration_progress(self, num_tasks: int) -> None:
 484        """Update the migration progress."""
 485        if self.migration_progress is not None and self.migration_task is not None:
 486            self.migration_progress.update(self.migration_task, refresh=True, advance=num_tasks)
 487
 488    def stop_migration_progress(self, success: bool = True) -> None:
 489        """Stop the migration progress."""
 490        self.migration_task = None
 491        if self.migration_progress is not None:
 492            self.migration_progress.stop()
 493            self.migration_progress = None
 494            if success:
 495                self.log_success("The migration has been completed successfully")
 496
 497    def show_model_difference_summary(
 498        self,
 499        context_diff: ContextDiff,
 500        environment_naming_info: EnvironmentNamingInfo,
 501        default_catalog: t.Optional[str],
 502        no_diff: bool = True,
 503        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 504    ) -> None:
 505        """Shows a summary of the differences.
 506
 507        Args:
 508            context_diff: The context diff to use to print the summary
 509            environment_naming_info: The environment naming info to reference when printing model names
 510            default_catalog: The default catalog to reference when deciding to remove catalog from display names
 511            no_diff: Hide the actual SQL differences.
 512            ignored_snapshot_ids: A set of snapshot ids that are ignored
 513        """
 514        ignored_snapshot_ids = ignored_snapshot_ids or set()
 515        if context_diff.is_new_environment:
 516            self._print(
 517                Tree(
 518                    f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`"
 519                )
 520            )
 521            if not context_diff.has_snapshot_changes:
 522                return
 523
 524        if not context_diff.has_changes:
 525            self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`"))
 526            return
 527
 528        self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:"))
 529        self._show_summary_tree_for(
 530            context_diff,
 531            "Models",
 532            lambda x: x.is_model,
 533            environment_naming_info,
 534            default_catalog,
 535            no_diff=no_diff,
 536            ignored_snapshot_ids=ignored_snapshot_ids,
 537        )
 538        self._show_summary_tree_for(
 539            context_diff,
 540            "Standalone Audits",
 541            lambda x: x.is_audit,
 542            environment_naming_info,
 543            default_catalog,
 544            no_diff=no_diff,
 545            ignored_snapshot_ids=ignored_snapshot_ids,
 546        )
 547
 548    def plan(
 549        self,
 550        plan_builder: PlanBuilder,
 551        auto_apply: bool,
 552        default_catalog: t.Optional[str],
 553        no_diff: bool = False,
 554        no_prompts: bool = False,
 555    ) -> None:
 556        """The main plan flow.
 557
 558        The console should present the user with choices on how to backfill and version the snapshots
 559        of a plan.
 560
 561        Args:
 562            plan: The plan to make choices for.
 563            auto_apply: Whether to automatically apply the plan after all choices have been made.
 564            default_catalog: The default catalog to reference when deciding to remove catalog from display names
 565            no_diff: Hide text differences for changed models.
 566            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 567                if this flag is set to true and there are uncategorized changes the plan creation will
 568                fail. Default: False
 569        """
 570        self._prompt_categorize(
 571            plan_builder,
 572            auto_apply,
 573            no_diff=no_diff,
 574            no_prompts=no_prompts,
 575            default_catalog=default_catalog,
 576        )
 577
 578        if not no_prompts:
 579            self._show_options_after_categorization(
 580                plan_builder, auto_apply, default_catalog=default_catalog
 581            )
 582
 583        if auto_apply:
 584            plan_builder.apply()
 585
 586    def _get_ignored_tree(
 587        self,
 588        ignored_snapshot_ids: t.Set[SnapshotId],
 589        snapshots: t.Dict[SnapshotId, Snapshot],
 590        environment_naming_info: EnvironmentNamingInfo,
 591        default_catalog: t.Optional[str],
 592    ) -> Tree:
 593        ignored = Tree("[bold][ignored]Ignored Models (Expected Plan Start):")
 594        for s_id in ignored_snapshot_ids:
 595            snapshot = snapshots[s_id]
 596            ignored.add(
 597                f"[ignored]{snapshot.display_name(environment_naming_info, default_catalog)} ({snapshot.get_latest(start_date(snapshot, snapshots.values()))})"
 598            )
 599        return ignored
 600
 601    def _show_summary_tree_for(
 602        self,
 603        context_diff: ContextDiff,
 604        header: str,
 605        snapshot_selector: t.Callable[[SnapshotInfoLike], bool],
 606        environment_naming_info: EnvironmentNamingInfo,
 607        default_catalog: t.Optional[str],
 608        no_diff: bool = True,
 609        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 610    ) -> None:
 611        ignored_snapshot_ids = ignored_snapshot_ids or set()
 612        selected_snapshots = {
 613            s_id: snapshot
 614            for s_id, snapshot in context_diff.snapshots.items()
 615            if snapshot_selector(snapshot)
 616        }
 617        selected_ignored_snapshot_ids = {
 618            s_id for s_id in selected_snapshots if s_id in ignored_snapshot_ids
 619        }
 620        added_snapshot_ids = {
 621            s_id for s_id in context_diff.added if snapshot_selector(context_diff.snapshots[s_id])
 622        } - selected_ignored_snapshot_ids
 623        removed_snapshot_ids = {
 624            s_id
 625            for s_id, snapshot in context_diff.removed_snapshots.items()
 626            if snapshot_selector(snapshot)
 627        } - selected_ignored_snapshot_ids
 628        modified_snapshot_ids = {
 629            current_snapshot.snapshot_id
 630            for _, (current_snapshot, _) in context_diff.modified_snapshots.items()
 631            if snapshot_selector(current_snapshot)
 632        } - selected_ignored_snapshot_ids
 633
 634        tree_sets = (
 635            added_snapshot_ids,
 636            removed_snapshot_ids,
 637            modified_snapshot_ids,
 638            selected_ignored_snapshot_ids,
 639        )
 640        if all(not s_ids for s_ids in tree_sets):
 641            return
 642
 643        tree = Tree(f"[bold]{header}:")
 644        if added_snapshot_ids:
 645            added_tree = Tree("[bold][added]Added:")
 646            for s_id in added_snapshot_ids:
 647                snapshot = context_diff.snapshots[s_id]
 648                added_tree.add(
 649                    f"[added]{snapshot.display_name(environment_naming_info, default_catalog)}"
 650                )
 651            tree.add(added_tree)
 652        if removed_snapshot_ids:
 653            removed_tree = Tree("[bold][removed]Removed:")
 654            for s_id in removed_snapshot_ids:
 655                snapshot_table_info = context_diff.removed_snapshots[s_id]
 656                removed_tree.add(
 657                    f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog)}"
 658                )
 659            tree.add(removed_tree)
 660        if modified_snapshot_ids:
 661            direct = Tree("[bold][direct]Directly Modified:")
 662            indirect = Tree("[bold][indirect]Indirectly Modified:")
 663            metadata = Tree("[bold][metadata]Metadata Updated:")
 664            for s_id in modified_snapshot_ids:
 665                name = s_id.name
 666                display_name = context_diff.snapshots[s_id].display_name(
 667                    environment_naming_info, default_catalog
 668                )
 669                if context_diff.directly_modified(name):
 670                    direct.add(
 671                        f"[direct]{display_name}"
 672                        if no_diff
 673                        else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
 674                    )
 675                elif context_diff.indirectly_modified(name):
 676                    indirect.add(f"[indirect]{display_name}")
 677                elif context_diff.metadata_updated(name):
 678                    metadata.add(
 679                        f"[metadata]{display_name}"
 680                        if no_diff
 681                        else Syntax(f"{display_name}", "sql", word_wrap=True)
 682                    )
 683            if direct.children:
 684                tree.add(direct)
 685            if indirect.children:
 686                tree.add(indirect)
 687            if metadata.children:
 688                tree.add(metadata)
 689        if selected_ignored_snapshot_ids:
 690            tree.add(
 691                self._get_ignored_tree(
 692                    selected_ignored_snapshot_ids,
 693                    selected_snapshots,
 694                    environment_naming_info,
 695                    default_catalog,
 696                )
 697            )
 698        self._print(tree)
 699
 700    def _show_options_after_categorization(
 701        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 702    ) -> None:
 703        plan = plan_builder.build()
 704        if plan.forward_only and plan.new_snapshots:
 705            self._prompt_effective_from(plan_builder, auto_apply, default_catalog)
 706
 707        if plan.requires_backfill:
 708            self._show_missing_dates(plan_builder.build(), default_catalog)
 709            self._prompt_backfill(plan_builder, auto_apply, default_catalog)
 710        elif plan.has_changes and not auto_apply:
 711            self._prompt_promote(plan_builder)
 712        elif plan.has_unmodified_unpromoted and not auto_apply:
 713            self.log_status_update("\n[bold]Virtually updating unmodified models\n")
 714            self._prompt_promote(plan_builder)
 715
 716    def _prompt_categorize(
 717        self,
 718        plan_builder: PlanBuilder,
 719        auto_apply: bool,
 720        no_diff: bool,
 721        no_prompts: bool,
 722        default_catalog: t.Optional[str],
 723    ) -> None:
 724        """Get the user's change category for the directly modified models."""
 725        plan = plan_builder.build()
 726
 727        self.show_model_difference_summary(
 728            plan.context_diff,
 729            plan.environment_naming_info,
 730            default_catalog=default_catalog,
 731            ignored_snapshot_ids=plan.ignored,
 732        )
 733
 734        if not no_diff:
 735            self._show_categorized_snapshots(plan, default_catalog)
 736
 737        for snapshot in plan.uncategorized:
 738            if not no_diff:
 739                self.show_sql(plan.context_diff.text_diff(snapshot.name))
 740            tree = Tree(
 741                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)}"
 742            )
 743            indirect_tree = None
 744
 745            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
 746                child_snapshot = plan.context_diff.snapshots[child_sid]
 747                if not indirect_tree:
 748                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
 749                    tree.add(indirect_tree)
 750                indirect_tree.add(
 751                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)}"
 752                )
 753            self._print(tree)
 754            if not no_prompts:
 755                self._get_snapshot_change_category(
 756                    snapshot, plan_builder, auto_apply, default_catalog
 757                )
 758
 759    def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
 760        context_diff = plan.context_diff
 761
 762        for snapshot in plan.categorized:
 763            if not context_diff.directly_modified(snapshot.name):
 764                continue
 765
 766            category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
 767            tree = Tree(
 768                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})"
 769            )
 770            indirect_tree = None
 771            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
 772                child_snapshot = context_diff.snapshots[child_sid]
 773                if not indirect_tree:
 774                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
 775                    tree.add(indirect_tree)
 776                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
 777                indirect_tree.add(
 778                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})"
 779                )
 780            self._print(Syntax(context_diff.text_diff(snapshot.name), "sql", word_wrap=True))
 781            self._print(tree)
 782
 783    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
 784        """Displays the models with missing dates."""
 785        missing_intervals = plan.missing_intervals
 786        if not missing_intervals:
 787            return
 788        backfill = Tree("[bold]Models needing backfill (missing dates):")
 789        for missing in missing_intervals:
 790            snapshot = plan.context_diff.snapshots[missing.snapshot_id]
 791            if not snapshot.is_model:
 792                continue
 793
 794            preview_modifier = ""
 795            if not plan.deployability_index.is_deployable(snapshot):
 796                preview_modifier = " ([orange1]preview[/orange1])"
 797
 798            backfill.add(
 799                f"{snapshot.display_name(plan.environment_naming_info, default_catalog)}: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}"
 800            )
 801        self._print(backfill)
 802
 803    def _prompt_effective_from(
 804        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 805    ) -> None:
 806        if not plan_builder.build().effective_from:
 807            effective_from = self._prompt(
 808                "Enter the effective date (eg. '1 year', '2020-01-01') to apply forward-only changes retroactively or blank to only apply them going forward once changes are deployed to prod"
 809            )
 810            if effective_from:
 811                plan_builder.set_effective_from(effective_from)
 812
 813    def _prompt_backfill(
 814        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 815    ) -> None:
 816        plan = plan_builder.build()
 817        is_forward_only_dev = plan.is_dev and plan.forward_only
 818        backfill_or_preview = "preview" if is_forward_only_dev else "backfill"
 819
 820        if plan_builder.is_start_and_end_allowed:
 821            if not plan_builder.override_start:
 822                if is_forward_only_dev:
 823                    if plan.effective_from:
 824                        blank_meaning = f"to preview starting from the effective date ('{time_like_to_str(plan.effective_from)}')"
 825                        default_start = plan.effective_from
 826                    else:
 827                        blank_meaning = "to preview starting from yesterday"
 828                        default_start = yesterday_ds()
 829                else:
 830                    if plan.provided_start:
 831                        blank_meaning = f"starting from '{time_like_to_str(plan.provided_start)}'"
 832                    else:
 833                        blank_meaning = "from the beginning of history"
 834                    default_start = None
 835
 836                start = self._prompt(
 837                    f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank to backfill {blank_meaning}",
 838                )
 839                if start:
 840                    plan_builder.set_start(start)
 841                elif default_start:
 842                    plan_builder.set_start(default_start)
 843
 844            if not plan_builder.override_end:
 845                if plan.provided_end:
 846                    blank_meaning = f"'{time_like_to_str(plan.provided_end)}'"
 847                else:
 848                    blank_meaning = "now"
 849                end = self._prompt(
 850                    f"Enter the {backfill_or_preview} end date (eg. '1 month ago', '2020-01-01') or blank to {backfill_or_preview} up until {blank_meaning}",
 851                )
 852                if end:
 853                    plan_builder.set_end(end)
 854
 855            plan = plan_builder.build()
 856
 857        if plan.ignored:
 858            self._print(
 859                self._get_ignored_tree(
 860                    plan.ignored,
 861                    plan.context_diff.snapshots,
 862                    plan.environment_naming_info,
 863                    default_catalog,
 864                )
 865            )
 866        if not auto_apply and self._confirm(f"Apply - {backfill_or_preview.capitalize()} Tables"):
 867            plan_builder.apply()
 868
 869    def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
 870        if self._confirm(
 871            "Apply - Virtual Update",
 872        ):
 873            plan_builder.apply()
 874
 875    def log_test_results(
 876        self, result: unittest.result.TestResult, output: str, target_dialect: str
 877    ) -> None:
 878        divider_length = 70
 879        if result.wasSuccessful():
 880            self._print("=" * divider_length)
 881            self._print(
 882                f"Successfully Ran {str(result.testsRun)} tests against {target_dialect}",
 883                style="green",
 884            )
 885            self._print("-" * divider_length)
 886        else:
 887            self._print("-" * divider_length)
 888            self._print("Test Failure Summary")
 889            self._print("=" * divider_length)
 890            self._print(
 891                f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}"
 892            )
 893            for test, _ in result.failures + result.errors:
 894                if isinstance(test, ModelTest):
 895                    self._print(f"Failure Test: {test.model.name} {test.test_name}")
 896            self._print("=" * divider_length)
 897            self._print(output)
 898
 899    def show_sql(self, sql: str) -> None:
 900        self._print(Syntax(sql, "sql", word_wrap=True), crop=False)
 901
 902    def log_status_update(self, message: str) -> None:
 903        self._print(message)
 904
 905    def log_error(self, message: str) -> None:
 906        self._print(f"[red]{message}[/red]")
 907
 908    def log_success(self, message: str) -> None:
 909        self._print(f"\n[green]{message}[/green]\n")
 910
 911    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
 912        id = uuid.uuid4()
 913        self.loading_status[id] = Status(message or "", console=self.console, spinner="line")
 914        self.loading_status[id].start()
 915        return id
 916
 917    def loading_stop(self, id: uuid.UUID) -> None:
 918        self.loading_status[id].stop()
 919        del self.loading_status[id]
 920
 921    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
 922        source_name = schema_diff.source
 923        if schema_diff.source_alias:
 924            source_name = schema_diff.source_alias.upper()
 925        target_name = schema_diff.target
 926        if schema_diff.target_alias:
 927            target_name = schema_diff.target_alias.upper()
 928
 929        first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'"
 930        if schema_diff.model_name:
 931            first_line = (
 932                first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
 933            )
 934
 935        tree = Tree(first_line + ":")
 936
 937        if any([schema_diff.added, schema_diff.removed, schema_diff.modified]):
 938            if schema_diff.added:
 939                added = Tree("[green]Added Columns:")
 940                for c, t in schema_diff.added:
 941                    added.add(f"[green]{c} ({t})")
 942                tree.add(added)
 943
 944            if schema_diff.removed:
 945                removed = Tree("[red]Removed Columns:")
 946                for c, t in schema_diff.removed:
 947                    removed.add(f"[red]{c} ({t})")
 948                tree.add(removed)
 949
 950            if schema_diff.modified:
 951                modified = Tree("[magenta]Modified Columns:")
 952                for c, (ft, tt) in schema_diff.modified.items():
 953                    modified.add(f"[magenta]{c} ({ft} -> {tt})")
 954                tree.add(modified)
 955        else:
 956            tree.add("[b]Schemas match")
 957
 958        self.console.print(tree)
 959
 960    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
 961        source_name = row_diff.source
 962        if row_diff.source_alias:
 963            source_name = row_diff.source_alias.upper()
 964        target_name = row_diff.target
 965        if row_diff.target_alias:
 966            target_name = row_diff.target_alias.upper()
 967
 968        tree = Tree("[b]Row Counts:[/b]")
 969        tree.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows")
 970        tree.add(f" [b][yellow]{source_name} ONLY[/yellow]:[/b] {row_diff.s_only_count} rows")
 971        tree.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows")
 972        self.console.print("\n", tree)
 973
 974        self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]")
 975        if row_diff.column_stats.shape[0] > 0:
 976            self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n")
 977        else:
 978            self.console.print("  No columns with same name and data type in both tables")
 979
 980        if show_sample:
 981            self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]")
 982            if row_diff.joined_sample.shape[0] > 0:
 983                self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n")
 984            else:
 985                self.console.print("  All joined rows match")
 986
 987            if row_diff.s_sample.shape[0] > 0:
 988                self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]")
 989                self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n")
 990
 991            if row_diff.t_sample.shape[0] > 0:
 992                self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]")
 993                self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n")
 994
 995    def _get_snapshot_change_category(
 996        self,
 997        snapshot: Snapshot,
 998        plan_builder: PlanBuilder,
 999        auto_apply: bool,
1000        default_catalog: t.Optional[str],
1001    ) -> None:
1002        choices = self._snapshot_change_choices(
1003            snapshot, plan_builder.environment_naming_info, default_catalog
1004        )
1005        response = self._prompt(
1006            "\n".join([f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]),
1007            show_choices=False,
1008            choices=[f"{i+1}" for i in range(len(choices))],
1009        )
1010        choice = list(choices)[int(response) - 1]
1011        plan_builder.set_choice(snapshot, choice)
1012
1013    def _snapshot_change_choices(
1014        self,
1015        snapshot: Snapshot,
1016        environment_naming_info: EnvironmentNamingInfo,
1017        default_catalog: t.Optional[str],
1018        use_rich_formatting: bool = True,
1019    ) -> t.Dict[SnapshotChangeCategory, str]:
1020        direct = snapshot.display_name(environment_naming_info, default_catalog)
1021        if use_rich_formatting:
1022            direct = f"[direct]{direct}[/direct]"
1023        indirect = "indirectly modified children"
1024        if use_rich_formatting:
1025            indirect = f"[indirect]{indirect}[/indirect]"
1026        if snapshot.is_view:
1027            choices = {
1028                SnapshotChangeCategory.BREAKING: f"Update {direct} and backfill {indirect}",
1029                SnapshotChangeCategory.NON_BREAKING: f"Update {direct} but don't backfill {indirect}",
1030            }
1031        elif snapshot.is_symbolic:
1032            choices = {
1033                SnapshotChangeCategory.BREAKING: f"Backfill {indirect}",
1034                SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}",
1035            }
1036        else:
1037            choices = {
1038                SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}",
1039                SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}",
1040            }
1041        labeled_choices = {
1042            k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items()
1043        }
1044        return labeled_choices
1045
1046
1047def add_to_layout_widget(target_widget: LayoutWidget, *widgets: widgets.Widget) -> LayoutWidget:
1048    """Helper function to add a widget to a layout widget.
1049
1050    Args:
1051        target_widget: The layout widget to add the other widget(s) to.
1052        *widgets: The widgets to add to the layout widget.
1053
1054    Returns:
1055        The layout widget with the children added.
1056    """
1057    target_widget.children += tuple(widgets)
1058    return target_widget
1059
1060
1061class NotebookMagicConsole(TerminalConsole):
1062    """
1063    Console to be used when using the magic notebook interface (`%<command>`).
1064    Generally reuses the Terminal console when possible by either directly outputing what it provides
1065    or capturing it and converting it into a widget.
1066    """
1067
1068    def __init__(
1069        self,
1070        display: t.Optional[t.Callable] = None,
1071        console: t.Optional[RichConsole] = None,
1072        **kwargs: t.Any,
1073    ) -> None:
1074        import ipywidgets as widgets
1075        from IPython import get_ipython
1076        from IPython.display import display as ipython_display
1077
1078        super().__init__(console, **kwargs)
1079
1080        self.display = display or get_ipython().user_ns.get("display", ipython_display)
1081        self.missing_dates_output = widgets.Output()
1082        self.dynamic_options_after_categorization_output = widgets.VBox()
1083
1084    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1085        self._add_to_dynamic_options(self.missing_dates_output)
1086        self.missing_dates_output.outputs = ()
1087        with self.missing_dates_output:
1088            super()._show_missing_dates(plan, default_catalog)
1089
1090    def _apply(self, button: widgets.Button) -> None:
1091        button.disabled = True
1092        with button.output:
1093            button.plan_builder.apply()
1094
1095    def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
1096        import ipywidgets as widgets
1097
1098        button = widgets.Button(
1099            description="Apply - Virtual Update",
1100            disabled=False,
1101            button_style="success",
1102            # Auto will make the button really large.
1103            # Likely changing this soon anyways to be just `Apply` with description above
1104            layout={"width": "10rem"},
1105        )
1106        self._add_to_dynamic_options(button)
1107        output = widgets.Output()
1108        self._add_to_dynamic_options(output)
1109
1110        button.plan_builder = plan_builder
1111        button.on_click(self._apply)
1112        button.output = output
1113
1114    def _prompt_effective_from(
1115        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1116    ) -> None:
1117        import ipywidgets as widgets
1118
1119        prompt = widgets.VBox()
1120
1121        def effective_from_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1122            plan_builder.set_effective_from(change["new"])
1123            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1124
1125        def going_forward_change_callback(change: t.Dict[str, bool]) -> None:
1126            checked = change["new"]
1127            plan_builder.set_effective_from(None if checked else yesterday_ds())
1128            self._show_options_after_categorization(
1129                plan_builder, auto_apply=auto_apply, default_catalog=default_catalog
1130            )
1131
1132        date_picker = widgets.DatePicker(
1133            disabled=plan_builder.build().effective_from is None,
1134            value=to_date(plan_builder.build().effective_from or yesterday_ds()),
1135            layout={"width": "auto"},
1136        )
1137        date_picker.observe(effective_from_change_callback, "value")
1138
1139        going_forward_checkbox = widgets.Checkbox(
1140            value=plan_builder.build().effective_from is None,
1141            description="Apply Going Forward Once Deployed To Prod",
1142            disabled=False,
1143            indent=False,
1144        )
1145        going_forward_checkbox.observe(going_forward_change_callback, "value")
1146
1147        add_to_layout_widget(
1148            prompt,
1149            widgets.HBox(
1150                [
1151                    widgets.Label("Effective From Date:", layout={"width": "8rem"}),
1152                    date_picker,
1153                    going_forward_checkbox,
1154                ]
1155            ),
1156        )
1157
1158        self._add_to_dynamic_options(prompt)
1159
1160    def _prompt_backfill(
1161        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1162    ) -> None:
1163        import ipywidgets as widgets
1164
1165        prompt = widgets.VBox()
1166
1167        backfill_or_preview = (
1168            "Preview"
1169            if plan_builder.build().is_dev and plan_builder.build().forward_only
1170            else "Backfill"
1171        )
1172
1173        def _date_picker(
1174            plan_builder: PlanBuilder, value: t.Any, on_change: t.Callable, disabled: bool = False
1175        ) -> widgets.DatePicker:
1176            picker = widgets.DatePicker(
1177                disabled=disabled,
1178                value=value,
1179                layout={"width": "auto"},
1180            )
1181
1182            picker.observe(on_change, "value")
1183            return picker
1184
1185        def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1186            plan_builder.set_start(change["new"])
1187            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1188
1189        def end_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1190            plan_builder.set_end(change["new"])
1191            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1192
1193        if plan_builder.is_start_and_end_allowed:
1194            add_to_layout_widget(
1195                prompt,
1196                widgets.HBox(
1197                    [
1198                        widgets.Label(
1199                            f"Start {backfill_or_preview} Date:", layout={"width": "8rem"}
1200                        ),
1201                        _date_picker(
1202                            plan_builder, to_date(plan_builder.build().start), start_change_callback
1203                        ),
1204                    ]
1205                ),
1206            )
1207
1208            add_to_layout_widget(
1209                prompt,
1210                widgets.HBox(
1211                    [
1212                        widgets.Label(f"End {backfill_or_preview} Date:", layout={"width": "8rem"}),
1213                        _date_picker(
1214                            plan_builder,
1215                            to_date(plan_builder.build().end),
1216                            end_change_callback,
1217                        ),
1218                    ]
1219                ),
1220            )
1221
1222        self._add_to_dynamic_options(prompt)
1223
1224        if not auto_apply:
1225            button = widgets.Button(
1226                description=f"Apply - {backfill_or_preview} Tables",
1227                disabled=False,
1228                button_style="success",
1229            )
1230            self._add_to_dynamic_options(button)
1231            output = widgets.Output()
1232            self._add_to_dynamic_options(output)
1233
1234            button.plan_builder = plan_builder
1235            button.on_click(self._apply)
1236            button.output = output
1237
1238    def _show_options_after_categorization(
1239        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1240    ) -> None:
1241        self.dynamic_options_after_categorization_output.children = ()
1242        self.display(self.dynamic_options_after_categorization_output)
1243        super()._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1244
1245    def _add_to_dynamic_options(self, *widgets: widgets.Widget) -> None:
1246        add_to_layout_widget(self.dynamic_options_after_categorization_output, *widgets)
1247
1248    def _get_snapshot_change_category(
1249        self,
1250        snapshot: Snapshot,
1251        plan_builder: PlanBuilder,
1252        auto_apply: bool,
1253        default_catalog: t.Optional[str],
1254    ) -> None:
1255        import ipywidgets as widgets
1256
1257        choice_mapping = self._snapshot_change_choices(
1258            snapshot,
1259            plan_builder.environment_naming_info,
1260            default_catalog,
1261            use_rich_formatting=False,
1262        )
1263        choices = list(choice_mapping)
1264        plan_builder.set_choice(snapshot, choices[0])
1265
1266        def radio_button_selected(change: t.Dict[str, t.Any]) -> None:
1267            plan_builder.set_choice(snapshot, choices[change["owner"].index])
1268            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1269
1270        radio = widgets.RadioButtons(
1271            options=choice_mapping.values(),
1272            layout={"width": "max-content"},
1273            disabled=False,
1274        )
1275        radio.observe(
1276            radio_button_selected,
1277            "value",
1278        )
1279        self.display(radio)
1280
1281    def log_test_results(
1282        self, result: unittest.result.TestResult, output: str, target_dialect: str
1283    ) -> None:
1284        import ipywidgets as widgets
1285
1286        divider_length = 70
1287        shared_style = {
1288            "font-size": "11px",
1289            "font-weight": "bold",
1290            "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace",
1291        }
1292        if result.wasSuccessful():
1293            success_color = {"color": "#008000"}
1294            header = str(h("span", {"style": shared_style}, "-" * divider_length))
1295            message = str(
1296                h(
1297                    "span",
1298                    {"style": {**shared_style, **success_color}},
1299                    f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}",
1300                )
1301            )
1302            footer = str(h("span", {"style": shared_style}, "=" * divider_length))
1303            self.display(widgets.HTML("<br>".join([header, message, footer])))
1304        else:
1305            fail_color = {"color": "#db3737"}
1306            fail_shared_style = {**shared_style, **fail_color}
1307            header = str(h("span", {"style": fail_shared_style}, "-" * divider_length))
1308            message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary"))
1309            num_success = str(
1310                h(
1311                    "span",
1312                    {"style": fail_shared_style},
1313                    f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}",
1314                )
1315            )
1316            failure_tests = []
1317            for test, _ in result.failures + result.errors:
1318                if isinstance(test, ModelTest):
1319                    failure_tests.append(
1320                        str(
1321                            h(
1322                                "span",
1323                                {"style": fail_shared_style},
1324                                f"Failure Test: {test.model.name} {test.test_name}",
1325                            )
1326                        )
1327                    )
1328            failures = "<br>".join(failure_tests)
1329            footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length))
1330            error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"})
1331            test_info = widgets.HTML(
1332                "<br>".join([header, message, footer, num_success, failures, footer])
1333            )
1334            self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"}))
1335
1336
1337class CaptureTerminalConsole(TerminalConsole):
1338    """
1339    Captures the output of the terminal console so that it can be extracted out and displayed within other interfaces.
1340    The captured output is cleared out after it is retrieved.
1341
1342    Note: `_prompt` and `_confirm` need to also be overriden to work with the custom interface if you want to use
1343    this console interactively.
1344    """
1345
1346    def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) -> None:
1347        super().__init__(console=console, **kwargs)
1348        self._captured_outputs: t.List[str] = []
1349        self._errors: t.List[str] = []
1350
1351    @property
1352    def captured_output(self) -> str:
1353        return "".join(self._captured_outputs)
1354
1355    @property
1356    def captured_errors(self) -> str:
1357        return "".join(self._errors)
1358
1359    def consume_captured_output(self) -> str:
1360        output = self.captured_output
1361        self.clear_captured_outputs()
1362        return output
1363
1364    def consume_captured_errors(self) -> str:
1365        errors = self.captured_errors
1366        self.clear_captured_errors()
1367        return errors
1368
1369    def clear_captured_outputs(self) -> None:
1370        self._captured_outputs = []
1371
1372    def clear_captured_errors(self) -> None:
1373        self._errors = []
1374
1375    def log_error(self, message: str) -> None:
1376        self._errors.append(message)
1377        super().log_error(message)
1378
1379    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
1380        with self.console.capture() as capture:
1381            self.console.print(value, **kwargs)
1382        self._captured_outputs.append(capture.get())
1383
1384
1385class MarkdownConsole(CaptureTerminalConsole):
1386    """
1387    A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases
1388    where you want to display a plan or test results in markdown.
1389    """
1390
1391    def show_model_difference_summary(
1392        self,
1393        context_diff: ContextDiff,
1394        environment_naming_info: EnvironmentNamingInfo,
1395        default_catalog: t.Optional[str],
1396        no_diff: bool = True,
1397        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1398    ) -> None:
1399        """Shows a summary of the differences.
1400
1401        Args:
1402            context_diff: The context diff to use to print the summary.
1403            environment_naming_info: The environment naming info to reference when printing model names
1404            default_catalog: The default catalog to reference when deciding to remove catalog from display names
1405            no_diff: Hide the actual SQL differences.
1406            ignored_snapshot_ids: A set of snapshot names that are ignored
1407        """
1408        ignored_snapshot_ids = ignored_snapshot_ids or set()
1409        if context_diff.is_new_environment:
1410            self._print(
1411                f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n"
1412            )
1413            if not context_diff.has_snapshot_changes:
1414                return
1415
1416        if not context_diff.has_changes:
1417            self._print(f"**No differences when compared to `{context_diff.environment}`**\n")
1418            return
1419
1420        self._print(f"**Summary of differences against `{context_diff.environment}`:**\n")
1421
1422        added_snapshots = {
1423            context_diff.snapshots[s_id]
1424            for s_id in context_diff.added
1425            if s_id not in ignored_snapshot_ids
1426        }
1427        added_snapshot_models = {s for s in added_snapshots if s.is_model}
1428        if added_snapshot_models:
1429            self._print("\n**Added Models:**")
1430            for snapshot in sorted(added_snapshot_models):
1431                self._print(
1432                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1433                )
1434
1435        added_snapshot_audits = {s for s in added_snapshots if s.is_audit}
1436        if added_snapshot_audits:
1437            self._print("\n**Added Standalone Audits:**")
1438            for snapshot in sorted(added_snapshot_audits):
1439                self._print(
1440                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1441                )
1442
1443        removed_snapshot_table_infos = {
1444            snapshot_table_info
1445            for s_id, snapshot_table_info in context_diff.removed_snapshots.items()
1446            if s_id not in ignored_snapshot_ids
1447        }
1448        removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model}
1449        if removed_model_snapshot_table_infos:
1450            self._print("\n**Removed Models:**")
1451            for snapshot_table_info in sorted(removed_model_snapshot_table_infos):
1452                self._print(
1453                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1454                )
1455
1456        removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit}
1457        if removed_audit_snapshot_table_infos:
1458            self._print("\n**Removed Standalone Audits:**")
1459            for snapshot_table_info in sorted(removed_audit_snapshot_table_infos):
1460                self._print(
1461                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1462                )
1463
1464        modified_snapshots = {
1465            current_snapshot
1466            for current_snapshot, _ in context_diff.modified_snapshots.values()
1467            if current_snapshot.snapshot_id not in ignored_snapshot_ids
1468        }
1469        if modified_snapshots:
1470            directly_modified = []
1471            indirectly_modified = []
1472            metadata_modified = []
1473            for snapshot in modified_snapshots:
1474                if context_diff.directly_modified(snapshot.name):
1475                    directly_modified.append(snapshot)
1476                elif context_diff.indirectly_modified(snapshot.name):
1477                    indirectly_modified.append(snapshot)
1478                elif context_diff.metadata_updated(snapshot.name):
1479                    metadata_modified.append(snapshot)
1480            if directly_modified:
1481                self._print("\n**Directly Modified:**")
1482                for snapshot in sorted(directly_modified):
1483                    self._print(
1484                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1485                    )
1486                    if not no_diff:
1487                        self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
1488            if indirectly_modified:
1489                self._print("\n**Indirectly Modified:**")
1490                for snapshot in sorted(indirectly_modified):
1491                    self._print(
1492                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1493                    )
1494            if metadata_modified:
1495                self._print("\n**Metadata Updated:**")
1496                for snapshot in sorted(metadata_modified):
1497                    self._print(
1498                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1499                    )
1500        if ignored_snapshot_ids:
1501            self._print("\n**Ignored Models (Expected Plan Start):**")
1502            for s_id in sorted(ignored_snapshot_ids):
1503                snapshot = context_diff.snapshots[s_id]
1504                self._print(
1505                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}` ({snapshot.get_latest(start_date(snapshot, context_diff.snapshots.values()))})"
1506                )
1507
1508    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1509        """Displays the models with missing dates."""
1510        missing_intervals = plan.missing_intervals
1511        if not missing_intervals:
1512            return
1513        self._print("\n**Models needing backfill (missing dates):**")
1514        for missing in missing_intervals:
1515            snapshot = plan.context_diff.snapshots[missing.snapshot_id]
1516            if not snapshot.is_model:
1517                continue
1518
1519            preview_modifier = ""
1520            if not plan.deployability_index.is_deployable(snapshot):
1521                preview_modifier = " (**preview**)"
1522
1523            self._print(
1524                f"* `{snapshot.display_name(plan.environment_naming_info, default_catalog)}`: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}"
1525            )
1526
1527    def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1528        context_diff = plan.context_diff
1529        for snapshot in plan.categorized:
1530            if not context_diff.directly_modified(snapshot.name):
1531                continue
1532
1533            category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
1534            tree = Tree(
1535                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})"
1536            )
1537            indirect_tree = None
1538            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
1539                child_snapshot = context_diff.snapshots[child_sid]
1540                if not indirect_tree:
1541                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
1542                    tree.add(indirect_tree)
1543                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
1544                indirect_tree.add(
1545                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})"
1546                )
1547            self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```\n")
1548            self._print("```\n")
1549            self._print(tree)
1550            self._print("\n```")
1551
1552    def log_test_results(
1553        self, result: unittest.result.TestResult, output: str, target_dialect: str
1554    ) -> None:
1555        # import ipywidgets as widgets
1556        if result.wasSuccessful():
1557            self._print(
1558                f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n"
1559            )
1560        else:
1561            self._print(
1562                f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n"
1563            )
1564            for test, _ in result.failures + result.errors:
1565                if isinstance(test, ModelTest):
1566                    self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n")
1567            self._print(f"```{output}```\n\n")
1568
1569    def log_error(self, message: str) -> None:
1570        super().log_error(f"```\n{message}```\n\n")
1571
1572
1573class DatabricksMagicConsole(CaptureTerminalConsole):
1574    """
1575    Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The
1576    NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution
1577    and it makes it difficult to see the progress of the plan.
1578    """
1579
1580    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
1581        super().__init__(*args, **kwargs)
1582        self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {}
1583        self.promotion_status: t.Tuple[int, int] = (0, 0)
1584        self.model_creation_status: t.Tuple[int, int] = (0, 0)
1585        self.migration_status: t.Tuple[int, int] = (0, 0)
1586
1587    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
1588        super()._print(value, **kwargs)
1589        for captured_output in self._captured_outputs:
1590            print(captured_output)
1591        self.clear_captured_outputs()
1592
1593    def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
1594        self._print(message)
1595        return super()._prompt("", **kwargs)
1596
1597    def _confirm(self, message: str, **kwargs: t.Any) -> bool:
1598        message = f"{message} [y/n]"
1599        self._print(message)
1600        return super()._confirm("", **kwargs)
1601
1602    def start_evaluation_progress(
1603        self,
1604        batches: t.Dict[Snapshot, int],
1605        environment_naming_info: EnvironmentNamingInfo,
1606        default_catalog: t.Optional[str],
1607    ) -> None:
1608        self.evaluation_batches = batches
1609        self.evaluation_environment_naming_info = environment_naming_info
1610        self.default_catalog = default_catalog
1611
1612    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1613        if not self.evaluation_batch_progress.get(snapshot.snapshot_id):
1614            display_name = snapshot.display_name(
1615                self.evaluation_environment_naming_info, self.default_catalog
1616            )
1617            self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0)
1618            print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}")
1619
1620    def update_snapshot_evaluation_progress(
1621        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1622    ) -> None:
1623        view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
1624        total_batches = self.evaluation_batches[snapshot]
1625
1626        loaded_batches += 1
1627        self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches)
1628
1629        finished_loading = loaded_batches == total_batches
1630        status = "Loaded" if finished_loading else "Loading"
1631        print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}")
1632        if finished_loading:
1633            total_finished_loading = len(
1634                [
1635                    s
1636                    for s, total in self.evaluation_batches.items()
1637                    if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total
1638                ]
1639            )
1640            total = len(self.evaluation_batch_progress)
1641            print(f"Completed Loading {total_finished_loading}/{total} Models")
1642
1643    def stop_evaluation_progress(self, success: bool = True) -> None:
1644        self.evaluation_batch_progress = {}
1645        super().stop_evaluation_progress(success)
1646        print(f"Loading {'succeeded' if success else 'failed'}")
1647
1648    def start_creation_progress(
1649        self,
1650        total_tasks: int,
1651        environment_naming_info: EnvironmentNamingInfo,
1652        default_catalog: t.Optional[str],
1653    ) -> None:
1654        """Indicates that a new creation progress has begun."""
1655        self.model_creation_status = (0, total_tasks)
1656        print("Starting Creating New Model Versions")
1657
1658    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1659        """Update the snapshot creation progress."""
1660        num_creations, total_creations = self.model_creation_status
1661        num_creations += 1
1662        self.model_creation_status = (num_creations, total_creations)
1663        if num_creations % 5 == 0:
1664            print(f"Created New Model Versions: {num_creations}/{total_creations}")
1665
1666    def stop_creation_progress(self, success: bool = True) -> None:
1667        """Stop the snapshot creation progress."""
1668        self.model_creation_status = (0, 0)
1669        print(f"New Model Creation {'succeeded' if success else 'failed'}")
1670
1671    def start_promotion_progress(
1672        self,
1673        total_tasks: int,
1674        environment_naming_info: EnvironmentNamingInfo,
1675        default_catalog: t.Optional[str],
1676    ) -> None:
1677        """Indicates that a new snapshot promotion progress has begun."""
1678        self.promotion_status = (0, total_tasks)
1679        print(f"Virtually Updating '{environment_naming_info.name}'")
1680
1681    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1682        """Update the snapshot promotion progress."""
1683        num_promotions, total_promotions = self.promotion_status
1684        num_promotions += 1
1685        self.promotion_status = (num_promotions, total_promotions)
1686        if num_promotions % 5 == 0:
1687            print(f"Virtually Updated {num_promotions}/{total_promotions}")
1688
1689    def stop_promotion_progress(self, success: bool = True) -> None:
1690        """Stop the snapshot promotion progress."""
1691        self.promotion_status = (0, 0)
1692        print(f"Virtual Update {'succeeded' if success else 'failed'}")
1693
1694    def start_migration_progress(self, total_tasks: int) -> None:
1695        """Indicates that a new migration progress has begun."""
1696        self.migration_status = (0, total_tasks)
1697        print("Starting Migration")
1698
1699    def update_migration_progress(self, num_tasks: int) -> None:
1700        """Update the migration progress."""
1701        num_migrations, total_migrations = self.migration_status
1702        num_migrations += num_tasks
1703        self.migration_status = (num_migrations, total_migrations)
1704        if num_migrations % 5 == 0:
1705            print(f"Migration Updated {num_migrations}/{total_migrations}")
1706
1707    def stop_migration_progress(self, success: bool = True) -> None:
1708        """Stop the migration progress."""
1709        self.migration_status = (0, 0)
1710        print(f"Migration {'succeeded' if success else 'failed'}")
1711
1712
1713class DebuggerTerminalConsole(TerminalConsole):
1714    """A terminal console to use while debugging with no fluff, progress bars, etc."""
1715
1716    def __init__(self, console: t.Optional[RichConsole], *args: t.Any, **kwargs: t.Any) -> None:
1717        self.console: RichConsole = console or srich.console
1718
1719    def _write(self, msg: t.Any, *args: t.Any, **kwargs: t.Any) -> None:
1720        self.console.log(msg, *args, **kwargs)
1721
1722    def start_plan_evaluation(self, plan: Plan) -> None:
1723        self._write("Starting plan", plan.plan_id)
1724
1725    def stop_plan_evaluation(self) -> None:
1726        self._write("Stopping plan")
1727
1728    def start_evaluation_progress(
1729        self,
1730        batches: t.Dict[Snapshot, int],
1731        environment_naming_info: EnvironmentNamingInfo,
1732        default_catalog: t.Optional[str],
1733    ) -> None:
1734        self._write(f"Starting evaluation for {len(batches)} snapshots")
1735
1736    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1737        self._write(f"Evaluating {snapshot.name}")
1738
1739    def update_snapshot_evaluation_progress(
1740        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1741    ) -> None:
1742        self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms")
1743
1744    def stop_evaluation_progress(self, success: bool = True) -> None:
1745        self._write(f"Stopping evaluation with success={success}")
1746
1747    def start_creation_progress(
1748        self,
1749        total_tasks: int,
1750        environment_naming_info: EnvironmentNamingInfo,
1751        default_catalog: t.Optional[str],
1752    ) -> None:
1753        self._write(f"Starting creation for {total_tasks} snapshots")
1754
1755    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1756        self._write(f"Creating {snapshot.name}")
1757
1758    def stop_creation_progress(self, success: bool = True) -> None:
1759        self._write(f"Stopping creation with success={success}")
1760
1761    def update_cleanup_progress(self, object_name: str) -> None:
1762        self._write(f"Cleaning up {object_name}")
1763
1764    def start_promotion_progress(
1765        self,
1766        total_tasks: int,
1767        environment_naming_info: EnvironmentNamingInfo,
1768        default_catalog: t.Optional[str],
1769    ) -> None:
1770        self._write(f"Starting promotion for {total_tasks} snapshots")
1771
1772    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1773        self._write(f"Promoting {snapshot.name}")
1774
1775    def stop_promotion_progress(self, success: bool = True) -> None:
1776        self._write(f"Stopping promotion with success={success}")
1777
1778    def start_migration_progress(self, total_tasks: int) -> None:
1779        self._write(f"Starting migration for {total_tasks} snapshots")
1780
1781    def update_migration_progress(self, num_tasks: int) -> None:
1782        self._write(f"Migration {num_tasks}")
1783
1784    def stop_migration_progress(self, success: bool = True) -> None:
1785        self._write(f"Stopping migration with success={success}")
1786
1787    def show_model_difference_summary(
1788        self,
1789        context_diff: ContextDiff,
1790        environment_naming_info: EnvironmentNamingInfo,
1791        default_catalog: t.Optional[str],
1792        no_diff: bool = True,
1793        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1794    ) -> None:
1795        self._write("Model Difference Summary:")
1796        for added in context_diff.new_snapshots:
1797            self._write(f"  Added: {added}")
1798        for removed in context_diff.removed_snapshots:
1799            self._write(f"  Removed: {removed}")
1800        for modified in context_diff.modified_snapshots:
1801            self._write(f"  Modified: {modified}")
1802
1803    def log_test_results(
1804        self, result: unittest.result.TestResult, output: str, target_dialect: str
1805    ) -> None:
1806        self._write("Test Results:", result)
1807
1808    def show_sql(self, sql: str) -> None:
1809        self._write(sql)
1810
1811    def log_status_update(self, message: str) -> None:
1812        self._write(message, style="bold blue")
1813
1814    def log_error(self, message: str) -> None:
1815        self._write(message, style="bold red")
1816
1817    def log_success(self, message: str) -> None:
1818        self._write(message, style="bold green")
1819
1820    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
1821        self._write(message)
1822        return uuid.uuid4()
1823
1824    def loading_stop(self, id: uuid.UUID) -> None:
1825        self._write("Done")
1826
1827    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
1828        self._write(schema_diff)
1829
1830    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
1831        self._write(row_diff)
1832
1833
1834def get_console(**kwargs: t.Any) -> TerminalConsole | DatabricksMagicConsole | NotebookMagicConsole:
1835    """
1836    Returns the console that is appropriate for the current runtime environment.
1837
1838    Note: Google Colab environment is untested and currently assumes is compatible with the base
1839    NotebookMagicConsole.
1840    """
1841    from sqlmesh import RuntimeEnv
1842
1843    runtime_env = RuntimeEnv.get()
1844
1845    runtime_env_mapping = {
1846        RuntimeEnv.DATABRICKS: DatabricksMagicConsole,
1847        RuntimeEnv.JUPYTER: NotebookMagicConsole,
1848        RuntimeEnv.TERMINAL: TerminalConsole,
1849        RuntimeEnv.GOOGLE_COLAB: NotebookMagicConsole,
1850        RuntimeEnv.DEBUGGER: DebuggerTerminalConsole,
1851    }
1852    rich_console_kwargs: t.Dict[str, t.Any] = {"theme": srich.theme}
1853    if runtime_env.is_jupyter or runtime_env.is_google_colab:
1854        rich_console_kwargs["force_jupyter"] = True
1855    return runtime_env_mapping[runtime_env](
1856        **{**{"console": RichConsole(**rich_console_kwargs)}, **kwargs}
1857    )
class Console(abc.ABC):
 59class Console(abc.ABC):
 60    """Abstract base class for defining classes used for displaying information to the user and also interact
 61    with them when their input is needed."""
 62
 63    @abc.abstractmethod
 64    def start_plan_evaluation(self, plan: Plan) -> None:
 65        """Indicates that a new evaluation has begun."""
 66
 67    @abc.abstractmethod
 68    def stop_plan_evaluation(self) -> None:
 69        """Indicates that the evaluation has ended."""
 70
 71    @abc.abstractmethod
 72    def start_evaluation_progress(
 73        self,
 74        batches: t.Dict[Snapshot, int],
 75        environment_naming_info: EnvironmentNamingInfo,
 76        default_catalog: t.Optional[str],
 77    ) -> None:
 78        """Indicates that a new snapshot evaluation progress has begun."""
 79
 80    @abc.abstractmethod
 81    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
 82        """Starts the snapshot evaluation progress."""
 83
 84    @abc.abstractmethod
 85    def update_snapshot_evaluation_progress(
 86        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
 87    ) -> None:
 88        """Updates the snapshot evaluation progress."""
 89
 90    @abc.abstractmethod
 91    def stop_evaluation_progress(self, success: bool = True) -> None:
 92        """Stops the snapshot evaluation progress."""
 93
 94    @abc.abstractmethod
 95    def start_creation_progress(
 96        self,
 97        total_tasks: int,
 98        environment_naming_info: EnvironmentNamingInfo,
 99        default_catalog: t.Optional[str],
100    ) -> None:
101        """Indicates that a new snapshot creation progress has begun."""
102
103    @abc.abstractmethod
104    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
105        """Update the snapshot creation progress."""
106
107    @abc.abstractmethod
108    def stop_creation_progress(self, success: bool = True) -> None:
109        """Stop the snapshot creation progress."""
110
111    @abc.abstractmethod
112    def update_cleanup_progress(self, object_name: str) -> None:
113        """Update the snapshot cleanup progress."""
114
115    @abc.abstractmethod
116    def start_promotion_progress(
117        self,
118        total_tasks: int,
119        environment_naming_info: EnvironmentNamingInfo,
120        default_catalog: t.Optional[str],
121    ) -> None:
122        """Indicates that a new snapshot promotion progress has begun."""
123
124    @abc.abstractmethod
125    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
126        """Update the snapshot promotion progress."""
127
128    @abc.abstractmethod
129    def stop_promotion_progress(self, success: bool = True) -> None:
130        """Stop the snapshot promotion progress."""
131
132    @abc.abstractmethod
133    def start_migration_progress(self, total_tasks: int) -> None:
134        """Indicates that a new migration progress has begun."""
135
136    @abc.abstractmethod
137    def update_migration_progress(self, num_tasks: int) -> None:
138        """Update the migration progress."""
139
140    @abc.abstractmethod
141    def stop_migration_progress(self, success: bool = True) -> None:
142        """Stop the migration progress."""
143
144    @abc.abstractmethod
145    def show_model_difference_summary(
146        self,
147        context_diff: ContextDiff,
148        environment_naming_info: EnvironmentNamingInfo,
149        default_catalog: t.Optional[str],
150        no_diff: bool = True,
151        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
152    ) -> None:
153        """Displays a summary of differences for the given models."""
154
155    @abc.abstractmethod
156    def plan(
157        self,
158        plan_builder: PlanBuilder,
159        auto_apply: bool,
160        default_catalog: t.Optional[str],
161        no_diff: bool = False,
162        no_prompts: bool = False,
163    ) -> None:
164        """The main plan flow.
165
166        The console should present the user with choices on how to backfill and version the snapshots
167        of a plan.
168
169        Args:
170            plan: The plan to make choices for.
171            auto_apply: Whether to automatically apply the plan after all choices have been made.
172            no_diff: Hide text differences for changed models.
173            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
174                if this flag is set to true and there are uncategorized changes the plan creation will
175                fail. Default: False
176        """
177
178    @abc.abstractmethod
179    def log_test_results(
180        self, result: unittest.result.TestResult, output: str, target_dialect: str
181    ) -> None:
182        """Display the test result and output.
183
184        Args:
185            result: The unittest test result that contains metrics like num success, fails, ect.
186            output: The generated output from the unittest.
187            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
188        """
189
190    @abc.abstractmethod
191    def show_sql(self, sql: str) -> None:
192        """Display to the user SQL."""
193
194    @abc.abstractmethod
195    def log_status_update(self, message: str) -> None:
196        """Display general status update to the user."""
197
198    @abc.abstractmethod
199    def log_error(self, message: str) -> None:
200        """Display error info to the user."""
201
202    @abc.abstractmethod
203    def log_success(self, message: str) -> None:
204        """Display a general successful message to the user."""
205
206    @abc.abstractmethod
207    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
208        """Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message."""
209
210    @abc.abstractmethod
211    def loading_stop(self, id: uuid.UUID) -> None:
212        """Stop loading for the given id."""
213
214    @abc.abstractmethod
215    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
216        """Show table schema diff."""
217
218    @abc.abstractmethod
219    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
220        """Show table summary diff."""

Abstract base class for defining classes used for displaying information to the user and also interact with them when their input is needed.

@abc.abstractmethod
def start_plan_evaluation(self, plan: sqlmesh.core.plan.definition.Plan) -> None:
63    @abc.abstractmethod
64    def start_plan_evaluation(self, plan: Plan) -> None:
65        """Indicates that a new evaluation has begun."""

Indicates that a new evaluation has begun.

@abc.abstractmethod
def stop_plan_evaluation(self) -> None:
67    @abc.abstractmethod
68    def stop_plan_evaluation(self) -> None:
69        """Indicates that the evaluation has ended."""

Indicates that the evaluation has ended.

@abc.abstractmethod
def start_evaluation_progress( self, batches: Dict[sqlmesh.core.snapshot.definition.Snapshot, int], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
71    @abc.abstractmethod
72    def start_evaluation_progress(
73        self,
74        batches: t.Dict[Snapshot, int],
75        environment_naming_info: EnvironmentNamingInfo,
76        default_catalog: t.Optional[str],
77    ) -> None:
78        """Indicates that a new snapshot evaluation progress has begun."""

Indicates that a new snapshot evaluation progress has begun.

@abc.abstractmethod
def start_snapshot_evaluation_progress(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> None:
80    @abc.abstractmethod
81    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
82        """Starts the snapshot evaluation progress."""

Starts the snapshot evaluation progress.

@abc.abstractmethod
def update_snapshot_evaluation_progress( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, batch_idx: int, duration_ms: Union[int, NoneType]) -> None:
84    @abc.abstractmethod
85    def update_snapshot_evaluation_progress(
86        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
87    ) -> None:
88        """Updates the snapshot evaluation progress."""

Updates the snapshot evaluation progress.

@abc.abstractmethod
def stop_evaluation_progress(self, success: bool = True) -> None:
90    @abc.abstractmethod
91    def stop_evaluation_progress(self, success: bool = True) -> None:
92        """Stops the snapshot evaluation progress."""

Stops the snapshot evaluation progress.

@abc.abstractmethod
def start_creation_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
 94    @abc.abstractmethod
 95    def start_creation_progress(
 96        self,
 97        total_tasks: int,
 98        environment_naming_info: EnvironmentNamingInfo,
 99        default_catalog: t.Optional[str],
100    ) -> None:
101        """Indicates that a new snapshot creation progress has begun."""

Indicates that a new snapshot creation progress has begun.

@abc.abstractmethod
def update_creation_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]) -> None:
103    @abc.abstractmethod
104    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
105        """Update the snapshot creation progress."""

Update the snapshot creation progress.

@abc.abstractmethod
def stop_creation_progress(self, success: bool = True) -> None:
107    @abc.abstractmethod
108    def stop_creation_progress(self, success: bool = True) -> None:
109        """Stop the snapshot creation progress."""

Stop the snapshot creation progress.

@abc.abstractmethod
def update_cleanup_progress(self, object_name: str) -> None:
111    @abc.abstractmethod
112    def update_cleanup_progress(self, object_name: str) -> None:
113        """Update the snapshot cleanup progress."""

Update the snapshot cleanup progress.

@abc.abstractmethod
def start_promotion_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
115    @abc.abstractmethod
116    def start_promotion_progress(
117        self,
118        total_tasks: int,
119        environment_naming_info: EnvironmentNamingInfo,
120        default_catalog: t.Optional[str],
121    ) -> None:
122        """Indicates that a new snapshot promotion progress has begun."""

Indicates that a new snapshot promotion progress has begun.

@abc.abstractmethod
def update_promotion_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], promoted: bool) -> None:
124    @abc.abstractmethod
125    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
126        """Update the snapshot promotion progress."""

Update the snapshot promotion progress.

@abc.abstractmethod
def stop_promotion_progress(self, success: bool = True) -> None:
128    @abc.abstractmethod
129    def stop_promotion_progress(self, success: bool = True) -> None:
130        """Stop the snapshot promotion progress."""

Stop the snapshot promotion progress.

@abc.abstractmethod
def start_migration_progress(self, total_tasks: int) -> None:
132    @abc.abstractmethod
133    def start_migration_progress(self, total_tasks: int) -> None:
134        """Indicates that a new migration progress has begun."""

Indicates that a new migration progress has begun.

@abc.abstractmethod
def update_migration_progress(self, num_tasks: int) -> None:
136    @abc.abstractmethod
137    def update_migration_progress(self, num_tasks: int) -> None:
138        """Update the migration progress."""

Update the migration progress.

@abc.abstractmethod
def stop_migration_progress(self, success: bool = True) -> None:
140    @abc.abstractmethod
141    def stop_migration_progress(self, success: bool = True) -> None:
142        """Stop the migration progress."""

Stop the migration progress.

@abc.abstractmethod
def show_model_difference_summary( self, context_diff: sqlmesh.core.context_diff.ContextDiff, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType], no_diff: bool = True, ignored_snapshot_ids: Union[Set[sqlmesh.core.snapshot.definition.SnapshotId], NoneType] = None) -> None:
144    @abc.abstractmethod
145    def show_model_difference_summary(
146        self,
147        context_diff: ContextDiff,
148        environment_naming_info: EnvironmentNamingInfo,
149        default_catalog: t.Optional[str],
150        no_diff: bool = True,
151        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
152    ) -> None:
153        """Displays a summary of differences for the given models."""

Displays a summary of differences for the given models.

@abc.abstractmethod
def plan( self, plan_builder: sqlmesh.core.plan.builder.PlanBuilder, auto_apply: bool, default_catalog: Union[str, NoneType], no_diff: bool = False, no_prompts: bool = False) -> None:
155    @abc.abstractmethod
156    def plan(
157        self,
158        plan_builder: PlanBuilder,
159        auto_apply: bool,
160        default_catalog: t.Optional[str],
161        no_diff: bool = False,
162        no_prompts: bool = False,
163    ) -> None:
164        """The main plan flow.
165
166        The console should present the user with choices on how to backfill and version the snapshots
167        of a plan.
168
169        Args:
170            plan: The plan to make choices for.
171            auto_apply: Whether to automatically apply the plan after all choices have been made.
172            no_diff: Hide text differences for changed models.
173            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
174                if this flag is set to true and there are uncategorized changes the plan creation will
175                fail. Default: False
176        """

The main plan flow.

The console should present the user with choices on how to backfill and version the snapshots of a plan.

Arguments:
  • plan: The plan to make choices for.
  • auto_apply: Whether to automatically apply the plan after all choices have been made.
  • no_diff: Hide text differences for changed models.
  • no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False
@abc.abstractmethod
def log_test_results( self, result: unittest.result.TestResult, output: str, target_dialect: str) -> None:
178    @abc.abstractmethod
179    def log_test_results(
180        self, result: unittest.result.TestResult, output: str, target_dialect: str
181    ) -> None:
182        """Display the test result and output.
183
184        Args:
185            result: The unittest test result that contains metrics like num success, fails, ect.
186            output: The generated output from the unittest.
187            target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
188        """

Display the test result and output.

Arguments:
  • result: The unittest test result that contains metrics like num success, fails, ect.
  • output: The generated output from the unittest.
  • target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
@abc.abstractmethod
def show_sql(self, sql: str) -> None:
190    @abc.abstractmethod
191    def show_sql(self, sql: str) -> None:
192        """Display to the user SQL."""

Display to the user SQL.

@abc.abstractmethod
def log_status_update(self, message: str) -> None:
194    @abc.abstractmethod
195    def log_status_update(self, message: str) -> None:
196        """Display general status update to the user."""

Display general status update to the user.

@abc.abstractmethod
def log_error(self, message: str) -> None:
198    @abc.abstractmethod
199    def log_error(self, message: str) -> None:
200        """Display error info to the user."""

Display error info to the user.

@abc.abstractmethod
def log_success(self, message: str) -> None:
202    @abc.abstractmethod
203    def log_success(self, message: str) -> None:
204        """Display a general successful message to the user."""

Display a general successful message to the user.

@abc.abstractmethod
def loading_start(self, message: Union[str, NoneType] = None) -> uuid.UUID:
206    @abc.abstractmethod
207    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
208        """Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message."""

Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.

@abc.abstractmethod
def loading_stop(self, id: uuid.UUID) -> None:
210    @abc.abstractmethod
211    def loading_stop(self, id: uuid.UUID) -> None:
212        """Stop loading for the given id."""

Stop loading for the given id.

@abc.abstractmethod
def show_schema_diff(self, schema_diff: sqlmesh.core.table_diff.SchemaDiff) -> None:
214    @abc.abstractmethod
215    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
216        """Show table schema diff."""

Show table schema diff.

@abc.abstractmethod
def show_row_diff( self, row_diff: sqlmesh.core.table_diff.RowDiff, show_sample: bool = True) -> None:
218    @abc.abstractmethod
219    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
220        """Show table summary diff."""

Show table summary diff.

class TerminalConsole(Console):
 223class TerminalConsole(Console):
 224    """A rich based implementation of the console."""
 225
 226    def __init__(
 227        self, console: t.Optional[RichConsole] = None, verbose: bool = False, **kwargs: t.Any
 228    ) -> None:
 229        self.console: RichConsole = console or srich.console
 230
 231        self.evaluation_progress_live: t.Optional[Live] = None
 232        self.evaluation_total_progress: t.Optional[Progress] = None
 233        self.evaluation_total_task: t.Optional[TaskID] = None
 234        self.evaluation_model_progress: t.Optional[Progress] = None
 235        self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
 236        self.evaluation_model_batches: t.Dict[Snapshot, int] = {}
 237
 238        # Put in temporary values that are replaced when evaluating
 239        self.environment_naming_info = EnvironmentNamingInfo()
 240        self.default_catalog: t.Optional[str] = None
 241
 242        self.creation_progress: t.Optional[Progress] = None
 243        self.creation_task: t.Optional[TaskID] = None
 244
 245        self.promotion_progress: t.Optional[Progress] = None
 246        self.promotion_task: t.Optional[TaskID] = None
 247
 248        self.migration_progress: t.Optional[Progress] = None
 249        self.migration_task: t.Optional[TaskID] = None
 250
 251        self.loading_status: t.Dict[uuid.UUID, Status] = {}
 252
 253        self.verbose = verbose
 254
 255    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
 256        self.console.print(value, **kwargs)
 257
 258    def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
 259        return Prompt.ask(message, console=self.console, **kwargs)
 260
 261    def _confirm(self, message: str, **kwargs: t.Any) -> bool:
 262        return Confirm.ask(message, console=self.console, **kwargs)
 263
 264    def start_plan_evaluation(self, plan: Plan) -> None:
 265        pass
 266
 267    def stop_plan_evaluation(self) -> None:
 268        pass
 269
 270    def start_evaluation_progress(
 271        self,
 272        batches: t.Dict[Snapshot, int],
 273        environment_naming_info: EnvironmentNamingInfo,
 274        default_catalog: t.Optional[str],
 275    ) -> None:
 276        """Indicates that a new snapshot evaluation progress has begun."""
 277        if not self.evaluation_progress_live:
 278            self.evaluation_total_progress = Progress(
 279                TextColumn("[bold blue]Evaluating models", justify="right"),
 280                BarColumn(bar_width=40),
 281                "[progress.percentage]{task.percentage:>3.1f}%",
 282                "•",
 283                srich.BatchColumn(),
 284                "•",
 285                TimeElapsedColumn(),
 286                console=self.console,
 287            )
 288
 289            self.evaluation_model_progress = Progress(
 290                TextColumn("{task.fields[view_name]}", justify="right"),
 291                SpinnerColumn(spinner_name="simpleDots"),
 292                console=self.console,
 293            )
 294
 295            progress_table = Table.grid()
 296            progress_table.add_row(self.evaluation_total_progress)
 297            progress_table.add_row(self.evaluation_model_progress)
 298
 299            self.evaluation_progress_live = Live(progress_table, refresh_per_second=10)
 300            self.evaluation_progress_live.start()
 301
 302            self.evaluation_total_task = self.evaluation_total_progress.add_task(
 303                "Evaluating models...", total=sum(batches.values())
 304            )
 305
 306            self.evaluation_model_batches = batches
 307            self.environment_naming_info = environment_naming_info
 308            self.default_catalog = default_catalog
 309
 310    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
 311        if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
 312            display_name = snapshot.display_name(self.environment_naming_info, self.default_catalog)
 313            self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
 314                f"Evaluating {display_name}...",
 315                view_name=display_name,
 316                total=self.evaluation_model_batches[snapshot],
 317            )
 318
 319    def update_snapshot_evaluation_progress(
 320        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
 321    ) -> None:
 322        """Update the snapshot evaluation progress."""
 323        if (
 324            self.evaluation_total_progress
 325            and self.evaluation_model_progress
 326            and self.evaluation_progress_live
 327        ):
 328            total_batches = self.evaluation_model_batches[snapshot]
 329
 330            if duration_ms:
 331                self.evaluation_progress_live.console.print(
 332                    f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s"
 333                )
 334
 335            self.evaluation_total_progress.update(
 336                self.evaluation_total_task or TaskID(0), refresh=True, advance=1
 337            )
 338
 339            model_task_id = self.evaluation_model_tasks[snapshot.name]
 340            self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
 341            if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
 342                self.evaluation_model_progress.remove_task(model_task_id)
 343
 344    def stop_evaluation_progress(self, success: bool = True) -> None:
 345        """Stop the snapshot evaluation progress."""
 346        if self.evaluation_progress_live:
 347            self.evaluation_progress_live.stop()
 348            if success:
 349                self.log_success("All model batches have been executed successfully")
 350
 351        self.evaluation_progress_live = None
 352        self.evaluation_total_progress = None
 353        self.evaluation_total_task = None
 354        self.evaluation_model_progress = None
 355        self.evaluation_model_tasks = {}
 356        self.evaluation_model_batches = {}
 357        self.environment_naming_info = EnvironmentNamingInfo()
 358        self.default_catalog = None
 359
 360    def start_creation_progress(
 361        self,
 362        total_tasks: int,
 363        environment_naming_info: EnvironmentNamingInfo,
 364        default_catalog: t.Optional[str],
 365    ) -> None:
 366        """Indicates that a new creation progress has begun."""
 367        if self.creation_progress is None:
 368            self.creation_progress = Progress(
 369                TextColumn("[bold blue]Creating physical tables", justify="right"),
 370                BarColumn(bar_width=40),
 371                "[progress.percentage]{task.percentage:>3.1f}%",
 372                "•",
 373                srich.BatchColumn(),
 374                "•",
 375                TimeElapsedColumn(),
 376                console=self.console,
 377            )
 378
 379            self.creation_progress.start()
 380            self.creation_task = self.creation_progress.add_task(
 381                "Creating physical tables...",
 382                total=total_tasks,
 383            )
 384
 385            self.environment_naming_info = environment_naming_info
 386            self.default_catalog = default_catalog
 387
 388    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
 389        """Update the snapshot creation progress."""
 390        if self.creation_progress is not None and self.creation_task is not None:
 391            if self.verbose:
 392                self.creation_progress.live.console.print(
 393                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]created[/green]"
 394                )
 395            self.creation_progress.update(self.creation_task, refresh=True, advance=1)
 396
 397    def stop_creation_progress(self, success: bool = True) -> None:
 398        """Stop the snapshot creation progress."""
 399        self.creation_task = None
 400        if self.creation_progress is not None:
 401            self.creation_progress.stop()
 402            self.creation_progress = None
 403            if success:
 404                self.log_success("All model versions have been created successfully")
 405
 406        self.environment_naming_info = EnvironmentNamingInfo()
 407        self.default_catalog = None
 408
 409    def update_cleanup_progress(self, object_name: str) -> None:
 410        """Update the snapshot cleanup progress."""
 411        self._print(f"Deleted object {object_name}")
 412
 413    def start_promotion_progress(
 414        self,
 415        total_tasks: int,
 416        environment_naming_info: EnvironmentNamingInfo,
 417        default_catalog: t.Optional[str],
 418    ) -> None:
 419        """Indicates that a new snapshot promotion progress has begun."""
 420        if self.promotion_progress is None:
 421            self.promotion_progress = Progress(
 422                TextColumn(
 423                    f"[bold blue]Virtually Updating '{environment_naming_info.name}'",
 424                    justify="right",
 425                ),
 426                BarColumn(bar_width=40),
 427                "[progress.percentage]{task.percentage:>3.1f}%",
 428                "•",
 429                TimeElapsedColumn(),
 430                console=self.console,
 431            )
 432
 433            self.promotion_progress.start()
 434            self.promotion_task = self.promotion_progress.add_task(
 435                f"Virtually Updating {environment_naming_info.name}...",
 436                total=total_tasks,
 437            )
 438
 439            self.environment_naming_info = environment_naming_info
 440            self.default_catalog = default_catalog
 441
 442    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
 443        """Update the snapshot promotion progress."""
 444        if self.promotion_progress is not None and self.promotion_task is not None:
 445            if self.verbose:
 446                action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]"
 447                self.promotion_progress.live.console.print(
 448                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} {action_str}"
 449                )
 450            self.promotion_progress.update(self.promotion_task, refresh=True, advance=1)
 451
 452    def stop_promotion_progress(self, success: bool = True) -> None:
 453        """Stop the snapshot promotion progress."""
 454        self.promotion_task = None
 455        if self.promotion_progress is not None:
 456            self.promotion_progress.stop()
 457            self.promotion_progress = None
 458            if success:
 459                self.log_success("The target environment has been updated successfully")
 460
 461        self.environment_naming_info = EnvironmentNamingInfo()
 462        self.default_catalog = None
 463
 464    def start_migration_progress(self, total_tasks: int) -> None:
 465        """Indicates that a new migration progress has begun."""
 466        if self.migration_progress is None:
 467            self.migration_progress = Progress(
 468                TextColumn("[bold blue]Migrating snapshots", justify="right"),
 469                BarColumn(bar_width=40),
 470                "[progress.percentage]{task.percentage:>3.1f}%",
 471                "•",
 472                srich.BatchColumn(),
 473                "•",
 474                TimeElapsedColumn(),
 475                console=self.console,
 476            )
 477
 478            self.migration_progress.start()
 479            self.migration_task = self.migration_progress.add_task(
 480                "Migrating snapshots...",
 481                total=total_tasks,
 482            )
 483
 484    def update_migration_progress(self, num_tasks: int) -> None:
 485        """Update the migration progress."""
 486        if self.migration_progress is not None and self.migration_task is not None:
 487            self.migration_progress.update(self.migration_task, refresh=True, advance=num_tasks)
 488
 489    def stop_migration_progress(self, success: bool = True) -> None:
 490        """Stop the migration progress."""
 491        self.migration_task = None
 492        if self.migration_progress is not None:
 493            self.migration_progress.stop()
 494            self.migration_progress = None
 495            if success:
 496                self.log_success("The migration has been completed successfully")
 497
 498    def show_model_difference_summary(
 499        self,
 500        context_diff: ContextDiff,
 501        environment_naming_info: EnvironmentNamingInfo,
 502        default_catalog: t.Optional[str],
 503        no_diff: bool = True,
 504        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 505    ) -> None:
 506        """Shows a summary of the differences.
 507
 508        Args:
 509            context_diff: The context diff to use to print the summary
 510            environment_naming_info: The environment naming info to reference when printing model names
 511            default_catalog: The default catalog to reference when deciding to remove catalog from display names
 512            no_diff: Hide the actual SQL differences.
 513            ignored_snapshot_ids: A set of snapshot ids that are ignored
 514        """
 515        ignored_snapshot_ids = ignored_snapshot_ids or set()
 516        if context_diff.is_new_environment:
 517            self._print(
 518                Tree(
 519                    f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`"
 520                )
 521            )
 522            if not context_diff.has_snapshot_changes:
 523                return
 524
 525        if not context_diff.has_changes:
 526            self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`"))
 527            return
 528
 529        self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:"))
 530        self._show_summary_tree_for(
 531            context_diff,
 532            "Models",
 533            lambda x: x.is_model,
 534            environment_naming_info,
 535            default_catalog,
 536            no_diff=no_diff,
 537            ignored_snapshot_ids=ignored_snapshot_ids,
 538        )
 539        self._show_summary_tree_for(
 540            context_diff,
 541            "Standalone Audits",
 542            lambda x: x.is_audit,
 543            environment_naming_info,
 544            default_catalog,
 545            no_diff=no_diff,
 546            ignored_snapshot_ids=ignored_snapshot_ids,
 547        )
 548
 549    def plan(
 550        self,
 551        plan_builder: PlanBuilder,
 552        auto_apply: bool,
 553        default_catalog: t.Optional[str],
 554        no_diff: bool = False,
 555        no_prompts: bool = False,
 556    ) -> None:
 557        """The main plan flow.
 558
 559        The console should present the user with choices on how to backfill and version the snapshots
 560        of a plan.
 561
 562        Args:
 563            plan: The plan to make choices for.
 564            auto_apply: Whether to automatically apply the plan after all choices have been made.
 565            default_catalog: The default catalog to reference when deciding to remove catalog from display names
 566            no_diff: Hide text differences for changed models.
 567            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 568                if this flag is set to true and there are uncategorized changes the plan creation will
 569                fail. Default: False
 570        """
 571        self._prompt_categorize(
 572            plan_builder,
 573            auto_apply,
 574            no_diff=no_diff,
 575            no_prompts=no_prompts,
 576            default_catalog=default_catalog,
 577        )
 578
 579        if not no_prompts:
 580            self._show_options_after_categorization(
 581                plan_builder, auto_apply, default_catalog=default_catalog
 582            )
 583
 584        if auto_apply:
 585            plan_builder.apply()
 586
 587    def _get_ignored_tree(
 588        self,
 589        ignored_snapshot_ids: t.Set[SnapshotId],
 590        snapshots: t.Dict[SnapshotId, Snapshot],
 591        environment_naming_info: EnvironmentNamingInfo,
 592        default_catalog: t.Optional[str],
 593    ) -> Tree:
 594        ignored = Tree("[bold][ignored]Ignored Models (Expected Plan Start):")
 595        for s_id in ignored_snapshot_ids:
 596            snapshot = snapshots[s_id]
 597            ignored.add(
 598                f"[ignored]{snapshot.display_name(environment_naming_info, default_catalog)} ({snapshot.get_latest(start_date(snapshot, snapshots.values()))})"
 599            )
 600        return ignored
 601
 602    def _show_summary_tree_for(
 603        self,
 604        context_diff: ContextDiff,
 605        header: str,
 606        snapshot_selector: t.Callable[[SnapshotInfoLike], bool],
 607        environment_naming_info: EnvironmentNamingInfo,
 608        default_catalog: t.Optional[str],
 609        no_diff: bool = True,
 610        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
 611    ) -> None:
 612        ignored_snapshot_ids = ignored_snapshot_ids or set()
 613        selected_snapshots = {
 614            s_id: snapshot
 615            for s_id, snapshot in context_diff.snapshots.items()
 616            if snapshot_selector(snapshot)
 617        }
 618        selected_ignored_snapshot_ids = {
 619            s_id for s_id in selected_snapshots if s_id in ignored_snapshot_ids
 620        }
 621        added_snapshot_ids = {
 622            s_id for s_id in context_diff.added if snapshot_selector(context_diff.snapshots[s_id])
 623        } - selected_ignored_snapshot_ids
 624        removed_snapshot_ids = {
 625            s_id
 626            for s_id, snapshot in context_diff.removed_snapshots.items()
 627            if snapshot_selector(snapshot)
 628        } - selected_ignored_snapshot_ids
 629        modified_snapshot_ids = {
 630            current_snapshot.snapshot_id
 631            for _, (current_snapshot, _) in context_diff.modified_snapshots.items()
 632            if snapshot_selector(current_snapshot)
 633        } - selected_ignored_snapshot_ids
 634
 635        tree_sets = (
 636            added_snapshot_ids,
 637            removed_snapshot_ids,
 638            modified_snapshot_ids,
 639            selected_ignored_snapshot_ids,
 640        )
 641        if all(not s_ids for s_ids in tree_sets):
 642            return
 643
 644        tree = Tree(f"[bold]{header}:")
 645        if added_snapshot_ids:
 646            added_tree = Tree("[bold][added]Added:")
 647            for s_id in added_snapshot_ids:
 648                snapshot = context_diff.snapshots[s_id]
 649                added_tree.add(
 650                    f"[added]{snapshot.display_name(environment_naming_info, default_catalog)}"
 651                )
 652            tree.add(added_tree)
 653        if removed_snapshot_ids:
 654            removed_tree = Tree("[bold][removed]Removed:")
 655            for s_id in removed_snapshot_ids:
 656                snapshot_table_info = context_diff.removed_snapshots[s_id]
 657                removed_tree.add(
 658                    f"[removed]{snapshot_table_info.display_name(environment_naming_info, default_catalog)}"
 659                )
 660            tree.add(removed_tree)
 661        if modified_snapshot_ids:
 662            direct = Tree("[bold][direct]Directly Modified:")
 663            indirect = Tree("[bold][indirect]Indirectly Modified:")
 664            metadata = Tree("[bold][metadata]Metadata Updated:")
 665            for s_id in modified_snapshot_ids:
 666                name = s_id.name
 667                display_name = context_diff.snapshots[s_id].display_name(
 668                    environment_naming_info, default_catalog
 669                )
 670                if context_diff.directly_modified(name):
 671                    direct.add(
 672                        f"[direct]{display_name}"
 673                        if no_diff
 674                        else Syntax(f"{display_name}\n{context_diff.text_diff(name)}", "sql")
 675                    )
 676                elif context_diff.indirectly_modified(name):
 677                    indirect.add(f"[indirect]{display_name}")
 678                elif context_diff.metadata_updated(name):
 679                    metadata.add(
 680                        f"[metadata]{display_name}"
 681                        if no_diff
 682                        else Syntax(f"{display_name}", "sql", word_wrap=True)
 683                    )
 684            if direct.children:
 685                tree.add(direct)
 686            if indirect.children:
 687                tree.add(indirect)
 688            if metadata.children:
 689                tree.add(metadata)
 690        if selected_ignored_snapshot_ids:
 691            tree.add(
 692                self._get_ignored_tree(
 693                    selected_ignored_snapshot_ids,
 694                    selected_snapshots,
 695                    environment_naming_info,
 696                    default_catalog,
 697                )
 698            )
 699        self._print(tree)
 700
 701    def _show_options_after_categorization(
 702        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 703    ) -> None:
 704        plan = plan_builder.build()
 705        if plan.forward_only and plan.new_snapshots:
 706            self._prompt_effective_from(plan_builder, auto_apply, default_catalog)
 707
 708        if plan.requires_backfill:
 709            self._show_missing_dates(plan_builder.build(), default_catalog)
 710            self._prompt_backfill(plan_builder, auto_apply, default_catalog)
 711        elif plan.has_changes and not auto_apply:
 712            self._prompt_promote(plan_builder)
 713        elif plan.has_unmodified_unpromoted and not auto_apply:
 714            self.log_status_update("\n[bold]Virtually updating unmodified models\n")
 715            self._prompt_promote(plan_builder)
 716
 717    def _prompt_categorize(
 718        self,
 719        plan_builder: PlanBuilder,
 720        auto_apply: bool,
 721        no_diff: bool,
 722        no_prompts: bool,
 723        default_catalog: t.Optional[str],
 724    ) -> None:
 725        """Get the user's change category for the directly modified models."""
 726        plan = plan_builder.build()
 727
 728        self.show_model_difference_summary(
 729            plan.context_diff,
 730            plan.environment_naming_info,
 731            default_catalog=default_catalog,
 732            ignored_snapshot_ids=plan.ignored,
 733        )
 734
 735        if not no_diff:
 736            self._show_categorized_snapshots(plan, default_catalog)
 737
 738        for snapshot in plan.uncategorized:
 739            if not no_diff:
 740                self.show_sql(plan.context_diff.text_diff(snapshot.name))
 741            tree = Tree(
 742                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)}"
 743            )
 744            indirect_tree = None
 745
 746            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
 747                child_snapshot = plan.context_diff.snapshots[child_sid]
 748                if not indirect_tree:
 749                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
 750                    tree.add(indirect_tree)
 751                indirect_tree.add(
 752                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)}"
 753                )
 754            self._print(tree)
 755            if not no_prompts:
 756                self._get_snapshot_change_category(
 757                    snapshot, plan_builder, auto_apply, default_catalog
 758                )
 759
 760    def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
 761        context_diff = plan.context_diff
 762
 763        for snapshot in plan.categorized:
 764            if not context_diff.directly_modified(snapshot.name):
 765                continue
 766
 767            category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
 768            tree = Tree(
 769                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})"
 770            )
 771            indirect_tree = None
 772            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
 773                child_snapshot = context_diff.snapshots[child_sid]
 774                if not indirect_tree:
 775                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
 776                    tree.add(indirect_tree)
 777                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
 778                indirect_tree.add(
 779                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})"
 780                )
 781            self._print(Syntax(context_diff.text_diff(snapshot.name), "sql", word_wrap=True))
 782            self._print(tree)
 783
 784    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
 785        """Displays the models with missing dates."""
 786        missing_intervals = plan.missing_intervals
 787        if not missing_intervals:
 788            return
 789        backfill = Tree("[bold]Models needing backfill (missing dates):")
 790        for missing in missing_intervals:
 791            snapshot = plan.context_diff.snapshots[missing.snapshot_id]
 792            if not snapshot.is_model:
 793                continue
 794
 795            preview_modifier = ""
 796            if not plan.deployability_index.is_deployable(snapshot):
 797                preview_modifier = " ([orange1]preview[/orange1])"
 798
 799            backfill.add(
 800                f"{snapshot.display_name(plan.environment_naming_info, default_catalog)}: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}"
 801            )
 802        self._print(backfill)
 803
 804    def _prompt_effective_from(
 805        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 806    ) -> None:
 807        if not plan_builder.build().effective_from:
 808            effective_from = self._prompt(
 809                "Enter the effective date (eg. '1 year', '2020-01-01') to apply forward-only changes retroactively or blank to only apply them going forward once changes are deployed to prod"
 810            )
 811            if effective_from:
 812                plan_builder.set_effective_from(effective_from)
 813
 814    def _prompt_backfill(
 815        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
 816    ) -> None:
 817        plan = plan_builder.build()
 818        is_forward_only_dev = plan.is_dev and plan.forward_only
 819        backfill_or_preview = "preview" if is_forward_only_dev else "backfill"
 820
 821        if plan_builder.is_start_and_end_allowed:
 822            if not plan_builder.override_start:
 823                if is_forward_only_dev:
 824                    if plan.effective_from:
 825                        blank_meaning = f"to preview starting from the effective date ('{time_like_to_str(plan.effective_from)}')"
 826                        default_start = plan.effective_from
 827                    else:
 828                        blank_meaning = "to preview starting from yesterday"
 829                        default_start = yesterday_ds()
 830                else:
 831                    if plan.provided_start:
 832                        blank_meaning = f"starting from '{time_like_to_str(plan.provided_start)}'"
 833                    else:
 834                        blank_meaning = "from the beginning of history"
 835                    default_start = None
 836
 837                start = self._prompt(
 838                    f"Enter the {backfill_or_preview} start date (eg. '1 year', '2020-01-01') or blank to backfill {blank_meaning}",
 839                )
 840                if start:
 841                    plan_builder.set_start(start)
 842                elif default_start:
 843                    plan_builder.set_start(default_start)
 844
 845            if not plan_builder.override_end:
 846                if plan.provided_end:
 847                    blank_meaning = f"'{time_like_to_str(plan.provided_end)}'"
 848                else:
 849                    blank_meaning = "now"
 850                end = self._prompt(
 851                    f"Enter the {backfill_or_preview} end date (eg. '1 month ago', '2020-01-01') or blank to {backfill_or_preview} up until {blank_meaning}",
 852                )
 853                if end:
 854                    plan_builder.set_end(end)
 855
 856            plan = plan_builder.build()
 857
 858        if plan.ignored:
 859            self._print(
 860                self._get_ignored_tree(
 861                    plan.ignored,
 862                    plan.context_diff.snapshots,
 863                    plan.environment_naming_info,
 864                    default_catalog,
 865                )
 866            )
 867        if not auto_apply and self._confirm(f"Apply - {backfill_or_preview.capitalize()} Tables"):
 868            plan_builder.apply()
 869
 870    def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
 871        if self._confirm(
 872            "Apply - Virtual Update",
 873        ):
 874            plan_builder.apply()
 875
 876    def log_test_results(
 877        self, result: unittest.result.TestResult, output: str, target_dialect: str
 878    ) -> None:
 879        divider_length = 70
 880        if result.wasSuccessful():
 881            self._print("=" * divider_length)
 882            self._print(
 883                f"Successfully Ran {str(result.testsRun)} tests against {target_dialect}",
 884                style="green",
 885            )
 886            self._print("-" * divider_length)
 887        else:
 888            self._print("-" * divider_length)
 889            self._print("Test Failure Summary")
 890            self._print("=" * divider_length)
 891            self._print(
 892                f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}"
 893            )
 894            for test, _ in result.failures + result.errors:
 895                if isinstance(test, ModelTest):
 896                    self._print(f"Failure Test: {test.model.name} {test.test_name}")
 897            self._print("=" * divider_length)
 898            self._print(output)
 899
 900    def show_sql(self, sql: str) -> None:
 901        self._print(Syntax(sql, "sql", word_wrap=True), crop=False)
 902
 903    def log_status_update(self, message: str) -> None:
 904        self._print(message)
 905
 906    def log_error(self, message: str) -> None:
 907        self._print(f"[red]{message}[/red]")
 908
 909    def log_success(self, message: str) -> None:
 910        self._print(f"\n[green]{message}[/green]\n")
 911
 912    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
 913        id = uuid.uuid4()
 914        self.loading_status[id] = Status(message or "", console=self.console, spinner="line")
 915        self.loading_status[id].start()
 916        return id
 917
 918    def loading_stop(self, id: uuid.UUID) -> None:
 919        self.loading_status[id].stop()
 920        del self.loading_status[id]
 921
 922    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
 923        source_name = schema_diff.source
 924        if schema_diff.source_alias:
 925            source_name = schema_diff.source_alias.upper()
 926        target_name = schema_diff.target
 927        if schema_diff.target_alias:
 928            target_name = schema_diff.target_alias.upper()
 929
 930        first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'"
 931        if schema_diff.model_name:
 932            first_line = (
 933                first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
 934            )
 935
 936        tree = Tree(first_line + ":")
 937
 938        if any([schema_diff.added, schema_diff.removed, schema_diff.modified]):
 939            if schema_diff.added:
 940                added = Tree("[green]Added Columns:")
 941                for c, t in schema_diff.added:
 942                    added.add(f"[green]{c} ({t})")
 943                tree.add(added)
 944
 945            if schema_diff.removed:
 946                removed = Tree("[red]Removed Columns:")
 947                for c, t in schema_diff.removed:
 948                    removed.add(f"[red]{c} ({t})")
 949                tree.add(removed)
 950
 951            if schema_diff.modified:
 952                modified = Tree("[magenta]Modified Columns:")
 953                for c, (ft, tt) in schema_diff.modified.items():
 954                    modified.add(f"[magenta]{c} ({ft} -> {tt})")
 955                tree.add(modified)
 956        else:
 957            tree.add("[b]Schemas match")
 958
 959        self.console.print(tree)
 960
 961    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
 962        source_name = row_diff.source
 963        if row_diff.source_alias:
 964            source_name = row_diff.source_alias.upper()
 965        target_name = row_diff.target
 966        if row_diff.target_alias:
 967            target_name = row_diff.target_alias.upper()
 968
 969        tree = Tree("[b]Row Counts:[/b]")
 970        tree.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows")
 971        tree.add(f" [b][yellow]{source_name} ONLY[/yellow]:[/b] {row_diff.s_only_count} rows")
 972        tree.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows")
 973        self.console.print("\n", tree)
 974
 975        self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]")
 976        if row_diff.column_stats.shape[0] > 0:
 977            self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n")
 978        else:
 979            self.console.print("  No columns with same name and data type in both tables")
 980
 981        if show_sample:
 982            self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]")
 983            if row_diff.joined_sample.shape[0] > 0:
 984                self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n")
 985            else:
 986                self.console.print("  All joined rows match")
 987
 988            if row_diff.s_sample.shape[0] > 0:
 989                self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]")
 990                self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n")
 991
 992            if row_diff.t_sample.shape[0] > 0:
 993                self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]")
 994                self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n")
 995
 996    def _get_snapshot_change_category(
 997        self,
 998        snapshot: Snapshot,
 999        plan_builder: PlanBuilder,
1000        auto_apply: bool,
1001        default_catalog: t.Optional[str],
1002    ) -> None:
1003        choices = self._snapshot_change_choices(
1004            snapshot, plan_builder.environment_naming_info, default_catalog
1005        )
1006        response = self._prompt(
1007            "\n".join([f"[{i+1}] {choice}" for i, choice in enumerate(choices.values())]),
1008            show_choices=False,
1009            choices=[f"{i+1}" for i in range(len(choices))],
1010        )
1011        choice = list(choices)[int(response) - 1]
1012        plan_builder.set_choice(snapshot, choice)
1013
1014    def _snapshot_change_choices(
1015        self,
1016        snapshot: Snapshot,
1017        environment_naming_info: EnvironmentNamingInfo,
1018        default_catalog: t.Optional[str],
1019        use_rich_formatting: bool = True,
1020    ) -> t.Dict[SnapshotChangeCategory, str]:
1021        direct = snapshot.display_name(environment_naming_info, default_catalog)
1022        if use_rich_formatting:
1023            direct = f"[direct]{direct}[/direct]"
1024        indirect = "indirectly modified children"
1025        if use_rich_formatting:
1026            indirect = f"[indirect]{indirect}[/indirect]"
1027        if snapshot.is_view:
1028            choices = {
1029                SnapshotChangeCategory.BREAKING: f"Update {direct} and backfill {indirect}",
1030                SnapshotChangeCategory.NON_BREAKING: f"Update {direct} but don't backfill {indirect}",
1031            }
1032        elif snapshot.is_symbolic:
1033            choices = {
1034                SnapshotChangeCategory.BREAKING: f"Backfill {indirect}",
1035                SnapshotChangeCategory.NON_BREAKING: f"Don't backfill {indirect}",
1036            }
1037        else:
1038            choices = {
1039                SnapshotChangeCategory.BREAKING: f"Backfill {direct} and {indirect}",
1040                SnapshotChangeCategory.NON_BREAKING: f"Backfill {direct} but not {indirect}",
1041            }
1042        labeled_choices = {
1043            k: f"[{SNAPSHOT_CHANGE_CATEGORY_STR[k]}] {v}" for k, v in choices.items()
1044        }
1045        return labeled_choices

A rich based implementation of the console.

TerminalConsole( console: Union[rich.console.Console, NoneType] = None, verbose: bool = False, **kwargs: Any)
226    def __init__(
227        self, console: t.Optional[RichConsole] = None, verbose: bool = False, **kwargs: t.Any
228    ) -> None:
229        self.console: RichConsole = console or srich.console
230
231        self.evaluation_progress_live: t.Optional[Live] = None
232        self.evaluation_total_progress: t.Optional[Progress] = None
233        self.evaluation_total_task: t.Optional[TaskID] = None
234        self.evaluation_model_progress: t.Optional[Progress] = None
235        self.evaluation_model_tasks: t.Dict[str, TaskID] = {}
236        self.evaluation_model_batches: t.Dict[Snapshot, int] = {}
237
238        # Put in temporary values that are replaced when evaluating
239        self.environment_naming_info = EnvironmentNamingInfo()
240        self.default_catalog: t.Optional[str] = None
241
242        self.creation_progress: t.Optional[Progress] = None
243        self.creation_task: t.Optional[TaskID] = None
244
245        self.promotion_progress: t.Optional[Progress] = None
246        self.promotion_task: t.Optional[TaskID] = None
247
248        self.migration_progress: t.Optional[Progress] = None
249        self.migration_task: t.Optional[TaskID] = None
250
251        self.loading_status: t.Dict[uuid.UUID, Status] = {}
252
253        self.verbose = verbose
def start_plan_evaluation(self, plan: sqlmesh.core.plan.definition.Plan) -> None:
264    def start_plan_evaluation(self, plan: Plan) -> None:
265        pass

Indicates that a new evaluation has begun.

def stop_plan_evaluation(self) -> None:
267    def stop_plan_evaluation(self) -> None:
268        pass

Indicates that the evaluation has ended.

def start_evaluation_progress( self, batches: Dict[sqlmesh.core.snapshot.definition.Snapshot, int], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
270    def start_evaluation_progress(
271        self,
272        batches: t.Dict[Snapshot, int],
273        environment_naming_info: EnvironmentNamingInfo,
274        default_catalog: t.Optional[str],
275    ) -> None:
276        """Indicates that a new snapshot evaluation progress has begun."""
277        if not self.evaluation_progress_live:
278            self.evaluation_total_progress = Progress(
279                TextColumn("[bold blue]Evaluating models", justify="right"),
280                BarColumn(bar_width=40),
281                "[progress.percentage]{task.percentage:>3.1f}%",
282                "•",
283                srich.BatchColumn(),
284                "•",
285                TimeElapsedColumn(),
286                console=self.console,
287            )
288
289            self.evaluation_model_progress = Progress(
290                TextColumn("{task.fields[view_name]}", justify="right"),
291                SpinnerColumn(spinner_name="simpleDots"),
292                console=self.console,
293            )
294
295            progress_table = Table.grid()
296            progress_table.add_row(self.evaluation_total_progress)
297            progress_table.add_row(self.evaluation_model_progress)
298
299            self.evaluation_progress_live = Live(progress_table, refresh_per_second=10)
300            self.evaluation_progress_live.start()
301
302            self.evaluation_total_task = self.evaluation_total_progress.add_task(
303                "Evaluating models...", total=sum(batches.values())
304            )
305
306            self.evaluation_model_batches = batches
307            self.environment_naming_info = environment_naming_info
308            self.default_catalog = default_catalog

Indicates that a new snapshot evaluation progress has begun.

def start_snapshot_evaluation_progress(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> None:
310    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
311        if self.evaluation_model_progress and snapshot.name not in self.evaluation_model_tasks:
312            display_name = snapshot.display_name(self.environment_naming_info, self.default_catalog)
313            self.evaluation_model_tasks[snapshot.name] = self.evaluation_model_progress.add_task(
314                f"Evaluating {display_name}...",
315                view_name=display_name,
316                total=self.evaluation_model_batches[snapshot],
317            )

Starts the snapshot evaluation progress.

def update_snapshot_evaluation_progress( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, batch_idx: int, duration_ms: Union[int, NoneType]) -> None:
319    def update_snapshot_evaluation_progress(
320        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
321    ) -> None:
322        """Update the snapshot evaluation progress."""
323        if (
324            self.evaluation_total_progress
325            and self.evaluation_model_progress
326            and self.evaluation_progress_live
327        ):
328            total_batches = self.evaluation_model_batches[snapshot]
329
330            if duration_ms:
331                self.evaluation_progress_live.console.print(
332                    f"[{batch_idx + 1}/{total_batches}] {snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]evaluated[/green] in {(duration_ms / 1000.0):.2f}s"
333                )
334
335            self.evaluation_total_progress.update(
336                self.evaluation_total_task or TaskID(0), refresh=True, advance=1
337            )
338
339            model_task_id = self.evaluation_model_tasks[snapshot.name]
340            self.evaluation_model_progress.update(model_task_id, refresh=True, advance=1)
341            if self.evaluation_model_progress._tasks[model_task_id].completed >= total_batches:
342                self.evaluation_model_progress.remove_task(model_task_id)

Update the snapshot evaluation progress.

def stop_evaluation_progress(self, success: bool = True) -> None:
344    def stop_evaluation_progress(self, success: bool = True) -> None:
345        """Stop the snapshot evaluation progress."""
346        if self.evaluation_progress_live:
347            self.evaluation_progress_live.stop()
348            if success:
349                self.log_success("All model batches have been executed successfully")
350
351        self.evaluation_progress_live = None
352        self.evaluation_total_progress = None
353        self.evaluation_total_task = None
354        self.evaluation_model_progress = None
355        self.evaluation_model_tasks = {}
356        self.evaluation_model_batches = {}
357        self.environment_naming_info = EnvironmentNamingInfo()
358        self.default_catalog = None

Stop the snapshot evaluation progress.

def start_creation_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
360    def start_creation_progress(
361        self,
362        total_tasks: int,
363        environment_naming_info: EnvironmentNamingInfo,
364        default_catalog: t.Optional[str],
365    ) -> None:
366        """Indicates that a new creation progress has begun."""
367        if self.creation_progress is None:
368            self.creation_progress = Progress(
369                TextColumn("[bold blue]Creating physical tables", justify="right"),
370                BarColumn(bar_width=40),
371                "[progress.percentage]{task.percentage:>3.1f}%",
372                "•",
373                srich.BatchColumn(),
374                "•",
375                TimeElapsedColumn(),
376                console=self.console,
377            )
378
379            self.creation_progress.start()
380            self.creation_task = self.creation_progress.add_task(
381                "Creating physical tables...",
382                total=total_tasks,
383            )
384
385            self.environment_naming_info = environment_naming_info
386            self.default_catalog = default_catalog

Indicates that a new creation progress has begun.

def update_creation_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]) -> None:
388    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
389        """Update the snapshot creation progress."""
390        if self.creation_progress is not None and self.creation_task is not None:
391            if self.verbose:
392                self.creation_progress.live.console.print(
393                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} [green]created[/green]"
394                )
395            self.creation_progress.update(self.creation_task, refresh=True, advance=1)

Update the snapshot creation progress.

def stop_creation_progress(self, success: bool = True) -> None:
397    def stop_creation_progress(self, success: bool = True) -> None:
398        """Stop the snapshot creation progress."""
399        self.creation_task = None
400        if self.creation_progress is not None:
401            self.creation_progress.stop()
402            self.creation_progress = None
403            if success:
404                self.log_success("All model versions have been created successfully")
405
406        self.environment_naming_info = EnvironmentNamingInfo()
407        self.default_catalog = None

Stop the snapshot creation progress.

def update_cleanup_progress(self, object_name: str) -> None:
409    def update_cleanup_progress(self, object_name: str) -> None:
410        """Update the snapshot cleanup progress."""
411        self._print(f"Deleted object {object_name}")

Update the snapshot cleanup progress.

def start_promotion_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
413    def start_promotion_progress(
414        self,
415        total_tasks: int,
416        environment_naming_info: EnvironmentNamingInfo,
417        default_catalog: t.Optional[str],
418    ) -> None:
419        """Indicates that a new snapshot promotion progress has begun."""
420        if self.promotion_progress is None:
421            self.promotion_progress = Progress(
422                TextColumn(
423                    f"[bold blue]Virtually Updating '{environment_naming_info.name}'",
424                    justify="right",
425                ),
426                BarColumn(bar_width=40),
427                "[progress.percentage]{task.percentage:>3.1f}%",
428                "•",
429                TimeElapsedColumn(),
430                console=self.console,
431            )
432
433            self.promotion_progress.start()
434            self.promotion_task = self.promotion_progress.add_task(
435                f"Virtually Updating {environment_naming_info.name}...",
436                total=total_tasks,
437            )
438
439            self.environment_naming_info = environment_naming_info
440            self.default_catalog = default_catalog

Indicates that a new snapshot promotion progress has begun.

def update_promotion_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], promoted: bool) -> None:
442    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
443        """Update the snapshot promotion progress."""
444        if self.promotion_progress is not None and self.promotion_task is not None:
445            if self.verbose:
446                action_str = "[green]promoted[/green]" if promoted else "[yellow]demoted[/yellow]"
447                self.promotion_progress.live.console.print(
448                    f"{snapshot.display_name(self.environment_naming_info, self.default_catalog)} {action_str}"
449                )
450            self.promotion_progress.update(self.promotion_task, refresh=True, advance=1)

Update the snapshot promotion progress.

def stop_promotion_progress(self, success: bool = True) -> None:
452    def stop_promotion_progress(self, success: bool = True) -> None:
453        """Stop the snapshot promotion progress."""
454        self.promotion_task = None
455        if self.promotion_progress is not None:
456            self.promotion_progress.stop()
457            self.promotion_progress = None
458            if success:
459                self.log_success("The target environment has been updated successfully")
460
461        self.environment_naming_info = EnvironmentNamingInfo()
462        self.default_catalog = None

Stop the snapshot promotion progress.

def start_migration_progress(self, total_tasks: int) -> None:
464    def start_migration_progress(self, total_tasks: int) -> None:
465        """Indicates that a new migration progress has begun."""
466        if self.migration_progress is None:
467            self.migration_progress = Progress(
468                TextColumn("[bold blue]Migrating snapshots", justify="right"),
469                BarColumn(bar_width=40),
470                "[progress.percentage]{task.percentage:>3.1f}%",
471                "•",
472                srich.BatchColumn(),
473                "•",
474                TimeElapsedColumn(),
475                console=self.console,
476            )
477
478            self.migration_progress.start()
479            self.migration_task = self.migration_progress.add_task(
480                "Migrating snapshots...",
481                total=total_tasks,
482            )

Indicates that a new migration progress has begun.

def update_migration_progress(self, num_tasks: int) -> None:
484    def update_migration_progress(self, num_tasks: int) -> None:
485        """Update the migration progress."""
486        if self.migration_progress is not None and self.migration_task is not None:
487            self.migration_progress.update(self.migration_task, refresh=True, advance=num_tasks)

Update the migration progress.

def stop_migration_progress(self, success: bool = True) -> None:
489    def stop_migration_progress(self, success: bool = True) -> None:
490        """Stop the migration progress."""
491        self.migration_task = None
492        if self.migration_progress is not None:
493            self.migration_progress.stop()
494            self.migration_progress = None
495            if success:
496                self.log_success("The migration has been completed successfully")

Stop the migration progress.

def show_model_difference_summary( self, context_diff: sqlmesh.core.context_diff.ContextDiff, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType], no_diff: bool = True, ignored_snapshot_ids: Union[Set[sqlmesh.core.snapshot.definition.SnapshotId], NoneType] = None) -> None:
498    def show_model_difference_summary(
499        self,
500        context_diff: ContextDiff,
501        environment_naming_info: EnvironmentNamingInfo,
502        default_catalog: t.Optional[str],
503        no_diff: bool = True,
504        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
505    ) -> None:
506        """Shows a summary of the differences.
507
508        Args:
509            context_diff: The context diff to use to print the summary
510            environment_naming_info: The environment naming info to reference when printing model names
511            default_catalog: The default catalog to reference when deciding to remove catalog from display names
512            no_diff: Hide the actual SQL differences.
513            ignored_snapshot_ids: A set of snapshot ids that are ignored
514        """
515        ignored_snapshot_ids = ignored_snapshot_ids or set()
516        if context_diff.is_new_environment:
517            self._print(
518                Tree(
519                    f"[bold]New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`"
520                )
521            )
522            if not context_diff.has_snapshot_changes:
523                return
524
525        if not context_diff.has_changes:
526            self._print(Tree(f"[bold]No differences when compared to `{context_diff.environment}`"))
527            return
528
529        self._print(Tree(f"[bold]Summary of differences against `{context_diff.environment}`:"))
530        self._show_summary_tree_for(
531            context_diff,
532            "Models",
533            lambda x: x.is_model,
534            environment_naming_info,
535            default_catalog,
536            no_diff=no_diff,
537            ignored_snapshot_ids=ignored_snapshot_ids,
538        )
539        self._show_summary_tree_for(
540            context_diff,
541            "Standalone Audits",
542            lambda x: x.is_audit,
543            environment_naming_info,
544            default_catalog,
545            no_diff=no_diff,
546            ignored_snapshot_ids=ignored_snapshot_ids,
547        )

Shows a summary of the differences.

Arguments:
  • context_diff: The context diff to use to print the summary
  • environment_naming_info: The environment naming info to reference when printing model names
  • default_catalog: The default catalog to reference when deciding to remove catalog from display names
  • no_diff: Hide the actual SQL differences.
  • ignored_snapshot_ids: A set of snapshot ids that are ignored
def plan( self, plan_builder: sqlmesh.core.plan.builder.PlanBuilder, auto_apply: bool, default_catalog: Union[str, NoneType], no_diff: bool = False, no_prompts: bool = False) -> None:
549    def plan(
550        self,
551        plan_builder: PlanBuilder,
552        auto_apply: bool,
553        default_catalog: t.Optional[str],
554        no_diff: bool = False,
555        no_prompts: bool = False,
556    ) -> None:
557        """The main plan flow.
558
559        The console should present the user with choices on how to backfill and version the snapshots
560        of a plan.
561
562        Args:
563            plan: The plan to make choices for.
564            auto_apply: Whether to automatically apply the plan after all choices have been made.
565            default_catalog: The default catalog to reference when deciding to remove catalog from display names
566            no_diff: Hide text differences for changed models.
567            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
568                if this flag is set to true and there are uncategorized changes the plan creation will
569                fail. Default: False
570        """
571        self._prompt_categorize(
572            plan_builder,
573            auto_apply,
574            no_diff=no_diff,
575            no_prompts=no_prompts,
576            default_catalog=default_catalog,
577        )
578
579        if not no_prompts:
580            self._show_options_after_categorization(
581                plan_builder, auto_apply, default_catalog=default_catalog
582            )
583
584        if auto_apply:
585            plan_builder.apply()

The main plan flow.

The console should present the user with choices on how to backfill and version the snapshots of a plan.

Arguments:
  • plan: The plan to make choices for.
  • auto_apply: Whether to automatically apply the plan after all choices have been made.
  • default_catalog: The default catalog to reference when deciding to remove catalog from display names
  • no_diff: Hide text differences for changed models.
  • no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False
def log_test_results( self, result: unittest.result.TestResult, output: str, target_dialect: str) -> None:
876    def log_test_results(
877        self, result: unittest.result.TestResult, output: str, target_dialect: str
878    ) -> None:
879        divider_length = 70
880        if result.wasSuccessful():
881            self._print("=" * divider_length)
882            self._print(
883                f"Successfully Ran {str(result.testsRun)} tests against {target_dialect}",
884                style="green",
885            )
886            self._print("-" * divider_length)
887        else:
888            self._print("-" * divider_length)
889            self._print("Test Failure Summary")
890            self._print("=" * divider_length)
891            self._print(
892                f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}"
893            )
894            for test, _ in result.failures + result.errors:
895                if isinstance(test, ModelTest):
896                    self._print(f"Failure Test: {test.model.name} {test.test_name}")
897            self._print("=" * divider_length)
898            self._print(output)

Display the test result and output.

Arguments:
  • result: The unittest test result that contains metrics like num success, fails, ect.
  • output: The generated output from the unittest.
  • target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
def show_sql(self, sql: str) -> None:
900    def show_sql(self, sql: str) -> None:
901        self._print(Syntax(sql, "sql", word_wrap=True), crop=False)

Display to the user SQL.

def log_status_update(self, message: str) -> None:
903    def log_status_update(self, message: str) -> None:
904        self._print(message)

Display general status update to the user.

def log_error(self, message: str) -> None:
906    def log_error(self, message: str) -> None:
907        self._print(f"[red]{message}[/red]")

Display error info to the user.

def log_success(self, message: str) -> None:
909    def log_success(self, message: str) -> None:
910        self._print(f"\n[green]{message}[/green]\n")

Display a general successful message to the user.

def loading_start(self, message: Union[str, NoneType] = None) -> uuid.UUID:
912    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
913        id = uuid.uuid4()
914        self.loading_status[id] = Status(message or "", console=self.console, spinner="line")
915        self.loading_status[id].start()
916        return id

Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.

def loading_stop(self, id: uuid.UUID) -> None:
918    def loading_stop(self, id: uuid.UUID) -> None:
919        self.loading_status[id].stop()
920        del self.loading_status[id]

Stop loading for the given id.

def show_schema_diff(self, schema_diff: sqlmesh.core.table_diff.SchemaDiff) -> None:
922    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
923        source_name = schema_diff.source
924        if schema_diff.source_alias:
925            source_name = schema_diff.source_alias.upper()
926        target_name = schema_diff.target
927        if schema_diff.target_alias:
928            target_name = schema_diff.target_alias.upper()
929
930        first_line = f"\n[b]Schema Diff Between '[yellow]{source_name}[/yellow]' and '[green]{target_name}[/green]'"
931        if schema_diff.model_name:
932            first_line = (
933                first_line + f" environments for model '[blue]{schema_diff.model_name}[/blue]'"
934            )
935
936        tree = Tree(first_line + ":")
937
938        if any([schema_diff.added, schema_diff.removed, schema_diff.modified]):
939            if schema_diff.added:
940                added = Tree("[green]Added Columns:")
941                for c, t in schema_diff.added:
942                    added.add(f"[green]{c} ({t})")
943                tree.add(added)
944
945            if schema_diff.removed:
946                removed = Tree("[red]Removed Columns:")
947                for c, t in schema_diff.removed:
948                    removed.add(f"[red]{c} ({t})")
949                tree.add(removed)
950
951            if schema_diff.modified:
952                modified = Tree("[magenta]Modified Columns:")
953                for c, (ft, tt) in schema_diff.modified.items():
954                    modified.add(f"[magenta]{c} ({ft} -> {tt})")
955                tree.add(modified)
956        else:
957            tree.add("[b]Schemas match")
958
959        self.console.print(tree)

Show table schema diff.

def show_row_diff( self, row_diff: sqlmesh.core.table_diff.RowDiff, show_sample: bool = True) -> None:
961    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
962        source_name = row_diff.source
963        if row_diff.source_alias:
964            source_name = row_diff.source_alias.upper()
965        target_name = row_diff.target
966        if row_diff.target_alias:
967            target_name = row_diff.target_alias.upper()
968
969        tree = Tree("[b]Row Counts:[/b]")
970        tree.add(f" [b][blue]COMMON[/blue]:[/b] {row_diff.join_count} rows")
971        tree.add(f" [b][yellow]{source_name} ONLY[/yellow]:[/b] {row_diff.s_only_count} rows")
972        tree.add(f" [b][green]{target_name} ONLY[/green]:[/b] {row_diff.t_only_count} rows")
973        self.console.print("\n", tree)
974
975        self.console.print("\n[b][blue]COMMON ROWS[/blue] column comparison stats:[/b]")
976        if row_diff.column_stats.shape[0] > 0:
977            self.console.print(row_diff.column_stats.to_string(index=True), end="\n\n")
978        else:
979            self.console.print("  No columns with same name and data type in both tables")
980
981        if show_sample:
982            self.console.print("\n[b][blue]COMMON ROWS[/blue] sample data differences:[/b]")
983            if row_diff.joined_sample.shape[0] > 0:
984                self.console.print(row_diff.joined_sample.to_string(index=False), end="\n\n")
985            else:
986                self.console.print("  All joined rows match")
987
988            if row_diff.s_sample.shape[0] > 0:
989                self.console.print(f"\n[b][yellow]{source_name} ONLY[/yellow] sample rows:[/b]")
990                self.console.print(row_diff.s_sample.to_string(index=False), end="\n\n")
991
992            if row_diff.t_sample.shape[0] > 0:
993                self.console.print(f"\n[b][green]{target_name} ONLY[/green] sample rows:[/b]")
994                self.console.print(row_diff.t_sample.to_string(index=False), end="\n\n")

Show table summary diff.

def add_to_layout_widget( target_widget: ~LayoutWidget, *widgets: ipywidgets.widgets.widget.Widget) -> ~LayoutWidget:
1048def add_to_layout_widget(target_widget: LayoutWidget, *widgets: widgets.Widget) -> LayoutWidget:
1049    """Helper function to add a widget to a layout widget.
1050
1051    Args:
1052        target_widget: The layout widget to add the other widget(s) to.
1053        *widgets: The widgets to add to the layout widget.
1054
1055    Returns:
1056        The layout widget with the children added.
1057    """
1058    target_widget.children += tuple(widgets)
1059    return target_widget

Helper function to add a widget to a layout widget.

Arguments:
  • target_widget: The layout widget to add the other widget(s) to.
  • *widgets: The widgets to add to the layout widget.
Returns:

The layout widget with the children added.

class NotebookMagicConsole(TerminalConsole):
1062class NotebookMagicConsole(TerminalConsole):
1063    """
1064    Console to be used when using the magic notebook interface (`%<command>`).
1065    Generally reuses the Terminal console when possible by either directly outputing what it provides
1066    or capturing it and converting it into a widget.
1067    """
1068
1069    def __init__(
1070        self,
1071        display: t.Optional[t.Callable] = None,
1072        console: t.Optional[RichConsole] = None,
1073        **kwargs: t.Any,
1074    ) -> None:
1075        import ipywidgets as widgets
1076        from IPython import get_ipython
1077        from IPython.display import display as ipython_display
1078
1079        super().__init__(console, **kwargs)
1080
1081        self.display = display or get_ipython().user_ns.get("display", ipython_display)
1082        self.missing_dates_output = widgets.Output()
1083        self.dynamic_options_after_categorization_output = widgets.VBox()
1084
1085    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1086        self._add_to_dynamic_options(self.missing_dates_output)
1087        self.missing_dates_output.outputs = ()
1088        with self.missing_dates_output:
1089            super()._show_missing_dates(plan, default_catalog)
1090
1091    def _apply(self, button: widgets.Button) -> None:
1092        button.disabled = True
1093        with button.output:
1094            button.plan_builder.apply()
1095
1096    def _prompt_promote(self, plan_builder: PlanBuilder) -> None:
1097        import ipywidgets as widgets
1098
1099        button = widgets.Button(
1100            description="Apply - Virtual Update",
1101            disabled=False,
1102            button_style="success",
1103            # Auto will make the button really large.
1104            # Likely changing this soon anyways to be just `Apply` with description above
1105            layout={"width": "10rem"},
1106        )
1107        self._add_to_dynamic_options(button)
1108        output = widgets.Output()
1109        self._add_to_dynamic_options(output)
1110
1111        button.plan_builder = plan_builder
1112        button.on_click(self._apply)
1113        button.output = output
1114
1115    def _prompt_effective_from(
1116        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1117    ) -> None:
1118        import ipywidgets as widgets
1119
1120        prompt = widgets.VBox()
1121
1122        def effective_from_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1123            plan_builder.set_effective_from(change["new"])
1124            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1125
1126        def going_forward_change_callback(change: t.Dict[str, bool]) -> None:
1127            checked = change["new"]
1128            plan_builder.set_effective_from(None if checked else yesterday_ds())
1129            self._show_options_after_categorization(
1130                plan_builder, auto_apply=auto_apply, default_catalog=default_catalog
1131            )
1132
1133        date_picker = widgets.DatePicker(
1134            disabled=plan_builder.build().effective_from is None,
1135            value=to_date(plan_builder.build().effective_from or yesterday_ds()),
1136            layout={"width": "auto"},
1137        )
1138        date_picker.observe(effective_from_change_callback, "value")
1139
1140        going_forward_checkbox = widgets.Checkbox(
1141            value=plan_builder.build().effective_from is None,
1142            description="Apply Going Forward Once Deployed To Prod",
1143            disabled=False,
1144            indent=False,
1145        )
1146        going_forward_checkbox.observe(going_forward_change_callback, "value")
1147
1148        add_to_layout_widget(
1149            prompt,
1150            widgets.HBox(
1151                [
1152                    widgets.Label("Effective From Date:", layout={"width": "8rem"}),
1153                    date_picker,
1154                    going_forward_checkbox,
1155                ]
1156            ),
1157        )
1158
1159        self._add_to_dynamic_options(prompt)
1160
1161    def _prompt_backfill(
1162        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1163    ) -> None:
1164        import ipywidgets as widgets
1165
1166        prompt = widgets.VBox()
1167
1168        backfill_or_preview = (
1169            "Preview"
1170            if plan_builder.build().is_dev and plan_builder.build().forward_only
1171            else "Backfill"
1172        )
1173
1174        def _date_picker(
1175            plan_builder: PlanBuilder, value: t.Any, on_change: t.Callable, disabled: bool = False
1176        ) -> widgets.DatePicker:
1177            picker = widgets.DatePicker(
1178                disabled=disabled,
1179                value=value,
1180                layout={"width": "auto"},
1181            )
1182
1183            picker.observe(on_change, "value")
1184            return picker
1185
1186        def start_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1187            plan_builder.set_start(change["new"])
1188            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1189
1190        def end_change_callback(change: t.Dict[str, datetime.datetime]) -> None:
1191            plan_builder.set_end(change["new"])
1192            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1193
1194        if plan_builder.is_start_and_end_allowed:
1195            add_to_layout_widget(
1196                prompt,
1197                widgets.HBox(
1198                    [
1199                        widgets.Label(
1200                            f"Start {backfill_or_preview} Date:", layout={"width": "8rem"}
1201                        ),
1202                        _date_picker(
1203                            plan_builder, to_date(plan_builder.build().start), start_change_callback
1204                        ),
1205                    ]
1206                ),
1207            )
1208
1209            add_to_layout_widget(
1210                prompt,
1211                widgets.HBox(
1212                    [
1213                        widgets.Label(f"End {backfill_or_preview} Date:", layout={"width": "8rem"}),
1214                        _date_picker(
1215                            plan_builder,
1216                            to_date(plan_builder.build().end),
1217                            end_change_callback,
1218                        ),
1219                    ]
1220                ),
1221            )
1222
1223        self._add_to_dynamic_options(prompt)
1224
1225        if not auto_apply:
1226            button = widgets.Button(
1227                description=f"Apply - {backfill_or_preview} Tables",
1228                disabled=False,
1229                button_style="success",
1230            )
1231            self._add_to_dynamic_options(button)
1232            output = widgets.Output()
1233            self._add_to_dynamic_options(output)
1234
1235            button.plan_builder = plan_builder
1236            button.on_click(self._apply)
1237            button.output = output
1238
1239    def _show_options_after_categorization(
1240        self, plan_builder: PlanBuilder, auto_apply: bool, default_catalog: t.Optional[str]
1241    ) -> None:
1242        self.dynamic_options_after_categorization_output.children = ()
1243        self.display(self.dynamic_options_after_categorization_output)
1244        super()._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1245
1246    def _add_to_dynamic_options(self, *widgets: widgets.Widget) -> None:
1247        add_to_layout_widget(self.dynamic_options_after_categorization_output, *widgets)
1248
1249    def _get_snapshot_change_category(
1250        self,
1251        snapshot: Snapshot,
1252        plan_builder: PlanBuilder,
1253        auto_apply: bool,
1254        default_catalog: t.Optional[str],
1255    ) -> None:
1256        import ipywidgets as widgets
1257
1258        choice_mapping = self._snapshot_change_choices(
1259            snapshot,
1260            plan_builder.environment_naming_info,
1261            default_catalog,
1262            use_rich_formatting=False,
1263        )
1264        choices = list(choice_mapping)
1265        plan_builder.set_choice(snapshot, choices[0])
1266
1267        def radio_button_selected(change: t.Dict[str, t.Any]) -> None:
1268            plan_builder.set_choice(snapshot, choices[change["owner"].index])
1269            self._show_options_after_categorization(plan_builder, auto_apply, default_catalog)
1270
1271        radio = widgets.RadioButtons(
1272            options=choice_mapping.values(),
1273            layout={"width": "max-content"},
1274            disabled=False,
1275        )
1276        radio.observe(
1277            radio_button_selected,
1278            "value",
1279        )
1280        self.display(radio)
1281
1282    def log_test_results(
1283        self, result: unittest.result.TestResult, output: str, target_dialect: str
1284    ) -> None:
1285        import ipywidgets as widgets
1286
1287        divider_length = 70
1288        shared_style = {
1289            "font-size": "11px",
1290            "font-weight": "bold",
1291            "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace",
1292        }
1293        if result.wasSuccessful():
1294            success_color = {"color": "#008000"}
1295            header = str(h("span", {"style": shared_style}, "-" * divider_length))
1296            message = str(
1297                h(
1298                    "span",
1299                    {"style": {**shared_style, **success_color}},
1300                    f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}",
1301                )
1302            )
1303            footer = str(h("span", {"style": shared_style}, "=" * divider_length))
1304            self.display(widgets.HTML("<br>".join([header, message, footer])))
1305        else:
1306            fail_color = {"color": "#db3737"}
1307            fail_shared_style = {**shared_style, **fail_color}
1308            header = str(h("span", {"style": fail_shared_style}, "-" * divider_length))
1309            message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary"))
1310            num_success = str(
1311                h(
1312                    "span",
1313                    {"style": fail_shared_style},
1314                    f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}",
1315                )
1316            )
1317            failure_tests = []
1318            for test, _ in result.failures + result.errors:
1319                if isinstance(test, ModelTest):
1320                    failure_tests.append(
1321                        str(
1322                            h(
1323                                "span",
1324                                {"style": fail_shared_style},
1325                                f"Failure Test: {test.model.name} {test.test_name}",
1326                            )
1327                        )
1328                    )
1329            failures = "<br>".join(failure_tests)
1330            footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length))
1331            error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"})
1332            test_info = widgets.HTML(
1333                "<br>".join([header, message, footer, num_success, failures, footer])
1334            )
1335            self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"}))

Console to be used when using the magic notebook interface (%<command>). Generally reuses the Terminal console when possible by either directly outputing what it provides or capturing it and converting it into a widget.

NotebookMagicConsole( display: Union[Callable, NoneType] = None, console: Union[rich.console.Console, NoneType] = None, **kwargs: Any)
1069    def __init__(
1070        self,
1071        display: t.Optional[t.Callable] = None,
1072        console: t.Optional[RichConsole] = None,
1073        **kwargs: t.Any,
1074    ) -> None:
1075        import ipywidgets as widgets
1076        from IPython import get_ipython
1077        from IPython.display import display as ipython_display
1078
1079        super().__init__(console, **kwargs)
1080
1081        self.display = display or get_ipython().user_ns.get("display", ipython_display)
1082        self.missing_dates_output = widgets.Output()
1083        self.dynamic_options_after_categorization_output = widgets.VBox()
def log_test_results( self, result: unittest.result.TestResult, output: str, target_dialect: str) -> None:
1282    def log_test_results(
1283        self, result: unittest.result.TestResult, output: str, target_dialect: str
1284    ) -> None:
1285        import ipywidgets as widgets
1286
1287        divider_length = 70
1288        shared_style = {
1289            "font-size": "11px",
1290            "font-weight": "bold",
1291            "font-family": "Menlo,'DejaVu Sans Mono',consolas,'Courier New',monospace",
1292        }
1293        if result.wasSuccessful():
1294            success_color = {"color": "#008000"}
1295            header = str(h("span", {"style": shared_style}, "-" * divider_length))
1296            message = str(
1297                h(
1298                    "span",
1299                    {"style": {**shared_style, **success_color}},
1300                    f"Successfully Ran {str(result.testsRun)} Tests Against {target_dialect}",
1301                )
1302            )
1303            footer = str(h("span", {"style": shared_style}, "=" * divider_length))
1304            self.display(widgets.HTML("<br>".join([header, message, footer])))
1305        else:
1306            fail_color = {"color": "#db3737"}
1307            fail_shared_style = {**shared_style, **fail_color}
1308            header = str(h("span", {"style": fail_shared_style}, "-" * divider_length))
1309            message = str(h("span", {"style": fail_shared_style}, "Test Failure Summary"))
1310            num_success = str(
1311                h(
1312                    "span",
1313                    {"style": fail_shared_style},
1314                    f"Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}",
1315                )
1316            )
1317            failure_tests = []
1318            for test, _ in result.failures + result.errors:
1319                if isinstance(test, ModelTest):
1320                    failure_tests.append(
1321                        str(
1322                            h(
1323                                "span",
1324                                {"style": fail_shared_style},
1325                                f"Failure Test: {test.model.name} {test.test_name}",
1326                            )
1327                        )
1328                    )
1329            failures = "<br>".join(failure_tests)
1330            footer = str(h("span", {"style": fail_shared_style}, "=" * divider_length))
1331            error_output = widgets.Textarea(output, layout={"height": "300px", "width": "100%"})
1332            test_info = widgets.HTML(
1333                "<br>".join([header, message, footer, num_success, failures, footer])
1334            )
1335            self.display(widgets.VBox(children=[test_info, error_output], layout={"width": "100%"}))

Display the test result and output.

Arguments:
  • result: The unittest test result that contains metrics like num success, fails, ect.
  • output: The generated output from the unittest.
  • target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
class CaptureTerminalConsole(TerminalConsole):
1338class CaptureTerminalConsole(TerminalConsole):
1339    """
1340    Captures the output of the terminal console so that it can be extracted out and displayed within other interfaces.
1341    The captured output is cleared out after it is retrieved.
1342
1343    Note: `_prompt` and `_confirm` need to also be overriden to work with the custom interface if you want to use
1344    this console interactively.
1345    """
1346
1347    def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) -> None:
1348        super().__init__(console=console, **kwargs)
1349        self._captured_outputs: t.List[str] = []
1350        self._errors: t.List[str] = []
1351
1352    @property
1353    def captured_output(self) -> str:
1354        return "".join(self._captured_outputs)
1355
1356    @property
1357    def captured_errors(self) -> str:
1358        return "".join(self._errors)
1359
1360    def consume_captured_output(self) -> str:
1361        output = self.captured_output
1362        self.clear_captured_outputs()
1363        return output
1364
1365    def consume_captured_errors(self) -> str:
1366        errors = self.captured_errors
1367        self.clear_captured_errors()
1368        return errors
1369
1370    def clear_captured_outputs(self) -> None:
1371        self._captured_outputs = []
1372
1373    def clear_captured_errors(self) -> None:
1374        self._errors = []
1375
1376    def log_error(self, message: str) -> None:
1377        self._errors.append(message)
1378        super().log_error(message)
1379
1380    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
1381        with self.console.capture() as capture:
1382            self.console.print(value, **kwargs)
1383        self._captured_outputs.append(capture.get())

Captures the output of the terminal console so that it can be extracted out and displayed within other interfaces. The captured output is cleared out after it is retrieved.

Note: _prompt and _confirm need to also be overriden to work with the custom interface if you want to use this console interactively.

CaptureTerminalConsole(console: Union[rich.console.Console, NoneType] = None, **kwargs: Any)
1347    def __init__(self, console: t.Optional[RichConsole] = None, **kwargs: t.Any) -> None:
1348        super().__init__(console=console, **kwargs)
1349        self._captured_outputs: t.List[str] = []
1350        self._errors: t.List[str] = []
def consume_captured_output(self) -> str:
1360    def consume_captured_output(self) -> str:
1361        output = self.captured_output
1362        self.clear_captured_outputs()
1363        return output
def consume_captured_errors(self) -> str:
1365    def consume_captured_errors(self) -> str:
1366        errors = self.captured_errors
1367        self.clear_captured_errors()
1368        return errors
def clear_captured_outputs(self) -> None:
1370    def clear_captured_outputs(self) -> None:
1371        self._captured_outputs = []
def clear_captured_errors(self) -> None:
1373    def clear_captured_errors(self) -> None:
1374        self._errors = []
def log_error(self, message: str) -> None:
1376    def log_error(self, message: str) -> None:
1377        self._errors.append(message)
1378        super().log_error(message)

Display error info to the user.

class MarkdownConsole(CaptureTerminalConsole):
1386class MarkdownConsole(CaptureTerminalConsole):
1387    """
1388    A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases
1389    where you want to display a plan or test results in markdown.
1390    """
1391
1392    def show_model_difference_summary(
1393        self,
1394        context_diff: ContextDiff,
1395        environment_naming_info: EnvironmentNamingInfo,
1396        default_catalog: t.Optional[str],
1397        no_diff: bool = True,
1398        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1399    ) -> None:
1400        """Shows a summary of the differences.
1401
1402        Args:
1403            context_diff: The context diff to use to print the summary.
1404            environment_naming_info: The environment naming info to reference when printing model names
1405            default_catalog: The default catalog to reference when deciding to remove catalog from display names
1406            no_diff: Hide the actual SQL differences.
1407            ignored_snapshot_ids: A set of snapshot names that are ignored
1408        """
1409        ignored_snapshot_ids = ignored_snapshot_ids or set()
1410        if context_diff.is_new_environment:
1411            self._print(
1412                f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n"
1413            )
1414            if not context_diff.has_snapshot_changes:
1415                return
1416
1417        if not context_diff.has_changes:
1418            self._print(f"**No differences when compared to `{context_diff.environment}`**\n")
1419            return
1420
1421        self._print(f"**Summary of differences against `{context_diff.environment}`:**\n")
1422
1423        added_snapshots = {
1424            context_diff.snapshots[s_id]
1425            for s_id in context_diff.added
1426            if s_id not in ignored_snapshot_ids
1427        }
1428        added_snapshot_models = {s for s in added_snapshots if s.is_model}
1429        if added_snapshot_models:
1430            self._print("\n**Added Models:**")
1431            for snapshot in sorted(added_snapshot_models):
1432                self._print(
1433                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1434                )
1435
1436        added_snapshot_audits = {s for s in added_snapshots if s.is_audit}
1437        if added_snapshot_audits:
1438            self._print("\n**Added Standalone Audits:**")
1439            for snapshot in sorted(added_snapshot_audits):
1440                self._print(
1441                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1442                )
1443
1444        removed_snapshot_table_infos = {
1445            snapshot_table_info
1446            for s_id, snapshot_table_info in context_diff.removed_snapshots.items()
1447            if s_id not in ignored_snapshot_ids
1448        }
1449        removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model}
1450        if removed_model_snapshot_table_infos:
1451            self._print("\n**Removed Models:**")
1452            for snapshot_table_info in sorted(removed_model_snapshot_table_infos):
1453                self._print(
1454                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1455                )
1456
1457        removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit}
1458        if removed_audit_snapshot_table_infos:
1459            self._print("\n**Removed Standalone Audits:**")
1460            for snapshot_table_info in sorted(removed_audit_snapshot_table_infos):
1461                self._print(
1462                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1463                )
1464
1465        modified_snapshots = {
1466            current_snapshot
1467            for current_snapshot, _ in context_diff.modified_snapshots.values()
1468            if current_snapshot.snapshot_id not in ignored_snapshot_ids
1469        }
1470        if modified_snapshots:
1471            directly_modified = []
1472            indirectly_modified = []
1473            metadata_modified = []
1474            for snapshot in modified_snapshots:
1475                if context_diff.directly_modified(snapshot.name):
1476                    directly_modified.append(snapshot)
1477                elif context_diff.indirectly_modified(snapshot.name):
1478                    indirectly_modified.append(snapshot)
1479                elif context_diff.metadata_updated(snapshot.name):
1480                    metadata_modified.append(snapshot)
1481            if directly_modified:
1482                self._print("\n**Directly Modified:**")
1483                for snapshot in sorted(directly_modified):
1484                    self._print(
1485                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1486                    )
1487                    if not no_diff:
1488                        self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
1489            if indirectly_modified:
1490                self._print("\n**Indirectly Modified:**")
1491                for snapshot in sorted(indirectly_modified):
1492                    self._print(
1493                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1494                    )
1495            if metadata_modified:
1496                self._print("\n**Metadata Updated:**")
1497                for snapshot in sorted(metadata_modified):
1498                    self._print(
1499                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1500                    )
1501        if ignored_snapshot_ids:
1502            self._print("\n**Ignored Models (Expected Plan Start):**")
1503            for s_id in sorted(ignored_snapshot_ids):
1504                snapshot = context_diff.snapshots[s_id]
1505                self._print(
1506                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}` ({snapshot.get_latest(start_date(snapshot, context_diff.snapshots.values()))})"
1507                )
1508
1509    def _show_missing_dates(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1510        """Displays the models with missing dates."""
1511        missing_intervals = plan.missing_intervals
1512        if not missing_intervals:
1513            return
1514        self._print("\n**Models needing backfill (missing dates):**")
1515        for missing in missing_intervals:
1516            snapshot = plan.context_diff.snapshots[missing.snapshot_id]
1517            if not snapshot.is_model:
1518                continue
1519
1520            preview_modifier = ""
1521            if not plan.deployability_index.is_deployable(snapshot):
1522                preview_modifier = " (**preview**)"
1523
1524            self._print(
1525                f"* `{snapshot.display_name(plan.environment_naming_info, default_catalog)}`: {missing.format_intervals(snapshot.node.interval_unit)}{preview_modifier}"
1526            )
1527
1528    def _show_categorized_snapshots(self, plan: Plan, default_catalog: t.Optional[str]) -> None:
1529        context_diff = plan.context_diff
1530        for snapshot in plan.categorized:
1531            if not context_diff.directly_modified(snapshot.name):
1532                continue
1533
1534            category_str = SNAPSHOT_CHANGE_CATEGORY_STR[snapshot.change_category]
1535            tree = Tree(
1536                f"[bold][direct]Directly Modified: {snapshot.display_name(plan.environment_naming_info, default_catalog)} ({category_str})"
1537            )
1538            indirect_tree = None
1539            for child_sid in sorted(plan.indirectly_modified.get(snapshot.snapshot_id, set())):
1540                child_snapshot = context_diff.snapshots[child_sid]
1541                if not indirect_tree:
1542                    indirect_tree = Tree("[indirect]Indirectly Modified Children:")
1543                    tree.add(indirect_tree)
1544                child_category_str = SNAPSHOT_CHANGE_CATEGORY_STR[child_snapshot.change_category]
1545                indirect_tree.add(
1546                    f"[indirect]{child_snapshot.display_name(plan.environment_naming_info, default_catalog)} ({child_category_str})"
1547                )
1548            self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```\n")
1549            self._print("```\n")
1550            self._print(tree)
1551            self._print("\n```")
1552
1553    def log_test_results(
1554        self, result: unittest.result.TestResult, output: str, target_dialect: str
1555    ) -> None:
1556        # import ipywidgets as widgets
1557        if result.wasSuccessful():
1558            self._print(
1559                f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n"
1560            )
1561        else:
1562            self._print(
1563                f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n"
1564            )
1565            for test, _ in result.failures + result.errors:
1566                if isinstance(test, ModelTest):
1567                    self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n")
1568            self._print(f"```{output}```\n\n")
1569
1570    def log_error(self, message: str) -> None:
1571        super().log_error(f"```\n{message}```\n\n")

A console that outputs markdown. Currently this is only configured for non-interactive use so for use cases where you want to display a plan or test results in markdown.

def show_model_difference_summary( self, context_diff: sqlmesh.core.context_diff.ContextDiff, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType], no_diff: bool = True, ignored_snapshot_ids: Union[Set[sqlmesh.core.snapshot.definition.SnapshotId], NoneType] = None) -> None:
1392    def show_model_difference_summary(
1393        self,
1394        context_diff: ContextDiff,
1395        environment_naming_info: EnvironmentNamingInfo,
1396        default_catalog: t.Optional[str],
1397        no_diff: bool = True,
1398        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1399    ) -> None:
1400        """Shows a summary of the differences.
1401
1402        Args:
1403            context_diff: The context diff to use to print the summary.
1404            environment_naming_info: The environment naming info to reference when printing model names
1405            default_catalog: The default catalog to reference when deciding to remove catalog from display names
1406            no_diff: Hide the actual SQL differences.
1407            ignored_snapshot_ids: A set of snapshot names that are ignored
1408        """
1409        ignored_snapshot_ids = ignored_snapshot_ids or set()
1410        if context_diff.is_new_environment:
1411            self._print(
1412                f"**New environment `{context_diff.environment}` will be created from `{context_diff.create_from}`**\n"
1413            )
1414            if not context_diff.has_snapshot_changes:
1415                return
1416
1417        if not context_diff.has_changes:
1418            self._print(f"**No differences when compared to `{context_diff.environment}`**\n")
1419            return
1420
1421        self._print(f"**Summary of differences against `{context_diff.environment}`:**\n")
1422
1423        added_snapshots = {
1424            context_diff.snapshots[s_id]
1425            for s_id in context_diff.added
1426            if s_id not in ignored_snapshot_ids
1427        }
1428        added_snapshot_models = {s for s in added_snapshots if s.is_model}
1429        if added_snapshot_models:
1430            self._print("\n**Added Models:**")
1431            for snapshot in sorted(added_snapshot_models):
1432                self._print(
1433                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1434                )
1435
1436        added_snapshot_audits = {s for s in added_snapshots if s.is_audit}
1437        if added_snapshot_audits:
1438            self._print("\n**Added Standalone Audits:**")
1439            for snapshot in sorted(added_snapshot_audits):
1440                self._print(
1441                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1442                )
1443
1444        removed_snapshot_table_infos = {
1445            snapshot_table_info
1446            for s_id, snapshot_table_info in context_diff.removed_snapshots.items()
1447            if s_id not in ignored_snapshot_ids
1448        }
1449        removed_model_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_model}
1450        if removed_model_snapshot_table_infos:
1451            self._print("\n**Removed Models:**")
1452            for snapshot_table_info in sorted(removed_model_snapshot_table_infos):
1453                self._print(
1454                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1455                )
1456
1457        removed_audit_snapshot_table_infos = {s for s in removed_snapshot_table_infos if s.is_audit}
1458        if removed_audit_snapshot_table_infos:
1459            self._print("\n**Removed Standalone Audits:**")
1460            for snapshot_table_info in sorted(removed_audit_snapshot_table_infos):
1461                self._print(
1462                    f"- `{snapshot_table_info.display_name(environment_naming_info, default_catalog)}`"
1463                )
1464
1465        modified_snapshots = {
1466            current_snapshot
1467            for current_snapshot, _ in context_diff.modified_snapshots.values()
1468            if current_snapshot.snapshot_id not in ignored_snapshot_ids
1469        }
1470        if modified_snapshots:
1471            directly_modified = []
1472            indirectly_modified = []
1473            metadata_modified = []
1474            for snapshot in modified_snapshots:
1475                if context_diff.directly_modified(snapshot.name):
1476                    directly_modified.append(snapshot)
1477                elif context_diff.indirectly_modified(snapshot.name):
1478                    indirectly_modified.append(snapshot)
1479                elif context_diff.metadata_updated(snapshot.name):
1480                    metadata_modified.append(snapshot)
1481            if directly_modified:
1482                self._print("\n**Directly Modified:**")
1483                for snapshot in sorted(directly_modified):
1484                    self._print(
1485                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1486                    )
1487                    if not no_diff:
1488                        self._print(f"```diff\n{context_diff.text_diff(snapshot.name)}\n```")
1489            if indirectly_modified:
1490                self._print("\n**Indirectly Modified:**")
1491                for snapshot in sorted(indirectly_modified):
1492                    self._print(
1493                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1494                    )
1495            if metadata_modified:
1496                self._print("\n**Metadata Updated:**")
1497                for snapshot in sorted(metadata_modified):
1498                    self._print(
1499                        f"- `{snapshot.display_name(environment_naming_info, default_catalog)}`"
1500                    )
1501        if ignored_snapshot_ids:
1502            self._print("\n**Ignored Models (Expected Plan Start):**")
1503            for s_id in sorted(ignored_snapshot_ids):
1504                snapshot = context_diff.snapshots[s_id]
1505                self._print(
1506                    f"- `{snapshot.display_name(environment_naming_info, default_catalog)}` ({snapshot.get_latest(start_date(snapshot, context_diff.snapshots.values()))})"
1507                )

Shows a summary of the differences.

Arguments:
  • context_diff: The context diff to use to print the summary.
  • environment_naming_info: The environment naming info to reference when printing model names
  • default_catalog: The default catalog to reference when deciding to remove catalog from display names
  • no_diff: Hide the actual SQL differences.
  • ignored_snapshot_ids: A set of snapshot names that are ignored
def log_test_results( self, result: unittest.result.TestResult, output: str, target_dialect: str) -> None:
1553    def log_test_results(
1554        self, result: unittest.result.TestResult, output: str, target_dialect: str
1555    ) -> None:
1556        # import ipywidgets as widgets
1557        if result.wasSuccessful():
1558            self._print(
1559                f"**Successfully Ran `{str(result.testsRun)}` Tests Against `{target_dialect}`**\n\n"
1560            )
1561        else:
1562            self._print(
1563                f"**Num Successful Tests: {result.testsRun - len(result.failures) - len(result.errors)}**\n\n"
1564            )
1565            for test, _ in result.failures + result.errors:
1566                if isinstance(test, ModelTest):
1567                    self._print(f"* Failure Test: `{test.model.name}` - `{test.test_name}`\n\n")
1568            self._print(f"```{output}```\n\n")

Display the test result and output.

Arguments:
  • result: The unittest test result that contains metrics like num success, fails, ect.
  • output: The generated output from the unittest.
  • target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
def log_error(self, message: str) -> None:
1570    def log_error(self, message: str) -> None:
1571        super().log_error(f"```\n{message}```\n\n")

Display error info to the user.

class DatabricksMagicConsole(CaptureTerminalConsole):
1574class DatabricksMagicConsole(CaptureTerminalConsole):
1575    """
1576    Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The
1577    NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution
1578    and it makes it difficult to see the progress of the plan.
1579    """
1580
1581    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
1582        super().__init__(*args, **kwargs)
1583        self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {}
1584        self.promotion_status: t.Tuple[int, int] = (0, 0)
1585        self.model_creation_status: t.Tuple[int, int] = (0, 0)
1586        self.migration_status: t.Tuple[int, int] = (0, 0)
1587
1588    def _print(self, value: t.Any, **kwargs: t.Any) -> None:
1589        super()._print(value, **kwargs)
1590        for captured_output in self._captured_outputs:
1591            print(captured_output)
1592        self.clear_captured_outputs()
1593
1594    def _prompt(self, message: str, **kwargs: t.Any) -> t.Any:
1595        self._print(message)
1596        return super()._prompt("", **kwargs)
1597
1598    def _confirm(self, message: str, **kwargs: t.Any) -> bool:
1599        message = f"{message} [y/n]"
1600        self._print(message)
1601        return super()._confirm("", **kwargs)
1602
1603    def start_evaluation_progress(
1604        self,
1605        batches: t.Dict[Snapshot, int],
1606        environment_naming_info: EnvironmentNamingInfo,
1607        default_catalog: t.Optional[str],
1608    ) -> None:
1609        self.evaluation_batches = batches
1610        self.evaluation_environment_naming_info = environment_naming_info
1611        self.default_catalog = default_catalog
1612
1613    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1614        if not self.evaluation_batch_progress.get(snapshot.snapshot_id):
1615            display_name = snapshot.display_name(
1616                self.evaluation_environment_naming_info, self.default_catalog
1617            )
1618            self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0)
1619            print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}")
1620
1621    def update_snapshot_evaluation_progress(
1622        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1623    ) -> None:
1624        view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
1625        total_batches = self.evaluation_batches[snapshot]
1626
1627        loaded_batches += 1
1628        self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches)
1629
1630        finished_loading = loaded_batches == total_batches
1631        status = "Loaded" if finished_loading else "Loading"
1632        print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}")
1633        if finished_loading:
1634            total_finished_loading = len(
1635                [
1636                    s
1637                    for s, total in self.evaluation_batches.items()
1638                    if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total
1639                ]
1640            )
1641            total = len(self.evaluation_batch_progress)
1642            print(f"Completed Loading {total_finished_loading}/{total} Models")
1643
1644    def stop_evaluation_progress(self, success: bool = True) -> None:
1645        self.evaluation_batch_progress = {}
1646        super().stop_evaluation_progress(success)
1647        print(f"Loading {'succeeded' if success else 'failed'}")
1648
1649    def start_creation_progress(
1650        self,
1651        total_tasks: int,
1652        environment_naming_info: EnvironmentNamingInfo,
1653        default_catalog: t.Optional[str],
1654    ) -> None:
1655        """Indicates that a new creation progress has begun."""
1656        self.model_creation_status = (0, total_tasks)
1657        print("Starting Creating New Model Versions")
1658
1659    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1660        """Update the snapshot creation progress."""
1661        num_creations, total_creations = self.model_creation_status
1662        num_creations += 1
1663        self.model_creation_status = (num_creations, total_creations)
1664        if num_creations % 5 == 0:
1665            print(f"Created New Model Versions: {num_creations}/{total_creations}")
1666
1667    def stop_creation_progress(self, success: bool = True) -> None:
1668        """Stop the snapshot creation progress."""
1669        self.model_creation_status = (0, 0)
1670        print(f"New Model Creation {'succeeded' if success else 'failed'}")
1671
1672    def start_promotion_progress(
1673        self,
1674        total_tasks: int,
1675        environment_naming_info: EnvironmentNamingInfo,
1676        default_catalog: t.Optional[str],
1677    ) -> None:
1678        """Indicates that a new snapshot promotion progress has begun."""
1679        self.promotion_status = (0, total_tasks)
1680        print(f"Virtually Updating '{environment_naming_info.name}'")
1681
1682    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1683        """Update the snapshot promotion progress."""
1684        num_promotions, total_promotions = self.promotion_status
1685        num_promotions += 1
1686        self.promotion_status = (num_promotions, total_promotions)
1687        if num_promotions % 5 == 0:
1688            print(f"Virtually Updated {num_promotions}/{total_promotions}")
1689
1690    def stop_promotion_progress(self, success: bool = True) -> None:
1691        """Stop the snapshot promotion progress."""
1692        self.promotion_status = (0, 0)
1693        print(f"Virtual Update {'succeeded' if success else 'failed'}")
1694
1695    def start_migration_progress(self, total_tasks: int) -> None:
1696        """Indicates that a new migration progress has begun."""
1697        self.migration_status = (0, total_tasks)
1698        print("Starting Migration")
1699
1700    def update_migration_progress(self, num_tasks: int) -> None:
1701        """Update the migration progress."""
1702        num_migrations, total_migrations = self.migration_status
1703        num_migrations += num_tasks
1704        self.migration_status = (num_migrations, total_migrations)
1705        if num_migrations % 5 == 0:
1706            print(f"Migration Updated {num_migrations}/{total_migrations}")
1707
1708    def stop_migration_progress(self, success: bool = True) -> None:
1709        """Stop the migration progress."""
1710        self.migration_status = (0, 0)
1711        print(f"Migration {'succeeded' if success else 'failed'}")

Note: Databricks Magic Console currently does not support progress bars while a plan is being applied. The NotebookMagicConsole does support progress bars, but they will time out after 5 minutes of execution and it makes it difficult to see the progress of the plan.

DatabricksMagicConsole(*args: Any, **kwargs: Any)
1581    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
1582        super().__init__(*args, **kwargs)
1583        self.evaluation_batch_progress: t.Dict[SnapshotId, t.Tuple[str, int]] = {}
1584        self.promotion_status: t.Tuple[int, int] = (0, 0)
1585        self.model_creation_status: t.Tuple[int, int] = (0, 0)
1586        self.migration_status: t.Tuple[int, int] = (0, 0)
def start_evaluation_progress( self, batches: Dict[sqlmesh.core.snapshot.definition.Snapshot, int], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1603    def start_evaluation_progress(
1604        self,
1605        batches: t.Dict[Snapshot, int],
1606        environment_naming_info: EnvironmentNamingInfo,
1607        default_catalog: t.Optional[str],
1608    ) -> None:
1609        self.evaluation_batches = batches
1610        self.evaluation_environment_naming_info = environment_naming_info
1611        self.default_catalog = default_catalog

Indicates that a new snapshot evaluation progress has begun.

def start_snapshot_evaluation_progress(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> None:
1613    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1614        if not self.evaluation_batch_progress.get(snapshot.snapshot_id):
1615            display_name = snapshot.display_name(
1616                self.evaluation_environment_naming_info, self.default_catalog
1617            )
1618            self.evaluation_batch_progress[snapshot.snapshot_id] = (display_name, 0)
1619            print(f"Starting '{display_name}', Total batches: {self.evaluation_batches[snapshot]}")

Starts the snapshot evaluation progress.

def update_snapshot_evaluation_progress( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, batch_idx: int, duration_ms: Union[int, NoneType]) -> None:
1621    def update_snapshot_evaluation_progress(
1622        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1623    ) -> None:
1624        view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
1625        total_batches = self.evaluation_batches[snapshot]
1626
1627        loaded_batches += 1
1628        self.evaluation_batch_progress[snapshot.snapshot_id] = (view_name, loaded_batches)
1629
1630        finished_loading = loaded_batches == total_batches
1631        status = "Loaded" if finished_loading else "Loading"
1632        print(f"{status} '{view_name}', Completed Batches: {loaded_batches}/{total_batches}")
1633        if finished_loading:
1634            total_finished_loading = len(
1635                [
1636                    s
1637                    for s, total in self.evaluation_batches.items()
1638                    if self.evaluation_batch_progress.get(s.snapshot_id, (None, -1))[1] == total
1639                ]
1640            )
1641            total = len(self.evaluation_batch_progress)
1642            print(f"Completed Loading {total_finished_loading}/{total} Models")

Update the snapshot evaluation progress.

def stop_evaluation_progress(self, success: bool = True) -> None:
1644    def stop_evaluation_progress(self, success: bool = True) -> None:
1645        self.evaluation_batch_progress = {}
1646        super().stop_evaluation_progress(success)
1647        print(f"Loading {'succeeded' if success else 'failed'}")

Stop the snapshot evaluation progress.

def start_creation_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1649    def start_creation_progress(
1650        self,
1651        total_tasks: int,
1652        environment_naming_info: EnvironmentNamingInfo,
1653        default_catalog: t.Optional[str],
1654    ) -> None:
1655        """Indicates that a new creation progress has begun."""
1656        self.model_creation_status = (0, total_tasks)
1657        print("Starting Creating New Model Versions")

Indicates that a new creation progress has begun.

def update_creation_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]) -> None:
1659    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1660        """Update the snapshot creation progress."""
1661        num_creations, total_creations = self.model_creation_status
1662        num_creations += 1
1663        self.model_creation_status = (num_creations, total_creations)
1664        if num_creations % 5 == 0:
1665            print(f"Created New Model Versions: {num_creations}/{total_creations}")

Update the snapshot creation progress.

def stop_creation_progress(self, success: bool = True) -> None:
1667    def stop_creation_progress(self, success: bool = True) -> None:
1668        """Stop the snapshot creation progress."""
1669        self.model_creation_status = (0, 0)
1670        print(f"New Model Creation {'succeeded' if success else 'failed'}")

Stop the snapshot creation progress.

def start_promotion_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1672    def start_promotion_progress(
1673        self,
1674        total_tasks: int,
1675        environment_naming_info: EnvironmentNamingInfo,
1676        default_catalog: t.Optional[str],
1677    ) -> None:
1678        """Indicates that a new snapshot promotion progress has begun."""
1679        self.promotion_status = (0, total_tasks)
1680        print(f"Virtually Updating '{environment_naming_info.name}'")

Indicates that a new snapshot promotion progress has begun.

def update_promotion_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], promoted: bool) -> None:
1682    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1683        """Update the snapshot promotion progress."""
1684        num_promotions, total_promotions = self.promotion_status
1685        num_promotions += 1
1686        self.promotion_status = (num_promotions, total_promotions)
1687        if num_promotions % 5 == 0:
1688            print(f"Virtually Updated {num_promotions}/{total_promotions}")

Update the snapshot promotion progress.

def stop_promotion_progress(self, success: bool = True) -> None:
1690    def stop_promotion_progress(self, success: bool = True) -> None:
1691        """Stop the snapshot promotion progress."""
1692        self.promotion_status = (0, 0)
1693        print(f"Virtual Update {'succeeded' if success else 'failed'}")

Stop the snapshot promotion progress.

def start_migration_progress(self, total_tasks: int) -> None:
1695    def start_migration_progress(self, total_tasks: int) -> None:
1696        """Indicates that a new migration progress has begun."""
1697        self.migration_status = (0, total_tasks)
1698        print("Starting Migration")

Indicates that a new migration progress has begun.

def update_migration_progress(self, num_tasks: int) -> None:
1700    def update_migration_progress(self, num_tasks: int) -> None:
1701        """Update the migration progress."""
1702        num_migrations, total_migrations = self.migration_status
1703        num_migrations += num_tasks
1704        self.migration_status = (num_migrations, total_migrations)
1705        if num_migrations % 5 == 0:
1706            print(f"Migration Updated {num_migrations}/{total_migrations}")

Update the migration progress.

def stop_migration_progress(self, success: bool = True) -> None:
1708    def stop_migration_progress(self, success: bool = True) -> None:
1709        """Stop the migration progress."""
1710        self.migration_status = (0, 0)
1711        print(f"Migration {'succeeded' if success else 'failed'}")

Stop the migration progress.

class DebuggerTerminalConsole(TerminalConsole):
1714class DebuggerTerminalConsole(TerminalConsole):
1715    """A terminal console to use while debugging with no fluff, progress bars, etc."""
1716
1717    def __init__(self, console: t.Optional[RichConsole], *args: t.Any, **kwargs: t.Any) -> None:
1718        self.console: RichConsole = console or srich.console
1719
1720    def _write(self, msg: t.Any, *args: t.Any, **kwargs: t.Any) -> None:
1721        self.console.log(msg, *args, **kwargs)
1722
1723    def start_plan_evaluation(self, plan: Plan) -> None:
1724        self._write("Starting plan", plan.plan_id)
1725
1726    def stop_plan_evaluation(self) -> None:
1727        self._write("Stopping plan")
1728
1729    def start_evaluation_progress(
1730        self,
1731        batches: t.Dict[Snapshot, int],
1732        environment_naming_info: EnvironmentNamingInfo,
1733        default_catalog: t.Optional[str],
1734    ) -> None:
1735        self._write(f"Starting evaluation for {len(batches)} snapshots")
1736
1737    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1738        self._write(f"Evaluating {snapshot.name}")
1739
1740    def update_snapshot_evaluation_progress(
1741        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1742    ) -> None:
1743        self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms")
1744
1745    def stop_evaluation_progress(self, success: bool = True) -> None:
1746        self._write(f"Stopping evaluation with success={success}")
1747
1748    def start_creation_progress(
1749        self,
1750        total_tasks: int,
1751        environment_naming_info: EnvironmentNamingInfo,
1752        default_catalog: t.Optional[str],
1753    ) -> None:
1754        self._write(f"Starting creation for {total_tasks} snapshots")
1755
1756    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1757        self._write(f"Creating {snapshot.name}")
1758
1759    def stop_creation_progress(self, success: bool = True) -> None:
1760        self._write(f"Stopping creation with success={success}")
1761
1762    def update_cleanup_progress(self, object_name: str) -> None:
1763        self._write(f"Cleaning up {object_name}")
1764
1765    def start_promotion_progress(
1766        self,
1767        total_tasks: int,
1768        environment_naming_info: EnvironmentNamingInfo,
1769        default_catalog: t.Optional[str],
1770    ) -> None:
1771        self._write(f"Starting promotion for {total_tasks} snapshots")
1772
1773    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1774        self._write(f"Promoting {snapshot.name}")
1775
1776    def stop_promotion_progress(self, success: bool = True) -> None:
1777        self._write(f"Stopping promotion with success={success}")
1778
1779    def start_migration_progress(self, total_tasks: int) -> None:
1780        self._write(f"Starting migration for {total_tasks} snapshots")
1781
1782    def update_migration_progress(self, num_tasks: int) -> None:
1783        self._write(f"Migration {num_tasks}")
1784
1785    def stop_migration_progress(self, success: bool = True) -> None:
1786        self._write(f"Stopping migration with success={success}")
1787
1788    def show_model_difference_summary(
1789        self,
1790        context_diff: ContextDiff,
1791        environment_naming_info: EnvironmentNamingInfo,
1792        default_catalog: t.Optional[str],
1793        no_diff: bool = True,
1794        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1795    ) -> None:
1796        self._write("Model Difference Summary:")
1797        for added in context_diff.new_snapshots:
1798            self._write(f"  Added: {added}")
1799        for removed in context_diff.removed_snapshots:
1800            self._write(f"  Removed: {removed}")
1801        for modified in context_diff.modified_snapshots:
1802            self._write(f"  Modified: {modified}")
1803
1804    def log_test_results(
1805        self, result: unittest.result.TestResult, output: str, target_dialect: str
1806    ) -> None:
1807        self._write("Test Results:", result)
1808
1809    def show_sql(self, sql: str) -> None:
1810        self._write(sql)
1811
1812    def log_status_update(self, message: str) -> None:
1813        self._write(message, style="bold blue")
1814
1815    def log_error(self, message: str) -> None:
1816        self._write(message, style="bold red")
1817
1818    def log_success(self, message: str) -> None:
1819        self._write(message, style="bold green")
1820
1821    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
1822        self._write(message)
1823        return uuid.uuid4()
1824
1825    def loading_stop(self, id: uuid.UUID) -> None:
1826        self._write("Done")
1827
1828    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
1829        self._write(schema_diff)
1830
1831    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
1832        self._write(row_diff)

A terminal console to use while debugging with no fluff, progress bars, etc.

DebuggerTerminalConsole( console: Union[rich.console.Console, NoneType], *args: Any, **kwargs: Any)
1717    def __init__(self, console: t.Optional[RichConsole], *args: t.Any, **kwargs: t.Any) -> None:
1718        self.console: RichConsole = console or srich.console
def start_plan_evaluation(self, plan: sqlmesh.core.plan.definition.Plan) -> None:
1723    def start_plan_evaluation(self, plan: Plan) -> None:
1724        self._write("Starting plan", plan.plan_id)

Indicates that a new evaluation has begun.

def stop_plan_evaluation(self) -> None:
1726    def stop_plan_evaluation(self) -> None:
1727        self._write("Stopping plan")

Indicates that the evaluation has ended.

def start_evaluation_progress( self, batches: Dict[sqlmesh.core.snapshot.definition.Snapshot, int], environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1729    def start_evaluation_progress(
1730        self,
1731        batches: t.Dict[Snapshot, int],
1732        environment_naming_info: EnvironmentNamingInfo,
1733        default_catalog: t.Optional[str],
1734    ) -> None:
1735        self._write(f"Starting evaluation for {len(batches)} snapshots")

Indicates that a new snapshot evaluation progress has begun.

def start_snapshot_evaluation_progress(self, snapshot: sqlmesh.core.snapshot.definition.Snapshot) -> None:
1737    def start_snapshot_evaluation_progress(self, snapshot: Snapshot) -> None:
1738        self._write(f"Evaluating {snapshot.name}")

Starts the snapshot evaluation progress.

def update_snapshot_evaluation_progress( self, snapshot: sqlmesh.core.snapshot.definition.Snapshot, batch_idx: int, duration_ms: Union[int, NoneType]) -> None:
1740    def update_snapshot_evaluation_progress(
1741        self, snapshot: Snapshot, batch_idx: int, duration_ms: t.Optional[int]
1742    ) -> None:
1743        self._write(f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms")

Update the snapshot evaluation progress.

def stop_evaluation_progress(self, success: bool = True) -> None:
1745    def stop_evaluation_progress(self, success: bool = True) -> None:
1746        self._write(f"Stopping evaluation with success={success}")

Stop the snapshot evaluation progress.

def start_creation_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1748    def start_creation_progress(
1749        self,
1750        total_tasks: int,
1751        environment_naming_info: EnvironmentNamingInfo,
1752        default_catalog: t.Optional[str],
1753    ) -> None:
1754        self._write(f"Starting creation for {total_tasks} snapshots")

Indicates that a new creation progress has begun.

def update_creation_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot]) -> None:
1756    def update_creation_progress(self, snapshot: SnapshotInfoLike) -> None:
1757        self._write(f"Creating {snapshot.name}")

Update the snapshot creation progress.

def stop_creation_progress(self, success: bool = True) -> None:
1759    def stop_creation_progress(self, success: bool = True) -> None:
1760        self._write(f"Stopping creation with success={success}")

Stop the snapshot creation progress.

def update_cleanup_progress(self, object_name: str) -> None:
1762    def update_cleanup_progress(self, object_name: str) -> None:
1763        self._write(f"Cleaning up {object_name}")

Update the snapshot cleanup progress.

def start_promotion_progress( self, total_tasks: int, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType]) -> None:
1765    def start_promotion_progress(
1766        self,
1767        total_tasks: int,
1768        environment_naming_info: EnvironmentNamingInfo,
1769        default_catalog: t.Optional[str],
1770    ) -> None:
1771        self._write(f"Starting promotion for {total_tasks} snapshots")

Indicates that a new snapshot promotion progress has begun.

def update_promotion_progress( self, snapshot: Union[sqlmesh.core.snapshot.definition.SnapshotTableInfo, sqlmesh.core.snapshot.definition.Snapshot], promoted: bool) -> None:
1773    def update_promotion_progress(self, snapshot: SnapshotInfoLike, promoted: bool) -> None:
1774        self._write(f"Promoting {snapshot.name}")

Update the snapshot promotion progress.

def stop_promotion_progress(self, success: bool = True) -> None:
1776    def stop_promotion_progress(self, success: bool = True) -> None:
1777        self._write(f"Stopping promotion with success={success}")

Stop the snapshot promotion progress.

def start_migration_progress(self, total_tasks: int) -> None:
1779    def start_migration_progress(self, total_tasks: int) -> None:
1780        self._write(f"Starting migration for {total_tasks} snapshots")

Indicates that a new migration progress has begun.

def update_migration_progress(self, num_tasks: int) -> None:
1782    def update_migration_progress(self, num_tasks: int) -> None:
1783        self._write(f"Migration {num_tasks}")

Update the migration progress.

def stop_migration_progress(self, success: bool = True) -> None:
1785    def stop_migration_progress(self, success: bool = True) -> None:
1786        self._write(f"Stopping migration with success={success}")

Stop the migration progress.

def show_model_difference_summary( self, context_diff: sqlmesh.core.context_diff.ContextDiff, environment_naming_info: sqlmesh.core.environment.EnvironmentNamingInfo, default_catalog: Union[str, NoneType], no_diff: bool = True, ignored_snapshot_ids: Union[Set[sqlmesh.core.snapshot.definition.SnapshotId], NoneType] = None) -> None:
1788    def show_model_difference_summary(
1789        self,
1790        context_diff: ContextDiff,
1791        environment_naming_info: EnvironmentNamingInfo,
1792        default_catalog: t.Optional[str],
1793        no_diff: bool = True,
1794        ignored_snapshot_ids: t.Optional[t.Set[SnapshotId]] = None,
1795    ) -> None:
1796        self._write("Model Difference Summary:")
1797        for added in context_diff.new_snapshots:
1798            self._write(f"  Added: {added}")
1799        for removed in context_diff.removed_snapshots:
1800            self._write(f"  Removed: {removed}")
1801        for modified in context_diff.modified_snapshots:
1802            self._write(f"  Modified: {modified}")

Shows a summary of the differences.

Arguments:
  • context_diff: The context diff to use to print the summary
  • environment_naming_info: The environment naming info to reference when printing model names
  • default_catalog: The default catalog to reference when deciding to remove catalog from display names
  • no_diff: Hide the actual SQL differences.
  • ignored_snapshot_ids: A set of snapshot ids that are ignored
def log_test_results( self, result: unittest.result.TestResult, output: str, target_dialect: str) -> None:
1804    def log_test_results(
1805        self, result: unittest.result.TestResult, output: str, target_dialect: str
1806    ) -> None:
1807        self._write("Test Results:", result)

Display the test result and output.

Arguments:
  • result: The unittest test result that contains metrics like num success, fails, ect.
  • output: The generated output from the unittest.
  • target_dialect: The dialect that tests were run against. Assumes all tests run against the same dialect.
def show_sql(self, sql: str) -> None:
1809    def show_sql(self, sql: str) -> None:
1810        self._write(sql)

Display to the user SQL.

def log_status_update(self, message: str) -> None:
1812    def log_status_update(self, message: str) -> None:
1813        self._write(message, style="bold blue")

Display general status update to the user.

def log_error(self, message: str) -> None:
1815    def log_error(self, message: str) -> None:
1816        self._write(message, style="bold red")

Display error info to the user.

def log_success(self, message: str) -> None:
1818    def log_success(self, message: str) -> None:
1819        self._write(message, style="bold green")

Display a general successful message to the user.

def loading_start(self, message: Union[str, NoneType] = None) -> uuid.UUID:
1821    def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
1822        self._write(message)
1823        return uuid.uuid4()

Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message.

def loading_stop(self, id: uuid.UUID) -> None:
1825    def loading_stop(self, id: uuid.UUID) -> None:
1826        self._write("Done")

Stop loading for the given id.

def show_schema_diff(self, schema_diff: sqlmesh.core.table_diff.SchemaDiff) -> None:
1828    def show_schema_diff(self, schema_diff: SchemaDiff) -> None:
1829        self._write(schema_diff)

Show table schema diff.

def show_row_diff( self, row_diff: sqlmesh.core.table_diff.RowDiff, show_sample: bool = True) -> None:
1831    def show_row_diff(self, row_diff: RowDiff, show_sample: bool = True) -> None:
1832        self._write(row_diff)

Show table summary diff.

Inherited Members
TerminalConsole
plan
def get_console( **kwargs: Any) -> 'TerminalConsole | DatabricksMagicConsole | NotebookMagicConsole':
1835def get_console(**kwargs: t.Any) -> TerminalConsole | DatabricksMagicConsole | NotebookMagicConsole:
1836    """
1837    Returns the console that is appropriate for the current runtime environment.
1838
1839    Note: Google Colab environment is untested and currently assumes is compatible with the base
1840    NotebookMagicConsole.
1841    """
1842    from sqlmesh import RuntimeEnv
1843
1844    runtime_env = RuntimeEnv.get()
1845
1846    runtime_env_mapping = {
1847        RuntimeEnv.DATABRICKS: DatabricksMagicConsole,
1848        RuntimeEnv.JUPYTER: NotebookMagicConsole,
1849        RuntimeEnv.TERMINAL: TerminalConsole,
1850        RuntimeEnv.GOOGLE_COLAB: NotebookMagicConsole,
1851        RuntimeEnv.DEBUGGER: DebuggerTerminalConsole,
1852    }
1853    rich_console_kwargs: t.Dict[str, t.Any] = {"theme": srich.theme}
1854    if runtime_env.is_jupyter or runtime_env.is_google_colab:
1855        rich_console_kwargs["force_jupyter"] = True
1856    return runtime_env_mapping[runtime_env](
1857        **{**{"console": RichConsole(**rich_console_kwargs)}, **kwargs}
1858    )

Returns the console that is appropriate for the current runtime environment.

Note: Google Colab environment is untested and currently assumes is compatible with the base NotebookMagicConsole.