Edit on GitHub

sqlmesh.integrations.github.cicd.controller

   1from __future__ import annotations
   2
   3import functools
   4import json
   5import logging
   6import os
   7import pathlib
   8import re
   9import traceback
  10import typing as t
  11from enum import Enum
  12from pathlib import Path
  13from dataclasses import dataclass
  14from functools import cached_property
  15
  16import requests
  17from sqlglot.helper import seq_get
  18
  19from sqlmesh.core import constants as c
  20from sqlmesh.core.console import SNAPSHOT_CHANGE_CATEGORY_STR, get_console, MarkdownConsole
  21from sqlmesh.core.context import Context
  22from sqlmesh.core.test.result import ModelTextTestResult
  23from sqlmesh.core.environment import Environment
  24from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals
  25from sqlmesh.core.plan.definition import UserProvidedFlags
  26from sqlmesh.core.snapshot.definition import (
  27    Snapshot,
  28    SnapshotChangeCategory,
  29    SnapshotId,
  30    SnapshotTableInfo,
  31)
  32from sqlglot.errors import SqlglotError
  33from sqlmesh.core.user import User
  34from sqlmesh.core.config import Config
  35from sqlmesh.integrations.github.cicd.config import GithubCICDBotConfig
  36from sqlmesh.utils import word_characters_only, Verbosity
  37from sqlmesh.utils.date import now
  38from sqlmesh.utils.errors import (
  39    CICDBotError,
  40    NoChangesPlanError,
  41    PlanError,
  42    UncategorizedPlanError,
  43    LinterError,
  44    SQLMeshError,
  45)
  46from sqlmesh.utils.pydantic import PydanticModel
  47
  48if t.TYPE_CHECKING:
  49    from github import Github
  50    from github.CheckRun import CheckRun
  51    from github.Issue import Issue
  52    from github.IssueComment import IssueComment
  53    from github.PullRequest import PullRequest
  54    from github.PullRequestReview import PullRequestReview
  55    from github.Repository import Repository
  56
  57logger = logging.getLogger(__name__)
  58
  59
  60class TestFailure(Exception):
  61    pass
  62
  63
  64class PullRequestInfo(PydanticModel):
  65    """Contains information related to a pull request that can be used to construct other objects/URLs"""
  66
  67    owner: str
  68    repo: str
  69    pr_number: int
  70
  71    @property
  72    def full_repo_path(self) -> str:
  73        return "/".join([self.owner, self.repo])
  74
  75    @classmethod
  76    def create_from_pull_request_url(cls, pull_request_url: str) -> PullRequestInfo:
  77        owner, repo, _, pr_number = pull_request_url.split("/")[-4:]
  78        return cls(
  79            owner=owner,
  80            repo=repo,
  81            pr_number=int(pr_number),
  82        )
  83
  84
  85class GithubCheckStatus(str, Enum):
  86    QUEUED = "queued"
  87    IN_PROGRESS = "in_progress"
  88    COMPLETED = "completed"
  89
  90    @property
  91    def is_queued(self) -> bool:
  92        return self == GithubCheckStatus.QUEUED
  93
  94    @property
  95    def is_in_progress(self) -> bool:
  96        return self == GithubCheckStatus.IN_PROGRESS
  97
  98    @property
  99    def is_completed(self) -> bool:
 100        return self == GithubCheckStatus.COMPLETED
 101
 102
 103class GithubCheckConclusion(str, Enum):
 104    SUCCESS = "success"
 105    FAILURE = "failure"
 106    NEUTRAL = "neutral"
 107    CANCELLED = "cancelled"
 108    TIMED_OUT = "timed_out"
 109    ACTION_REQUIRED = "action_required"
 110    SKIPPED = "skipped"
 111
 112    @property
 113    def is_success(self) -> bool:
 114        return self == GithubCheckConclusion.SUCCESS
 115
 116    @property
 117    def is_failure(self) -> bool:
 118        return self == GithubCheckConclusion.FAILURE
 119
 120    @property
 121    def is_neutral(self) -> bool:
 122        return self == GithubCheckConclusion.NEUTRAL
 123
 124    @property
 125    def is_cancelled(self) -> bool:
 126        return self == GithubCheckConclusion.CANCELLED
 127
 128    @property
 129    def is_timed_out(self) -> bool:
 130        return self == GithubCheckConclusion.TIMED_OUT
 131
 132    @property
 133    def is_action_required(self) -> bool:
 134        return self == GithubCheckConclusion.ACTION_REQUIRED
 135
 136    @property
 137    def is_skipped(self) -> bool:
 138        return self == GithubCheckConclusion.SKIPPED
 139
 140
 141class MergeStateStatus(str, Enum):
 142    """
 143    https://docs.github.com/en/graphql/reference/enums#mergestatestatus
 144    """
 145
 146    BEHIND = "behind"
 147    BLOCKED = "blocked"
 148    CLEAN = "clean"
 149    DIRTY = "dirty"
 150    DRAFT = "draft"
 151    HAS_HOOKS = "has_hooks"
 152    UNKNOWN = "unknown"
 153    UNSTABLE = "unstable"
 154
 155    @property
 156    def is_behind(self) -> bool:
 157        return self == MergeStateStatus.BEHIND
 158
 159    @property
 160    def is_blocked(self) -> bool:
 161        return self == MergeStateStatus.BLOCKED
 162
 163    @property
 164    def is_clean(self) -> bool:
 165        return self == MergeStateStatus.CLEAN
 166
 167    @property
 168    def is_dirty(self) -> bool:
 169        return self == MergeStateStatus.DIRTY
 170
 171    @property
 172    def is_draft(self) -> bool:
 173        return self == MergeStateStatus.DRAFT
 174
 175    @property
 176    def is_has_hooks(self) -> bool:
 177        return self == MergeStateStatus.HAS_HOOKS
 178
 179    @property
 180    def is_unknown(self) -> bool:
 181        return self == MergeStateStatus.UNKNOWN
 182
 183    @property
 184    def is_unstable(self) -> bool:
 185        return self == MergeStateStatus.UNSTABLE
 186
 187
 188class BotCommand(Enum):
 189    INVALID = 1
 190    DEPLOY_PROD = 2
 191
 192    @classmethod
 193    def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand:
 194        body = body.strip()
 195        namespace = namespace.strip() if namespace else ""
 196        input_to_command = {
 197            namespace + "/deploy": cls.DEPLOY_PROD,
 198        }
 199        return input_to_command.get(body, cls.INVALID)
 200
 201    @property
 202    def is_invalid(self) -> bool:
 203        return self == self.INVALID
 204
 205    @property
 206    def is_deploy_prod(self) -> bool:
 207        return self == self.DEPLOY_PROD
 208
 209
 210class GithubEvent:
 211    """
 212    Takes a Github Actions event payload and provides a simple interface to access
 213    """
 214
 215    def __init__(self, payload: t.Dict[str, t.Any]) -> None:
 216        self.payload = payload
 217        self._pull_request_info: t.Optional[PullRequestInfo] = None
 218
 219    @classmethod
 220    def from_obj(cls, obj: t.Dict[str, t.Any]) -> GithubEvent:
 221        return cls(payload=obj)
 222
 223    @classmethod
 224    def from_path(cls, path: t.Union[str, pathlib.Path]) -> GithubEvent:
 225        with open(pathlib.Path(path), "r", encoding="utf-8") as f:
 226            return cls.from_obj(json.load(f))
 227
 228    @classmethod
 229    def from_env(cls) -> GithubEvent:
 230        return cls.from_path(os.environ["GITHUB_EVENT_PATH"])
 231
 232    @property
 233    def is_review(self) -> bool:
 234        return bool(self.payload.get("review"))
 235
 236    @property
 237    def is_comment(self) -> bool:
 238        comment = self.payload.get("comment")
 239        if not comment:
 240            return False
 241        if not comment.get("body"):
 242            return False
 243        return True
 244
 245    @property
 246    def is_comment_added(self) -> bool:
 247        return self.is_comment and self.payload.get("action") != "deleted"
 248
 249    @property
 250    def is_pull_request(self) -> bool:
 251        return bool(self.payload.get("pull_request"))
 252
 253    @property
 254    def is_pull_request_closed(self) -> bool:
 255        return self.is_pull_request and self.payload.get("action") == "closed"
 256
 257    @property
 258    def pull_request_url(self) -> str:
 259        if self.is_review:
 260            return self.payload["review"]["pull_request_url"]
 261        if self.is_comment:
 262            return self.payload["issue"]["pull_request"]["url"]
 263        if self.is_pull_request:
 264            return self.payload["pull_request"]["_links"]["self"]["href"]
 265        raise CICDBotError("Unable to determine pull request url")
 266
 267    @property
 268    def pull_request_info(self) -> PullRequestInfo:
 269        if not self._pull_request_info:
 270            self._pull_request_info = PullRequestInfo.create_from_pull_request_url(
 271                self.pull_request_url
 272            )
 273        return self._pull_request_info
 274
 275    @property
 276    def pull_request_comment_body(self) -> t.Optional[str]:
 277        if self.is_comment_added:
 278            return self.payload["comment"]["body"]
 279        return None
 280
 281
class GithubController:
    """
    Drives the SQLMesh CI/CD bot for the single pull request referenced by a Github event:
    building plans, applying them and maintaining the bot's PR comment.
    """

    # Header text used to locate (or create) the bot's own comment on the pull request
    BOT_HEADER_MSG = ":robot: **SQLMesh Bot Info** :robot:"
    # NOTE(review): presumably the maximum size of a message sent to the Github API - confirm against usages
    MAX_BYTE_LENGTH = 65535
 285
    def __init__(
        self,
        paths: t.Union[Path, t.Iterable[Path]],
        token: str,
        config: t.Optional[t.Union[Config, str]] = None,
        event: t.Optional[GithubEvent] = None,
        client: t.Optional[Github] = None,
        context: t.Optional[Context] = None,
    ) -> None:
        """
        Resolve the pull request referenced by the event and prepare the SQLMesh context.

        Args:
            paths: Project path(s); passed to `Context` when no `context` is given.
            token: Github token used to authenticate the Github client.
            config: Config (or config name); passed to `Context` when no `context` is given.
            event: The triggering event. Defaults to `GithubEvent.from_env()`.
            client: Github client. Defaults to one built from `GITHUB_API_URL` (or the library's
                default base URL) and `token`.
            context: SQLMesh context. Defaults to `Context(paths=paths, config=config)`.

        Raises:
            CICDBotError: If the active console is not a `MarkdownConsole`.
        """
        from github import Github

        logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}")

        self.config = config
        self._paths = paths
        self._token = token
        self._event = event or GithubEvent.from_env()
        logger.debug(f"Github event: {json.dumps(self._event.payload)}")
        # Plan builders are created lazily by the `pr_plan` / `prod_plan*` properties
        self._pr_plan_builder: t.Optional[PlanBuilder] = None
        self._prod_plan_builder: t.Optional[PlanBuilder] = None
        self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None
        self._check_run_mapping: t.Dict[str, CheckRun] = {}

        # Summaries are built from captured console output, which requires the markdown console
        if not isinstance(get_console(), MarkdownConsole):
            raise CICDBotError("Console must be a markdown console.")
        self._console = t.cast(MarkdownConsole, get_console())

        from github.Consts import DEFAULT_BASE_URL
        from github.Auth import Token

        self._client: Github = client or Github(
            base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token)
        )

        self._repo: Repository = self._client.get_repo(
            self._event.pull_request_info.full_repo_path, lazy=True
        )
        self._pull_request: PullRequest = self._repo.get_pull(
            self._event.pull_request_info.pr_number
        )
        # The same PR number is also fetched as an issue; comments are read and created through it
        self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number)
        self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews()
        # TODO: The python module says that user names can be None and this is not currently handled
        # Logins of every reviewer that has at least one review in the "approved" state
        self._approvers: t.Set[str] = {
            review.user.login or "UNKNOWN"
            for review in self._reviews
            if review.state.lower() == "approved"
        }
        logger.debug(f"Approvers: {', '.join(self._approvers)}")
        self._context: Context = context or Context(paths=self._paths, config=self.config)

        # Bot config needs the context to be initialized
        logger.debug(f"Bot config: {self.bot_config.json(indent=2)}")
 339
 340    @property
 341    def deploy_command_enabled(self) -> bool:
 342        return self.bot_config.enable_deploy_command
 343
 344    @property
 345    def is_comment_added(self) -> bool:
 346        return self._event.is_comment_added
 347
 348    @property
 349    def _required_approvers(self) -> t.List[User]:
 350        required_approvers = [
 351            user
 352            for user in self._context.users
 353            if user.is_required_approver and user.github_username
 354        ]
 355        logger.debug(
 356            f"Required approvers: {', '.join(user.github_username for user in required_approvers if user.github_username)}"
 357        )
 358        return required_approvers
 359
 360    @property
 361    def _required_approvers_with_approval(self) -> t.List[User]:
 362        return [
 363            user for user in self._required_approvers if user.github_username in self._approvers
 364        ]
 365
 366    @property
 367    def pr_environment_name(self) -> str:
 368        return Environment.sanitize_name(
 369            "_".join(
 370                [
 371                    self.bot_config.pr_environment_name or self._event.pull_request_info.repo,
 372                    str(self._event.pull_request_info.pr_number),
 373                ]
 374            )
 375        )
 376
 377    @property
 378    def do_required_approval_check(self) -> bool:
 379        """We want to skip required approval check if no users have this role"""
 380        do_required_approval_check = bool(self._required_approvers)
 381        logger.debug(f"Do required approval check: {do_required_approval_check}")
 382        return do_required_approval_check
 383
 384    @property
 385    def has_required_approval(self) -> bool:
 386        """
 387        Check if the PR has a required approver.
 388
 389        TODO: Allow defining requiring some number, or all, required approvers.
 390        """
 391        if not self._required_approvers or self._required_approvers_with_approval:
 392            logger.debug("Has required Approval")
 393            return True
 394        logger.debug("Does not have required approval")
 395        return False
 396
 397    @property
 398    def pr_plan(self) -> Plan:
 399        if not self._pr_plan_builder:
 400            self._pr_plan_builder = self._context.plan_builder(
 401                environment=self.pr_environment_name,
 402                skip_tests=True,
 403                skip_linter=True,
 404                categorizer_config=self.bot_config.auto_categorize_changes,
 405                start=self.bot_config.default_pr_start,
 406                min_intervals=self.bot_config.pr_min_intervals,
 407                skip_backfill=self.bot_config.skip_pr_backfill,
 408                include_unmodified=self.bot_config.pr_include_unmodified,
 409                forward_only=self.forward_only_plan,
 410            )
 411        assert self._pr_plan_builder
 412        return self._pr_plan_builder.build()
 413
 414    @property
 415    def pr_plan_or_none(self) -> t.Optional[Plan]:
 416        try:
 417            return self.pr_plan
 418        except:
 419            return None
 420
 421    @property
 422    def pr_plan_flags(self) -> t.Optional[t.Dict[str, UserProvidedFlags]]:
 423        if pr_plan := self.pr_plan_or_none:
 424            return pr_plan.user_provided_flags
 425        if pr_plan_builder := self._pr_plan_builder:
 426            return pr_plan_builder._user_provided_flags
 427        return None
 428
 429    @property
 430    def prod_plan(self) -> Plan:
 431        if not self._prod_plan_builder:
 432            self._prod_plan_builder = self._context.plan_builder(
 433                c.PROD,
 434                no_gaps=True,
 435                skip_tests=True,
 436                skip_linter=True,
 437                categorizer_config=self.bot_config.auto_categorize_changes,
 438                run=self.bot_config.run_on_deploy_to_prod,
 439                forward_only=self.forward_only_plan,
 440            )
 441        assert self._prod_plan_builder
 442        return self._prod_plan_builder.build()
 443
 444    @property
 445    def prod_plan_with_gaps(self) -> Plan:
 446        if not self._prod_plan_with_gaps_builder:
 447            self._prod_plan_with_gaps_builder = self._context.plan_builder(
 448                c.PROD,
 449                # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data)
 450                no_gaps=False,
 451                skip_tests=True,
 452                skip_linter=True,
 453                categorizer_config=self.bot_config.auto_categorize_changes,
 454                run=self.bot_config.run_on_deploy_to_prod,
 455                forward_only=self.forward_only_plan,
 456            )
 457        assert self._prod_plan_with_gaps_builder
 458        return self._prod_plan_with_gaps_builder.build()
 459
 460    @property
 461    def bot_config(self) -> GithubCICDBotConfig:
 462        bot_config = self._context.config.cicd_bot or GithubCICDBotConfig(
 463            auto_categorize_changes=self._context.auto_categorize_changes
 464        )
 465        return bot_config
 466
 467    @property
 468    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
 469        return self.prod_plan_with_gaps.modified_snapshots
 470
 471    @property
 472    def removed_snapshots(self) -> t.Set[SnapshotId]:
 473        return set(self.prod_plan_with_gaps.context_diff.removed_snapshots)
 474
 475    @property
 476    def pr_targets_prod_branch(self) -> bool:
 477        return self._pull_request.base.ref in self.bot_config.prod_branch_names
 478
 479    @property
 480    def forward_only_plan(self) -> bool:
 481        default = self._context.config.plan.forward_only
 482        head_ref = self._pull_request.head.ref
 483        if isinstance(head_ref, str):
 484            return head_ref.endswith(self.bot_config.forward_only_branch_suffix) or default
 485        return default
 486
 487    @classmethod
 488    def _append_output(cls, key: str, value: str) -> None:
 489        """
 490        Appends the given key/value to output so they can be read by following steps
 491        """
 492        logger.debug(f"Setting output. Key: {key}, Value: {value}")
 493
 494        # GitHub Actions sets this environment variable
 495        if output_file := os.environ.get("GITHUB_OUTPUT"):
 496            with open(output_file, "a", encoding="utf-8") as fh:
 497                print(f"{key}={value}", file=fh)
 498
 499    def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str:
 500        if not plan.forward_only:
 501            return ""
 502
 503        example_model_name = "<model name>"
 504        for snapshot_id in sorted(plan.snapshots):
 505            snapshot = plan.snapshots[snapshot_id]
 506            if snapshot.is_incremental:
 507                example_model_name = snapshot.node.name
 508                break
 509
 510        return (
 511            "> [!TIP]\n"
 512            "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n"
 513            "> \n"
 514            f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n"
 515            ">\n"
 516            "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans"
 517        )
 518
    def get_plan_summary(self, plan: Plan) -> str:
        """
        Render a summary of the given plan from captured console output.

        The console verbosity is raised to `VERY_VERBOSE` while rendering and restored afterwards.

        Returns:
            "No changes to apply." (plus the plan flags section, if any) when neither a difference
            summary nor missing dates were produced. Otherwise the captured warnings, errors,
            difference summary, missing dates and plan flags section concatenated in that order.
            If a `PlanError` is raised, it is logged and a failure message is returned instead.
        """
        # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated
        # this is particularly important for the "Models needing backfill" list because
        # there is no easy way to tell this otherwise
        orig_verbosity = self._console.verbosity
        self._console.verbosity = Verbosity.VERY_VERBOSE

        try:
            # Clear out any output that might exist from prior steps
            self._console.consume_captured_output()
            if plan.restatements:
                self._console._print("\n**Restating models**\n")
            else:
                self._console.show_environment_difference_summary(
                    context_diff=plan.context_diff,
                    no_diff=False,
                )
            if plan.context_diff.has_changes:
                self._console.show_model_difference_summary(
                    context_diff=plan.context_diff,
                    environment_naming_info=plan.environment_naming_info,
                    default_catalog=self._context.default_catalog,
                    no_diff=False,
                )
            # Everything printed so far is the difference summary; draining it here keeps the
            # missing dates output below separate
            difference_summary = self._console.consume_captured_output()
            self._console._show_missing_dates(plan, self._context.default_catalog)
            missing_dates = self._console.consume_captured_output()

            plan_flags_section = (
                f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}"
                if plan.user_provided_flags
                else ""
            )

            # NOTE(review): this early return happens before captured warnings/errors are consumed,
            # so they are not drained by this method on this path - confirm that is intended
            if not difference_summary and not missing_dates:
                return f"No changes to apply.{plan_flags_section}"

            warnings_block = self._console.consume_captured_warnings()
            errors_block = self._console.consume_captured_errors()

            return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}"
        except PlanError as e:
            logger.exception("Plan failed to generate")
            return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}"
        finally:
            # Always restore the caller's verbosity, including on the early return and error paths
            self._console.verbosity = orig_verbosity
 565
 566    def get_pr_environment_summary(
 567        self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None
 568    ) -> str:
 569        heading = ""
 570        summary = ""
 571
 572        if conclusion.is_success:
 573            summary = self._get_pr_environment_summary_success()
 574        elif conclusion.is_action_required:
 575            heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:"
 576            summary = self._get_pr_environment_summary_action_required(exception)
 577        elif conclusion.is_failure:
 578            heading = (
 579                f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:"
 580            )
 581            summary = self._get_pr_environment_summary_failure(exception)
 582        elif conclusion.is_skipped:
 583            heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:"
 584            summary = self._get_pr_environment_summary_skipped(exception)
 585        else:
 586            heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}"
 587
 588        # note: we just add warnings here, errors will be covered by the "failure" conclusion
 589        if warnings := self._console.consume_captured_warnings():
 590            summary = f"{warnings}\n{summary}"
 591
 592        return f"{heading}\n\n{summary}".strip()
 593
 594    def _get_pr_environment_summary_success(self) -> str:
 595        prod_plan = self.prod_plan_with_gaps
 596
 597        if not prod_plan.has_changes:
 598            summary = "No models were modified in this PR.\n"
 599        else:
 600            intro = self._generate_pr_environment_summary_intro()
 601            summary = intro + self._generate_pr_environment_summary_list(prod_plan)
 602
 603        if prod_plan.user_provided_flags:
 604            summary += self._generate_plan_flags_section(prod_plan.user_provided_flags)
 605
 606        return summary
 607
 608    def _get_pr_environment_summary_skipped(self, exception: t.Optional[Exception] = None) -> str:
 609        if isinstance(exception, NoChangesPlanError):
 610            skip_reason = "No changes were detected compared to the prod environment."
 611        elif isinstance(exception, TestFailure):
 612            skip_reason = "Unit Test(s) Failed so skipping PR creation"
 613        else:
 614            skip_reason = "A prior stage failed resulting in skipping PR creation."
 615
 616        return skip_reason
 617
 618    def _get_pr_environment_summary_action_required(
 619        self, exception: t.Optional[Exception] = None
 620    ) -> str:
 621        plan = self.pr_plan_or_none
 622        if isinstance(exception, UncategorizedPlanError) and plan:
 623            failure_msg = f"The following models could not be categorized automatically:\n"
 624            for snapshot in plan.uncategorized:
 625                failure_msg += f"- {snapshot.name}\n"
 626            failure_msg += (
 627                f"\nRun `sqlmesh plan {self.pr_environment_name}` locally to apply these changes.\n\n"
 628                "If you would like the bot to automatically categorize changes, check the [documentation](https://sqlmesh.readthedocs.io/en/stable/integrations/github/) for more information."
 629            )
 630        else:
 631            failure_msg = "Please check the Actions Workflow logs for more information."
 632
 633        return failure_msg
 634
 635    def _get_pr_environment_summary_failure(self, exception: t.Optional[Exception] = None) -> str:
 636        console_output = self._console.consume_captured_output()
 637        failure_msg = ""
 638
 639        if isinstance(exception, PlanError):
 640            if exception.args and (msg := exception.args[0]) and isinstance(msg, str):
 641                failure_msg += f"*{msg}*\n"
 642            if console_output:
 643                failure_msg += f"\n{console_output}"
 644        elif isinstance(exception, (SQLMeshError, SqlglotError, ValueError)):
 645            # this logic is taken from the global error handler attached to the CLI, which uses `click.echo()` to output the message
 646            # so cant be re-used here because it bypasses the Console
 647            failure_msg = f"**Error:** {str(exception)}"
 648        elif exception:
 649            logger.debug(
 650                "Got unexpected error. Error Type: "
 651                + str(type(exception))
 652                + " Stack trace: "
 653                + traceback.format_exc()
 654            )
 655            failure_msg = f"This is an unexpected error.\n\n**Exception:**\n```\n{traceback.format_exc()}\n```"
 656
 657        if captured_errors := self._console.consume_captured_errors():
 658            failure_msg = f"{captured_errors}\n{failure_msg}"
 659
 660        if plan_flags := self.pr_plan_flags:
 661            failure_msg += f"\n\n{self._generate_plan_flags_section(plan_flags)}"
 662
 663        return failure_msg
 664
 665    def run_tests(self) -> t.Tuple[ModelTextTestResult, str]:
 666        """
 667        Run tests for the PR
 668        """
 669        return self._context._run_tests(verbosity=Verbosity.VERBOSE)
 670
 671    def run_linter(self) -> None:
 672        """
 673        Run linter for the PR
 674        """
 675        self._console.consume_captured_output()
 676        self._context.lint_models()
 677
 678    def _get_or_create_comment(self, header: str = BOT_HEADER_MSG) -> IssueComment:
 679        comment = seq_get(
 680            [comment for comment in self._issue.get_comments() if header in comment.body],
 681            0,
 682        )
 683        if not comment:
 684            logger.debug(f"Did not find comment so creating one with header: {header}")
 685            return self._issue.create_comment(header)
 686        logger.debug(f"Found comment with header: {header}")
 687        return comment
 688
 689    def _get_merge_state_status(self) -> MergeStateStatus:
 690        """
 691        This feature is currently in preview and therefore not available in the python module.
 692        So we query GraphQL directly instead.
 693        """
 694        headers = {
 695            "Authorization": f"Bearer {self._token}",
 696            "Accept": "application/vnd.github.merge-info-preview+json",
 697        }
 698        query = f"""{{
 699            repository(owner: "{self._event.pull_request_info.owner}", name: "{self._event.pull_request_info.repo}") {{
 700                pullRequest(number: {self._event.pull_request_info.pr_number}) {{
 701                    title
 702                    state
 703                    mergeStateStatus
 704                }}
 705            }}
 706        }}"""
 707        request = requests.post(
 708            os.environ["GITHUB_GRAPHQL_URL"],
 709            json={"query": query},
 710            headers=headers,
 711        )
 712        if request.status_code == 200:
 713            merge_status = MergeStateStatus(
 714                request.json()["data"]["repository"]["pullRequest"]["mergeStateStatus"].lower()
 715            )
 716            logger.debug(f"Merge state status: {merge_status.value}")
 717            return merge_status
 718        raise CICDBotError(f"Unable to get merge state status. Error: {request.text}")
 719
 720    def update_sqlmesh_comment_info(
 721        self, value: str, *, dedup_regex: t.Optional[str]
 722    ) -> t.Tuple[bool, IssueComment]:
 723        """
 724        Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then
 725        it creates one. It determines the comment to update by looking for a comment with the header. If a dedup
 726        regex is provided then it will check if the value already exists in the comment and if so it will not update
 727        """
 728        comment = self._get_or_create_comment()
 729        if dedup_regex:
 730            # If we find a match against the regex then we just return since the comment has already been posted
 731            if seq_get(re.findall(dedup_regex, comment.body), 0):
 732                return False, comment
 733        full_comment = f"{comment.body}\n{value}"
 734        body, *truncated = self._chunk_up_api_message(f"{full_comment}")
 735        if truncated:
 736            logger.warning(
 737                f"Comment body was too long so we truncated it. Full text: {full_comment}"
 738            )
 739        comment.edit(body=body)
 740        return True, comment
 741
 742    def update_pr_environment(self) -> None:
 743        """
 744        Creates a PR environment from the logic present in the PR. If the PR contains changes that are
 745        uncategorized, then an error will be raised.
 746        """
 747        self._console.consume_captured_output()  # clear output buffer
 748        self._context.apply(self.pr_plan)  # will raise if PR environment creation fails
 749
 750        # update PR info comment
 751        vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:"
 752        comment_value = f"{vde_title}\n  - `{self.pr_environment_name}`"
 753        if self.bot_config.enable_deploy_command:
 754            full_command = f"{self.bot_config.command_namespace or ''}/deploy"
 755            comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n  - `{full_command}`"
 756        dedup_regex = vde_title.replace("*", r"\*") + r".*"
 757        updated_comment, _ = self.update_sqlmesh_comment_info(
 758            value=comment_value,
 759            dedup_regex=dedup_regex,
 760        )
 761        if updated_comment:
 762            self._append_output("created_pr_environment", "true")
 763
    def deploy_to_prod(self) -> None:
        """
        Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise.

        Before applying, the PR's merge state is checked and the plan summary is appended to the
        bot's PR comment.

        Raises:
            CICDBotError: If the PR is already merged and the event is not a pull request `closed`
                event, if the merge state is blocked (and that check is enabled in the bot
                configuration), or if the merge state is dirty.
        """
        # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge.
        # The deploy can still happen if the workflow is configured to listen for `closed` events.
        if self._pull_request.merged and not self._event.is_pull_request_closed:
            raise CICDBotError(
                "PR is already merged and this event was triggered prior to the merge."
            )
        merge_status = self._get_merge_state_status()
        if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked:
            raise CICDBotError(
                "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. "
                "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration."
            )
        if merge_status.is_dirty:
            raise CICDBotError(
                "Merge commit cannot be cleanly created. Likely from a merge conflict. "
                "Please check PR and resolve any issues."
            )
        # The summary is wrapped in a collapsible <details> section of the PR comment
        plan_summary = f"""<details>
  <summary>:ship: Prod Plan Being Applied</summary>

{self.get_plan_summary(self.prod_plan)}
</details>

"""
        if self.forward_only_plan:
            plan_summary = (
                f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}"
            )

        # No dedup regex: the summary is appended on every deploy attempt
        self.update_sqlmesh_comment_info(
            value=plan_summary,
            dedup_regex=None,
        )
        # The comment is updated before the plan is applied
        self._context.apply(self.prod_plan)
 802
 803    def try_invalidate_pr_environment(self) -> None:
 804        """
 805        Marks the PR environment for garbage collection.
 806        """
 807        if self.bot_config.invalidate_environment_after_deploy:
 808            self._context.invalidate_environment(self.pr_environment_name)
 809
 810    def _update_check(
 811        self,
 812        name: str,
 813        status: GithubCheckStatus,
 814        title: str,
 815        conclusion: t.Optional[GithubCheckConclusion] = None,
 816        full_summary: t.Optional[str] = None,
 817    ) -> None:
 818        """
 819        Updates the status of the merge commit.
 820        """
 821        current_time = now()
 822        kwargs: t.Dict[str, t.Any] = {
 823            "name": name,
 824            # Note: The environment variable `GITHUB_SHA` would be the merge commit so that is why instead we
 825            # get the last commit on the PR.
 826            "head_sha": self._pull_request.head.sha,
 827            "status": status.value,
 828        }
 829        if status.is_in_progress:
 830            kwargs["started_at"] = current_time
 831        if status.is_completed:
 832            kwargs["completed_at"] = current_time
 833        if conclusion:
 834            kwargs["conclusion"] = conclusion.value
 835        full_summary = full_summary or title
 836        summary, text, *truncated = self._chunk_up_api_message(full_summary) + [None]
 837        if truncated and truncated[0] is not None:
 838            logger.warning(f"Summary was too long so we truncated it. Full text: {full_summary}")
 839        kwargs["output"] = {"title": title, "summary": summary}
 840        if text:
 841            kwargs["output"]["text"] = text
 842        logger.debug(f"Updating check with kwargs: {kwargs}")
 843
 844        if self.running_in_github_actions:
 845            # Only make the API call to update the checks if we are running within GitHub Actions
 846            # One very annoying limitation of the Pull Request Checks API is that its only available to GitHub Apps
 847            # and not personal access tokens, which makes it unable to be utilized during local development
 848            if name in self._check_run_mapping:
 849                logger.debug(f"Found check run in mapping so updating it. Name: {name}")
 850                check_run = self._check_run_mapping[name]
 851                check_run.edit(
 852                    **{
 853                        k: v
 854                        for k, v in kwargs.items()
 855                        if k not in ("name", "head_sha", "started_at")
 856                    }
 857                )
 858            else:
 859                logger.debug(f"Did not find check run in mapping so creating it. Name: {name}")
 860                self._check_run_mapping[name] = self._repo.create_check_run(**kwargs)
 861        else:
 862            # Output the summary using print() so the newlines are resolved and the result can easily
 863            # be disambiguated from the rest of the console output and copy+pasted into a Markdown renderer
 864            print(
 865                f"---CHECK OUTPUT START: {kwargs['output']['title']} ---\n{kwargs['output']['summary']}\n---CHECK OUTPUT END---\n"
 866            )
 867
 868        if conclusion:
 869            self._append_output(
 870                word_characters_only(name.replace("SQLMesh - ", "").lower()), conclusion.value
 871            )
 872
 873    def _update_check_handler(
 874        self,
 875        check_name: str,
 876        status: GithubCheckStatus,
 877        conclusion: t.Optional[GithubCheckConclusion],
 878        status_handler: t.Callable[[GithubCheckStatus], t.Tuple[str, t.Optional[str]]],
 879        conclusion_handler: t.Callable[
 880            [GithubCheckConclusion], t.Tuple[GithubCheckConclusion, str, t.Optional[str]]
 881        ],
 882    ) -> None:
 883        if conclusion:
 884            conclusion, title, summary = conclusion_handler(conclusion)
 885        else:
 886            title, summary = status_handler(status)
 887        self._update_check(
 888            name=check_name,
 889            status=status,
 890            title=title,
 891            conclusion=conclusion,
 892            full_summary=summary,
 893        )
 894
 895    def update_linter_check(
 896        self,
 897        status: GithubCheckStatus,
 898        conclusion: t.Optional[GithubCheckConclusion] = None,
 899    ) -> None:
 900        if not self._context.config.linter.enabled:
 901            return
 902
 903        def conclusion_handler(
 904            conclusion: GithubCheckConclusion,
 905        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 906            linter_summary = self._console.consume_captured_output() or "Linter Success"
 907
 908            title = "Linter results"
 909
 910            return conclusion, title, linter_summary
 911
 912        self._update_check_handler(
 913            check_name="SQLMesh - Linter",
 914            status=status,
 915            conclusion=conclusion,
 916            status_handler=lambda status: (
 917                {
 918                    GithubCheckStatus.IN_PROGRESS: "Running linter",
 919                    GithubCheckStatus.QUEUED: "Waiting to Run linter",
 920                }[status],
 921                None,
 922            ),
 923            conclusion_handler=conclusion_handler,
 924        )
 925
 926    def update_test_check(
 927        self,
 928        status: GithubCheckStatus,
 929        conclusion: t.Optional[GithubCheckConclusion] = None,
 930        result: t.Optional[ModelTextTestResult] = None,
 931        traceback: t.Optional[str] = None,
 932    ) -> None:
 933        """
 934        Updates the status of tests for code in the PR
 935        """
 936
 937        def conclusion_handler(
 938            conclusion: GithubCheckConclusion,
 939            result: t.Optional[ModelTextTestResult],
 940        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 941            if result:
 942                # Clear out console
 943                self._console.consume_captured_output()
 944                self._console.log_test_results(
 945                    result,
 946                    self._context.test_connection_config._engine_adapter.DIALECT,
 947                )
 948                test_summary = self._console.consume_captured_output()
 949                test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed"
 950                test_conclusion = (
 951                    GithubCheckConclusion.SUCCESS
 952                    if result.wasSuccessful()
 953                    else GithubCheckConclusion.FAILURE
 954                )
 955                return test_conclusion, test_title, test_summary
 956            if traceback:
 957                self._console._print(traceback)
 958
 959            test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed"
 960            return conclusion, test_title, traceback
 961
 962        self._update_check_handler(
 963            check_name="SQLMesh - Run Unit Tests",
 964            status=status,
 965            conclusion=conclusion,
 966            status_handler=lambda status: (
 967                {
 968                    GithubCheckStatus.IN_PROGRESS: "Running Tests",
 969                    GithubCheckStatus.QUEUED: "Waiting to Run Tests",
 970                }[status],
 971                None,
 972            ),
 973            conclusion_handler=functools.partial(conclusion_handler, result=result),
 974        )
 975
 976    def update_required_approval_check(
 977        self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None
 978    ) -> None:
 979        """
 980        Updates the status of the merge commit for the required approval.
 981        """
 982
 983        def conclusion_handler(
 984            conclusion: GithubCheckConclusion,
 985        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 986            test_summary = "**List of possible required approvers:**\n"
 987            for user in self._required_approvers:
 988                test_summary += f"- `{user.github_username or user.username}`\n"
 989
 990            title = (
 991                f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}"
 992                if conclusion.is_success
 993                else "Need a Required Approval"
 994            )
 995            return conclusion, title, test_summary
 996
 997        # If we get a skip that means required approvers is not configured therefore it does not need to be displayed
 998        if conclusion and conclusion.is_skipped:
 999            return
1000
1001        self._update_check_handler(
1002            check_name="SQLMesh - Has Required Approval",
1003            status=status,
1004            conclusion=conclusion,
1005            status_handler=lambda status: (
1006                {
1007                    GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers",
1008                    GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers",
1009                }[status],
1010                None,
1011            ),
1012            conclusion_handler=conclusion_handler,
1013        )
1014
1015    def update_pr_environment_check(
1016        self, status: GithubCheckStatus, exception: t.Optional[Exception] = None
1017    ) -> t.Optional[GithubCheckConclusion]:
1018        """
1019        Updates the status of the merge commit for the PR environment.
1020        """
1021        conclusion: t.Optional[GithubCheckConclusion] = None
1022        if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)):
1023            conclusion = GithubCheckConclusion.SKIPPED
1024        elif isinstance(exception, UncategorizedPlanError):
1025            conclusion = GithubCheckConclusion.ACTION_REQUIRED
1026        elif exception:
1027            conclusion = GithubCheckConclusion.FAILURE
1028        elif status.is_completed:
1029            conclusion = GithubCheckConclusion.SUCCESS
1030
1031        check_title_static = "PR Virtual Data Environment: "
1032        check_title = check_title_static + self.pr_environment_name
1033
1034        def conclusion_handler(
1035            conclusion: GithubCheckConclusion, exception: t.Optional[Exception]
1036        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1037            summary = self.get_pr_environment_summary(conclusion, exception)
1038            self._append_output("pr_environment_name", self.pr_environment_name)
1039            return conclusion, check_title, summary
1040
1041        self._update_check_handler(
1042            check_name="SQLMesh - PR Environment Synced",
1043            status=status,
1044            conclusion=conclusion,
1045            status_handler=lambda status: (
1046                check_title,
1047                {
1048                    GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`",
1049                    GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`",
1050                }[status],
1051            ),
1052            conclusion_handler=functools.partial(conclusion_handler, exception=exception),
1053        )
1054        return conclusion
1055
1056    def update_prod_plan_preview_check(
1057        self,
1058        status: GithubCheckStatus,
1059        conclusion: t.Optional[GithubCheckConclusion] = None,
1060        summary: t.Optional[str] = None,
1061    ) -> None:
1062        """
1063        Updates the status of the merge commit for the prod plan preview.
1064        """
1065
1066        def conclusion_handler(
1067            conclusion: GithubCheckConclusion, summary: t.Optional[str] = None
1068        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1069            conclusion_to_title = {
1070                GithubCheckConclusion.SUCCESS: "Prod Plan Preview",
1071                GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview",
1072                GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized",
1073                GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview",
1074            }
1075            title = conclusion_to_title.get(
1076                conclusion, f"Got an unexpected conclusion: {conclusion.value}"
1077            )
1078            if conclusion == GithubCheckConclusion.SUCCESS and summary:
1079                summary = (
1080                    f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n"
1081                    "These are the changes that would be deployed.\n\n"
1082                ) + summary
1083
1084            return conclusion, title, summary
1085
1086        self._update_check_handler(
1087            check_name="SQLMesh - Prod Plan Preview",
1088            status=status,
1089            conclusion=conclusion,
1090            status_handler=lambda status: (
1091                {
1092                    GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan",
1093                    GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan",
1094                }[status],
1095                None,
1096            ),
1097            conclusion_handler=functools.partial(conclusion_handler, summary=summary),
1098        )
1099
1100    def update_prod_environment_check(
1101        self,
1102        status: GithubCheckStatus,
1103        conclusion: t.Optional[GithubCheckConclusion] = None,
1104        skip_reason: t.Optional[str] = None,
1105        plan_error: t.Optional[PlanError] = None,
1106    ) -> None:
1107        """
1108        Updates the status of the merge commit for the prod environment.
1109        """
1110
1111        def conclusion_handler(
1112            conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None
1113        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1114            conclusion_to_title = {
1115                GithubCheckConclusion.SUCCESS: "Deployed to Prod",
1116                GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod",
1117                GithubCheckConclusion.SKIPPED: "Skipped deployment",
1118                GithubCheckConclusion.FAILURE: "Failed to deploy to prod",
1119                GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan",
1120            }
1121            title = (
1122                conclusion_to_title.get(conclusion)
1123                or f"Got an unexpected conclusion: {conclusion.value}"
1124            )
1125            if conclusion.is_skipped:
1126                summary = skip_reason
1127            elif conclusion.is_failure:
1128                captured_errors = self._console.consume_captured_errors()
1129                summary = (
1130                    captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```"
1131                )
1132            elif conclusion.is_action_required:
1133                if plan_error:
1134                    summary = f"**Plan error:**\n```\n{plan_error}\n```"
1135                else:
1136                    summary = "Got an action required conclusion but no plan error was provided. This is unexpected."
1137            else:
1138                summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan)
1139
1140            return conclusion, title, summary
1141
1142        self._update_check_handler(
1143            check_name="SQLMesh - Prod Environment Synced",
1144            status=status,
1145            conclusion=conclusion,
1146            status_handler=lambda status: (
1147                {
1148                    GithubCheckStatus.IN_PROGRESS: "Deploying to Prod",
1149                    GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod",
1150                }[status],
1151                None,
1152            ),
1153            conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason),
1154        )
1155
1156    def try_merge_pr(self) -> None:
1157        """
1158        Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not
1159        performed
1160        """
1161        if self.bot_config.merge_method:
1162            logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}")
1163            self._pull_request.merge(merge_method=self.bot_config.merge_method.value)
1164        else:
1165            logger.debug("No merge method defined so skipping merge")
1166
1167    def get_command_from_comment(self) -> BotCommand:
1168        """
1169        Gets the command from the comment
1170        """
1171        if not self._event.is_comment_added:
1172            logger.debug("Event is not a comment so returning invalid")
1173            return BotCommand.INVALID
1174        if self._event.pull_request_comment_body is None:
1175            raise CICDBotError("Unable to get comment body")
1176        logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}")
1177        return BotCommand.from_comment_body(
1178            self._event.pull_request_comment_body, self.bot_config.command_namespace
1179        )
1180
1181    def _chunk_up_api_message(self, message: str) -> t.List[str]:
1182        """
1183        Chunks up the message into `MAX_BYTE_LENGTH` byte chunks
1184        """
1185        message_encoded = message.encode("utf-8")
1186        return [
1187            message_encoded[i : i + self.MAX_BYTE_LENGTH].decode("utf-8", "ignore")
1188            for i in range(0, len(message_encoded), self.MAX_BYTE_LENGTH)
1189        ]
1190
1191    @property
1192    def running_in_github_actions(self) -> bool:
1193        return os.environ.get("GITHUB_ACTIONS", None) == "true"
1194
1195    @property
1196    def version_info(self) -> str:
1197        from sqlmesh.cli.main import _sqlmesh_version
1198
1199        return _sqlmesh_version()
1200
1201    def _generate_plan_flags_section(
1202        self, user_provided_flags: t.Dict[str, UserProvidedFlags]
1203    ) -> str:
1204        # collapsed section syntax:
1205        # https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/organizing-information-with-collapsed-sections#creating-a-collapsed-section
1206        section = "<details>\n\n<summary>Plan flags</summary>\n\n"
1207        for flag_name, flag_value in user_provided_flags.items():
1208            section += f"- `{flag_name}` = `{flag_value}`\n"
1209        section += "\n</details>"
1210
1211        return section
1212
1213    def _generate_pr_environment_summary_intro(self) -> str:
1214        note = ""
1215        subset_reasons = []
1216
1217        if self.bot_config.skip_pr_backfill:
1218            subset_reasons.append("`skip_pr_backfill` is enabled")
1219
1220        if default_pr_start := self.bot_config.default_pr_start:
1221            subset_reasons.append(f"`default_pr_start` is set to `{default_pr_start}`")
1222
1223        if subset_reasons:
1224            note = (
1225                "> [!IMPORTANT]\n"
1226                f"> This PR environment may only contain a subset of data because:\n"
1227                + "\n".join(f"> - {r}" for r in subset_reasons)
1228                + "\n"
1229                "> \n"
1230                "> This means that deploying to `prod` may not be a simple virtual update if there is still some data to load.\n"
1231                "> See `Dates not loaded in PR` below or the `Prod Plan Preview` check for more information.\n\n"
1232            )
1233
1234        return (
1235            f"Here is a summary of data that has been loaded into the PR environment `{self.pr_environment_name}` and could be deployed to `prod`.\n\n"
1236            + note
1237        )
1238
1239    def _generate_pr_environment_summary_list(self, plan: Plan) -> str:
1240        added_snapshot_ids = set(plan.context_diff.added)
1241        modified_snapshot_ids = set(
1242            s.snapshot_id for s, _ in plan.context_diff.modified_snapshots.values()
1243        )
1244        removed_snapshot_ids = set(plan.context_diff.removed_snapshots.keys())
1245
1246        # note: we sort these to get a deterministic order for the output tests
1247        table_records = sorted(
1248            [
1249                SnapshotSummaryRecord(snapshot_id=snapshot_id, plan=plan)
1250                for snapshot_id in (
1251                    added_snapshot_ids | modified_snapshot_ids | removed_snapshot_ids
1252                )
1253            ],
1254            key=lambda r: r.display_name,
1255        )
1256
1257        sections = [
1258            ("### Added", [r for r in table_records if r.is_added]),
1259            ("### Removed", [r for r in table_records if r.is_removed]),
1260            ("### Directly Modified", [r for r in table_records if r.is_directly_modified]),
1261            ("### Indirectly Modified", [r for r in table_records if r.is_indirectly_modified]),
1262            (
1263                "### Metadata Updated",
1264                [r for r in table_records if r.is_metadata_updated and not r.is_modified],
1265            ),
1266        ]
1267
1268        summary = ""
1269        for title, records in sections:
1270            if records:
1271                summary += f"\n{title}\n"
1272
1273            for record in records:
1274                summary += f"{record.as_markdown_list_item}\n"
1275
1276        return summary
1277
1278
@dataclass
class SnapshotSummaryRecord:
    """
    View over a single snapshot of a plan which exposes the information needed to render
    that snapshot in the PR environment summary.
    """

    # Id of the snapshot this record describes
    snapshot_id: SnapshotId
    # Plan the snapshot is looked up in
    plan: Plan

    @property
    def snapshot(self) -> Snapshot:
        """The full snapshot from the plan. Raises ValueError for removed snapshots."""
        if not self.is_removed:
            return self.plan.snapshots[self.snapshot_id]
        raise ValueError("Removed snapshots only have SnapshotTableInfo available")

    @cached_property
    def snapshot_table_info(self) -> SnapshotTableInfo:
        """Table info of the snapshot; removed snapshots are looked up in `plan.modified_snapshots`."""
        source = self.plan.modified_snapshots if self.is_removed else self.plan.snapshots
        return source[self.snapshot_id].table_info

    @property
    def display_name(self) -> str:
        """Display name of the snapshot for the plan's environment naming info."""
        if self.is_removed:
            # The node (and therefore its dialect) is not available for removed snapshots
            dialect = None
        else:
            dialect = self.snapshot.node.dialect
        return self.snapshot_table_info.display_name(
            self.plan.environment_naming_info, default_catalog=None, dialect=dialect
        )

    @property
    def change_category(self) -> str:
        """Human readable change category; removed snapshots are reported as breaking."""
        if self.is_removed:
            category = SnapshotChangeCategory.BREAKING
        else:
            category = self.snapshot.change_category
            if not category:
                return "Uncategorized"
        return SNAPSHOT_CHANGE_CATEGORY_STR[category]

    @property
    def is_added(self) -> bool:
        return self.snapshot_id in self.plan.context_diff.added

    @property
    def is_removed(self) -> bool:
        return self.snapshot_id in self.plan.context_diff.removed_snapshots

    @property
    def is_dev_preview(self) -> bool:
        """True when the plan's deployability index reports the snapshot as not deployable."""
        deployable = self.plan.deployability_index.is_deployable(self.snapshot_id)
        return not deployable

    @property
    def is_directly_modified(self) -> bool:
        model_name = self.snapshot_table_info.name
        return self.plan.context_diff.directly_modified(model_name)

    @property
    def is_indirectly_modified(self) -> bool:
        model_name = self.snapshot_table_info.name
        return self.plan.context_diff.indirectly_modified(model_name)

    @property
    def is_modified(self) -> bool:
        """True when the snapshot is directly or indirectly modified."""
        return self.is_directly_modified or self.is_indirectly_modified

    @property
    def is_metadata_updated(self) -> bool:
        model_name = self.snapshot_table_info.name
        return self.plan.context_diff.metadata_updated(model_name)

    @property
    def is_incremental(self) -> bool:
        return self.snapshot_table_info.is_incremental

    @property
    def modification_type(self) -> str:
        """Label of the first matching modification kind, or "Unknown" when none matches."""
        return (
            "Directly modified"
            if self.is_directly_modified
            else "Indirectly modified"
            if self.is_indirectly_modified
            else "Metadata updated"
            if self.is_metadata_updated
            else "Unknown"
        )

    @property
    def loaded_intervals(self) -> SnapshotIntervals:
        """
        Intervals of the snapshot: `dev_intervals` for forward-only snapshots, `intervals`
        otherwise. Raises ValueError for removed snapshots.
        """
        if self.is_removed:
            raise ValueError("Removed snapshots dont have loaded intervals available")

        snapshot = self.snapshot
        intervals = snapshot.dev_intervals if snapshot.is_forward_only else snapshot.intervals
        return SnapshotIntervals(snapshot_id=self.snapshot_id, intervals=intervals)

    @property
    def loaded_intervals_rendered(self) -> str:
        """Formatted loaded intervals, or "REMOVED" for removed snapshots."""
        return "REMOVED" if self.is_removed else self._format_intervals(self.loaded_intervals)

    @property
    def missing_intervals(self) -> t.Optional[SnapshotIntervals]:
        """The plan's missing intervals entry for this snapshot, or None when there is none."""
        for candidate in self.plan.missing_intervals:
            if candidate.snapshot_id == self.snapshot_id:
                return candidate
        return None

    @property
    def missing_intervals_formatted(self) -> str:
        """Formatted missing intervals, or "N/A" for removed snapshots or when nothing is missing."""
        if self.is_removed:
            return "N/A"
        pending = self.missing_intervals
        return self._format_intervals(pending) if pending else "N/A"

    @property
    def as_markdown_list_item(self) -> str:
        """
        Markdown list item for the snapshot: name and change category, followed by the model
        kind (when available) and, for incremental models, the loaded / not loaded dates.
        Removed snapshots only get the first line (without a trailing newline).
        """
        if self.is_removed:
            return f"- `{self.display_name}` ({self.change_category})"

        how_applied = ""
        if not self.is_incremental:
            from sqlmesh.core.console import _format_missing_intervals

            # note: this is to re-use the '[recreate view]' and '[full refresh]' text and keep it in sync with updates to the CLI
            # it doesnt actually use the passed intervals, those are handled differently
            how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals)
        how_applied_suffix = f" [{how_applied}]" if how_applied else ""

        lines = [f"- `{self.display_name}` ({self.change_category})\n"]

        if self.snapshot_table_info.model_kind_name:
            lines.append(
                f"  **Kind:** {self.snapshot_table_info.model_kind_name}{how_applied_suffix}\n"
            )

        if self.is_incremental:
            # in-depth interval info is only relevant for incremental models
            lines.append(f"  **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n")
            if self.missing_intervals:
                lines.append(
                    f"  **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n"
                )

        return "".join(lines)

    def _format_intervals(self, intervals: SnapshotIntervals) -> str:
        """Formats the intervals with the node's interval unit, flagging dev previews."""
        formatted = intervals.format_intervals(self.snapshot.node.interval_unit)
        suffix = " (**preview**)" if self.is_dev_preview else ""
        return f"{formatted}{suffix}"
logger = <Logger sqlmesh.integrations.github.cicd.controller (WARNING)>
class TestFailure(builtins.Exception):
class TestFailure(Exception):
    """Exception type that the PR environment check reports as a skipped conclusion."""

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class PullRequestInfo(sqlmesh.utils.pydantic.PydanticModel):
class PullRequestInfo(PydanticModel):
    """Contains information related to a pull request that can be used to construct other objects/URLs"""

    owner: str
    repo: str
    pr_number: int

    @property
    def full_repo_path(self) -> str:
        # "<owner>/<repo>"
        return "/".join((self.owner, self.repo))

    @classmethod
    def create_from_pull_request_url(cls, pull_request_url: str) -> PullRequestInfo:
        # Only the last four path segments are used: <owner>/<repo>/<ignored>/<pr_number>
        url_segments = pull_request_url.split("/")
        owner, repo, _, pr_number = url_segments[-4:]
        return cls(owner=owner, repo=repo, pr_number=int(pr_number))

Contains information related to a pull request that can be used to construct other objects/URLs

owner: str
repo: str
pr_number: int
full_repo_path: str
72    @property
73    def full_repo_path(self) -> str:
74        return "/".join([self.owner, self.repo])
@classmethod
def create_from_pull_request_url( cls, pull_request_url: str) -> PullRequestInfo:
76    @classmethod
77    def create_from_pull_request_url(cls, pull_request_url: str) -> PullRequestInfo:
78        owner, repo, _, pr_number = pull_request_url.split("/")[-4:]
79        return cls(
80            owner=owner,
81            repo=repo,
82            pr_number=int(pr_number),
83        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class GithubCheckStatus(builtins.str, enum.Enum):
 86class GithubCheckStatus(str, Enum):
 87    QUEUED = "queued"
 88    IN_PROGRESS = "in_progress"
 89    COMPLETED = "completed"
 90
 91    @property
 92    def is_queued(self) -> bool:
 93        return self == GithubCheckStatus.QUEUED
 94
 95    @property
 96    def is_in_progress(self) -> bool:
 97        return self == GithubCheckStatus.IN_PROGRESS
 98
 99    @property
100    def is_completed(self) -> bool:
101        return self == GithubCheckStatus.COMPLETED

An enumeration.

QUEUED = <GithubCheckStatus.QUEUED: 'queued'>
IN_PROGRESS = <GithubCheckStatus.IN_PROGRESS: 'in_progress'>
COMPLETED = <GithubCheckStatus.COMPLETED: 'completed'>
is_queued: bool
91    @property
92    def is_queued(self) -> bool:
93        return self == GithubCheckStatus.QUEUED
is_in_progress: bool
95    @property
96    def is_in_progress(self) -> bool:
97        return self == GithubCheckStatus.IN_PROGRESS
is_completed: bool
 99    @property
100    def is_completed(self) -> bool:
101        return self == GithubCheckStatus.COMPLETED
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class GithubCheckConclusion(builtins.str, enum.Enum):
class GithubCheckConclusion(str, Enum):
    """
    String-valued conclusions of a GitHub check.
    """

    SUCCESS = "success"
    FAILURE = "failure"
    NEUTRAL = "neutral"
    CANCELLED = "cancelled"
    TIMED_OUT = "timed_out"
    ACTION_REQUIRED = "action_required"
    SKIPPED = "skipped"

    @property
    def is_success(self) -> bool:
        """True when this member is ``SUCCESS``."""
        return self is GithubCheckConclusion.SUCCESS

    @property
    def is_failure(self) -> bool:
        """True when this member is ``FAILURE``."""
        return self is GithubCheckConclusion.FAILURE

    @property
    def is_neutral(self) -> bool:
        """True when this member is ``NEUTRAL``."""
        return self is GithubCheckConclusion.NEUTRAL

    @property
    def is_cancelled(self) -> bool:
        """True when this member is ``CANCELLED``."""
        return self is GithubCheckConclusion.CANCELLED

    @property
    def is_timed_out(self) -> bool:
        """True when this member is ``TIMED_OUT``."""
        return self is GithubCheckConclusion.TIMED_OUT

    @property
    def is_action_required(self) -> bool:
        """True when this member is ``ACTION_REQUIRED``."""
        return self is GithubCheckConclusion.ACTION_REQUIRED

    @property
    def is_skipped(self) -> bool:
        """True when this member is ``SKIPPED``."""
        return self is GithubCheckConclusion.SKIPPED

An enumeration.

SUCCESS = <GithubCheckConclusion.SUCCESS: 'success'>
FAILURE = <GithubCheckConclusion.FAILURE: 'failure'>
NEUTRAL = <GithubCheckConclusion.NEUTRAL: 'neutral'>
CANCELLED = <GithubCheckConclusion.CANCELLED: 'cancelled'>
TIMED_OUT = <GithubCheckConclusion.TIMED_OUT: 'timed_out'>
ACTION_REQUIRED = <GithubCheckConclusion.ACTION_REQUIRED: 'action_required'>
SKIPPED = <GithubCheckConclusion.SKIPPED: 'skipped'>
is_success: bool
113    @property
114    def is_success(self) -> bool:
115        return self == GithubCheckConclusion.SUCCESS
is_failure: bool
117    @property
118    def is_failure(self) -> bool:
119        return self == GithubCheckConclusion.FAILURE
is_neutral: bool
121    @property
122    def is_neutral(self) -> bool:
123        return self == GithubCheckConclusion.NEUTRAL
is_cancelled: bool
125    @property
126    def is_cancelled(self) -> bool:
127        return self == GithubCheckConclusion.CANCELLED
is_timed_out: bool
129    @property
130    def is_timed_out(self) -> bool:
131        return self == GithubCheckConclusion.TIMED_OUT
is_action_required: bool
133    @property
134    def is_action_required(self) -> bool:
135        return self == GithubCheckConclusion.ACTION_REQUIRED
is_skipped: bool
137    @property
138    def is_skipped(self) -> bool:
139        return self == GithubCheckConclusion.SKIPPED
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class MergeStateStatus(builtins.str, enum.Enum):
class MergeStateStatus(str, Enum):
    """
    Merge state values, see
    https://docs.github.com/en/graphql/reference/enums#mergestatestatus
    """

    BEHIND = "behind"
    BLOCKED = "blocked"
    CLEAN = "clean"
    DIRTY = "dirty"
    DRAFT = "draft"
    HAS_HOOKS = "has_hooks"
    UNKNOWN = "unknown"
    UNSTABLE = "unstable"

    @property
    def is_behind(self) -> bool:
        """True when this member is ``BEHIND``."""
        return self is MergeStateStatus.BEHIND

    @property
    def is_blocked(self) -> bool:
        """True when this member is ``BLOCKED``."""
        return self is MergeStateStatus.BLOCKED

    @property
    def is_clean(self) -> bool:
        """True when this member is ``CLEAN``."""
        return self is MergeStateStatus.CLEAN

    @property
    def is_dirty(self) -> bool:
        """True when this member is ``DIRTY``."""
        return self is MergeStateStatus.DIRTY

    @property
    def is_draft(self) -> bool:
        """True when this member is ``DRAFT``."""
        return self is MergeStateStatus.DRAFT

    @property
    def is_has_hooks(self) -> bool:
        """True when this member is ``HAS_HOOKS``."""
        return self is MergeStateStatus.HAS_HOOKS

    @property
    def is_unknown(self) -> bool:
        """True when this member is ``UNKNOWN``."""
        return self is MergeStateStatus.UNKNOWN

    @property
    def is_unstable(self) -> bool:
        """True when this member is ``UNSTABLE``."""
        return self is MergeStateStatus.UNSTABLE
BEHIND = <MergeStateStatus.BEHIND: 'behind'>
BLOCKED = <MergeStateStatus.BLOCKED: 'blocked'>
CLEAN = <MergeStateStatus.CLEAN: 'clean'>
DIRTY = <MergeStateStatus.DIRTY: 'dirty'>
DRAFT = <MergeStateStatus.DRAFT: 'draft'>
HAS_HOOKS = <MergeStateStatus.HAS_HOOKS: 'has_hooks'>
UNKNOWN = <MergeStateStatus.UNKNOWN: 'unknown'>
UNSTABLE = <MergeStateStatus.UNSTABLE: 'unstable'>
is_behind: bool
156    @property
157    def is_behind(self) -> bool:
158        return self == MergeStateStatus.BEHIND
is_blocked: bool
160    @property
161    def is_blocked(self) -> bool:
162        return self == MergeStateStatus.BLOCKED
is_clean: bool
164    @property
165    def is_clean(self) -> bool:
166        return self == MergeStateStatus.CLEAN
is_dirty: bool
168    @property
169    def is_dirty(self) -> bool:
170        return self == MergeStateStatus.DIRTY
is_draft: bool
172    @property
173    def is_draft(self) -> bool:
174        return self == MergeStateStatus.DRAFT
is_has_hooks: bool
176    @property
177    def is_has_hooks(self) -> bool:
178        return self == MergeStateStatus.HAS_HOOKS
is_unknown: bool
180    @property
181    def is_unknown(self) -> bool:
182        return self == MergeStateStatus.UNKNOWN
is_unstable: bool
184    @property
185    def is_unstable(self) -> bool:
186        return self == MergeStateStatus.UNSTABLE
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class BotCommand(enum.Enum):
class BotCommand(Enum):
    """
    Commands that can be parsed out of a comment body.
    """

    INVALID = 1
    DEPLOY_PROD = 2

    @classmethod
    def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand:
        """
        Map a comment body to a command.

        Both the body and the optional namespace are stripped of surrounding
        whitespace. The body must equal ``<namespace>/deploy`` exactly to yield
        ``DEPLOY_PROD``; anything else yields ``INVALID``.
        """
        prefix = namespace.strip() if namespace else ""
        if body.strip() == f"{prefix}/deploy":
            return cls.DEPLOY_PROD
        return cls.INVALID

    @property
    def is_invalid(self) -> bool:
        """True when this member is ``INVALID``."""
        return self is BotCommand.INVALID

    @property
    def is_deploy_prod(self) -> bool:
        """True when this member is ``DEPLOY_PROD``."""
        return self is BotCommand.DEPLOY_PROD

An enumeration.

INVALID = <BotCommand.INVALID: 1>
DEPLOY_PROD = <BotCommand.DEPLOY_PROD: 2>
@classmethod
def from_comment_body( cls, body: str, namespace: Optional[str] = None) -> BotCommand:
193    @classmethod
194    def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand:
195        body = body.strip()
196        namespace = namespace.strip() if namespace else ""
197        input_to_command = {
198            namespace + "/deploy": cls.DEPLOY_PROD,
199        }
200        return input_to_command.get(body, cls.INVALID)
is_invalid: bool
202    @property
203    def is_invalid(self) -> bool:
204        return self == self.INVALID
is_deploy_prod: bool
206    @property
207    def is_deploy_prod(self) -> bool:
208        return self == self.DEPLOY_PROD
Inherited Members
enum.Enum
name
value
class GithubEvent:
class GithubEvent:
    """
    Takes a Github Actions event payload and provides a simple interface to access its contents
    """

    def __init__(self, payload: t.Dict[str, t.Any]) -> None:
        self.payload = payload
        # Populated on first access of `pull_request_info`
        self._pull_request_info: t.Optional[PullRequestInfo] = None

    @classmethod
    def from_obj(cls, obj: t.Dict[str, t.Any]) -> GithubEvent:
        """Create an event from an already-parsed payload dictionary."""
        return cls(payload=obj)

    @classmethod
    def from_path(cls, path: t.Union[str, pathlib.Path]) -> GithubEvent:
        """Create an event from a UTF-8 JSON file located at `path`."""
        with pathlib.Path(path).open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
            return cls.from_obj(payload)

    @classmethod
    def from_env(cls) -> GithubEvent:
        """Create an event from the file named by the `GITHUB_EVENT_PATH` environment variable."""
        event_path = os.environ["GITHUB_EVENT_PATH"]
        return cls.from_path(event_path)

    @property
    def is_review(self) -> bool:
        """True when the payload has a non-empty `review` entry."""
        return bool(self.payload.get("review"))

    @property
    def is_comment(self) -> bool:
        """True when the payload has a `comment` entry with a non-empty `body`."""
        comment = self.payload.get("comment")
        return bool(comment) and bool(comment.get("body"))

    @property
    def is_comment_added(self) -> bool:
        """True for comment events whose `action` is anything other than `deleted`."""
        return self.is_comment and self.payload.get("action") != "deleted"

    @property
    def is_pull_request(self) -> bool:
        """True when the payload has a non-empty `pull_request` entry."""
        return bool(self.payload.get("pull_request"))

    @property
    def is_pull_request_closed(self) -> bool:
        """True for pull request events whose `action` is `closed`."""
        return self.is_pull_request and self.payload.get("action") == "closed"

    @property
    def pull_request_url(self) -> str:
        """
        The pull request URL, read from wherever the event type stores it.

        Review events are checked first, then comment events, then pull request events.

        Raises:
            CICDBotError: If the payload is none of those event types.
        """
        payload = self.payload
        if self.is_review:
            return payload["review"]["pull_request_url"]
        if self.is_comment:
            return payload["issue"]["pull_request"]["url"]
        if self.is_pull_request:
            return payload["pull_request"]["_links"]["self"]["href"]
        raise CICDBotError("Unable to determine pull request url")

    @property
    def pull_request_info(self) -> PullRequestInfo:
        """The `PullRequestInfo` parsed from `pull_request_url`, built once and then reused."""
        if self._pull_request_info:
            return self._pull_request_info
        self._pull_request_info = PullRequestInfo.create_from_pull_request_url(
            self.pull_request_url
        )
        return self._pull_request_info

    @property
    def pull_request_comment_body(self) -> t.Optional[str]:
        """The comment body when `is_comment_added` is true, otherwise None."""
        if not self.is_comment_added:
            return None
        return self.payload["comment"]["body"]

Takes a GitHub Actions event payload and provides a simple interface for accessing its contents.

GithubEvent(payload: Dict[str, Any])
216    def __init__(self, payload: t.Dict[str, t.Any]) -> None:
217        self.payload = payload
218        self._pull_request_info: t.Optional[PullRequestInfo] = None
payload
@classmethod
def from_obj( cls, obj: Dict[str, Any]) -> GithubEvent:
220    @classmethod
221    def from_obj(cls, obj: t.Dict[str, t.Any]) -> GithubEvent:
222        return cls(payload=obj)
@classmethod
def from_path( cls, path: Union[str, pathlib.Path]) -> GithubEvent:
224    @classmethod
225    def from_path(cls, path: t.Union[str, pathlib.Path]) -> GithubEvent:
226        with open(pathlib.Path(path), "r", encoding="utf-8") as f:
227            return cls.from_obj(json.load(f))
@classmethod
def from_env(cls) -> GithubEvent:
229    @classmethod
230    def from_env(cls) -> GithubEvent:
231        return cls.from_path(os.environ["GITHUB_EVENT_PATH"])
is_review: bool
233    @property
234    def is_review(self) -> bool:
235        return bool(self.payload.get("review"))
is_comment: bool
237    @property
238    def is_comment(self) -> bool:
239        comment = self.payload.get("comment")
240        if not comment:
241            return False
242        if not comment.get("body"):
243            return False
244        return True
is_comment_added: bool
246    @property
247    def is_comment_added(self) -> bool:
248        return self.is_comment and self.payload.get("action") != "deleted"
is_pull_request: bool
250    @property
251    def is_pull_request(self) -> bool:
252        return bool(self.payload.get("pull_request"))
is_pull_request_closed: bool
254    @property
255    def is_pull_request_closed(self) -> bool:
256        return self.is_pull_request and self.payload.get("action") == "closed"
pull_request_url: str
258    @property
259    def pull_request_url(self) -> str:
260        if self.is_review:
261            return self.payload["review"]["pull_request_url"]
262        if self.is_comment:
263            return self.payload["issue"]["pull_request"]["url"]
264        if self.is_pull_request:
265            return self.payload["pull_request"]["_links"]["self"]["href"]
266        raise CICDBotError("Unable to determine pull request url")
pull_request_info: PullRequestInfo
268    @property
269    def pull_request_info(self) -> PullRequestInfo:
270        if not self._pull_request_info:
271            self._pull_request_info = PullRequestInfo.create_from_pull_request_url(
272                self.pull_request_url
273            )
274        return self._pull_request_info
pull_request_comment_body: Optional[str]
276    @property
277    def pull_request_comment_body(self) -> t.Optional[str]:
278        if self.is_comment_added:
279            return self.payload["comment"]["body"]
280        return None
class GithubController:
 283class GithubController:
 284    BOT_HEADER_MSG = ":robot: **SQLMesh Bot Info** :robot:"
 285    MAX_BYTE_LENGTH = 65535
 286
 287    def __init__(
 288        self,
 289        paths: t.Union[Path, t.Iterable[Path]],
 290        token: str,
 291        config: t.Optional[t.Union[Config, str]] = None,
 292        event: t.Optional[GithubEvent] = None,
 293        client: t.Optional[Github] = None,
 294        context: t.Optional[Context] = None,
 295    ) -> None:
 296        from github import Github
 297
 298        logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}")
 299
 300        self.config = config
 301        self._paths = paths
 302        self._token = token
 303        self._event = event or GithubEvent.from_env()
 304        logger.debug(f"Github event: {json.dumps(self._event.payload)}")
 305        self._pr_plan_builder: t.Optional[PlanBuilder] = None
 306        self._prod_plan_builder: t.Optional[PlanBuilder] = None
 307        self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None
 308        self._check_run_mapping: t.Dict[str, CheckRun] = {}
 309
 310        if not isinstance(get_console(), MarkdownConsole):
 311            raise CICDBotError("Console must be a markdown console.")
 312        self._console = t.cast(MarkdownConsole, get_console())
 313
 314        from github.Consts import DEFAULT_BASE_URL
 315        from github.Auth import Token
 316
 317        self._client: Github = client or Github(
 318            base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token)
 319        )
 320
 321        self._repo: Repository = self._client.get_repo(
 322            self._event.pull_request_info.full_repo_path, lazy=True
 323        )
 324        self._pull_request: PullRequest = self._repo.get_pull(
 325            self._event.pull_request_info.pr_number
 326        )
 327        self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number)
 328        self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews()
 329        # TODO: The python module says that user names can be None and this is not currently handled
 330        self._approvers: t.Set[str] = {
 331            review.user.login or "UNKNOWN"
 332            for review in self._reviews
 333            if review.state.lower() == "approved"
 334        }
 335        logger.debug(f"Approvers: {', '.join(self._approvers)}")
 336        self._context: Context = context or Context(paths=self._paths, config=self.config)
 337
 338        # Bot config needs the context to be initialized
 339        logger.debug(f"Bot config: {self.bot_config.json(indent=2)}")
 340
 341    @property
 342    def deploy_command_enabled(self) -> bool:
 343        return self.bot_config.enable_deploy_command
 344
 345    @property
 346    def is_comment_added(self) -> bool:
 347        return self._event.is_comment_added
 348
 349    @property
 350    def _required_approvers(self) -> t.List[User]:
 351        required_approvers = [
 352            user
 353            for user in self._context.users
 354            if user.is_required_approver and user.github_username
 355        ]
 356        logger.debug(
 357            f"Required approvers: {', '.join(user.github_username for user in required_approvers if user.github_username)}"
 358        )
 359        return required_approvers
 360
 361    @property
 362    def _required_approvers_with_approval(self) -> t.List[User]:
 363        return [
 364            user for user in self._required_approvers if user.github_username in self._approvers
 365        ]
 366
 367    @property
 368    def pr_environment_name(self) -> str:
 369        return Environment.sanitize_name(
 370            "_".join(
 371                [
 372                    self.bot_config.pr_environment_name or self._event.pull_request_info.repo,
 373                    str(self._event.pull_request_info.pr_number),
 374                ]
 375            )
 376        )
 377
 378    @property
 379    def do_required_approval_check(self) -> bool:
 380        """We want to skip required approval check if no users have this role"""
 381        do_required_approval_check = bool(self._required_approvers)
 382        logger.debug(f"Do required approval check: {do_required_approval_check}")
 383        return do_required_approval_check
 384
 385    @property
 386    def has_required_approval(self) -> bool:
 387        """
 388        Check if the PR has a required approver.
 389
 390        TODO: Allow defining requiring some number, or all, required approvers.
 391        """
 392        if not self._required_approvers or self._required_approvers_with_approval:
 393            logger.debug("Has required Approval")
 394            return True
 395        logger.debug("Does not have required approval")
 396        return False
 397
 398    @property
 399    def pr_plan(self) -> Plan:
 400        if not self._pr_plan_builder:
 401            self._pr_plan_builder = self._context.plan_builder(
 402                environment=self.pr_environment_name,
 403                skip_tests=True,
 404                skip_linter=True,
 405                categorizer_config=self.bot_config.auto_categorize_changes,
 406                start=self.bot_config.default_pr_start,
 407                min_intervals=self.bot_config.pr_min_intervals,
 408                skip_backfill=self.bot_config.skip_pr_backfill,
 409                include_unmodified=self.bot_config.pr_include_unmodified,
 410                forward_only=self.forward_only_plan,
 411            )
 412        assert self._pr_plan_builder
 413        return self._pr_plan_builder.build()
 414
 415    @property
 416    def pr_plan_or_none(self) -> t.Optional[Plan]:
 417        try:
 418            return self.pr_plan
 419        except:
 420            return None
 421
 422    @property
 423    def pr_plan_flags(self) -> t.Optional[t.Dict[str, UserProvidedFlags]]:
 424        if pr_plan := self.pr_plan_or_none:
 425            return pr_plan.user_provided_flags
 426        if pr_plan_builder := self._pr_plan_builder:
 427            return pr_plan_builder._user_provided_flags
 428        return None
 429
 430    @property
 431    def prod_plan(self) -> Plan:
 432        if not self._prod_plan_builder:
 433            self._prod_plan_builder = self._context.plan_builder(
 434                c.PROD,
 435                no_gaps=True,
 436                skip_tests=True,
 437                skip_linter=True,
 438                categorizer_config=self.bot_config.auto_categorize_changes,
 439                run=self.bot_config.run_on_deploy_to_prod,
 440                forward_only=self.forward_only_plan,
 441            )
 442        assert self._prod_plan_builder
 443        return self._prod_plan_builder.build()
 444
 445    @property
 446    def prod_plan_with_gaps(self) -> Plan:
 447        if not self._prod_plan_with_gaps_builder:
 448            self._prod_plan_with_gaps_builder = self._context.plan_builder(
 449                c.PROD,
 450                # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data)
 451                no_gaps=False,
 452                skip_tests=True,
 453                skip_linter=True,
 454                categorizer_config=self.bot_config.auto_categorize_changes,
 455                run=self.bot_config.run_on_deploy_to_prod,
 456                forward_only=self.forward_only_plan,
 457            )
 458        assert self._prod_plan_with_gaps_builder
 459        return self._prod_plan_with_gaps_builder.build()
 460
 461    @property
 462    def bot_config(self) -> GithubCICDBotConfig:
 463        bot_config = self._context.config.cicd_bot or GithubCICDBotConfig(
 464            auto_categorize_changes=self._context.auto_categorize_changes
 465        )
 466        return bot_config
 467
 468    @property
 469    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
 470        return self.prod_plan_with_gaps.modified_snapshots
 471
 472    @property
 473    def removed_snapshots(self) -> t.Set[SnapshotId]:
 474        return set(self.prod_plan_with_gaps.context_diff.removed_snapshots)
 475
 476    @property
 477    def pr_targets_prod_branch(self) -> bool:
 478        return self._pull_request.base.ref in self.bot_config.prod_branch_names
 479
 480    @property
 481    def forward_only_plan(self) -> bool:
 482        default = self._context.config.plan.forward_only
 483        head_ref = self._pull_request.head.ref
 484        if isinstance(head_ref, str):
 485            return head_ref.endswith(self.bot_config.forward_only_branch_suffix) or default
 486        return default
 487
 488    @classmethod
 489    def _append_output(cls, key: str, value: str) -> None:
 490        """
 491        Appends the given key/value to output so they can be read by following steps
 492        """
 493        logger.debug(f"Setting output. Key: {key}, Value: {value}")
 494
 495        # GitHub Actions sets this environment variable
 496        if output_file := os.environ.get("GITHUB_OUTPUT"):
 497            with open(output_file, "a", encoding="utf-8") as fh:
 498                print(f"{key}={value}", file=fh)
 499
 500    def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str:
 501        if not plan.forward_only:
 502            return ""
 503
 504        example_model_name = "<model name>"
 505        for snapshot_id in sorted(plan.snapshots):
 506            snapshot = plan.snapshots[snapshot_id]
 507            if snapshot.is_incremental:
 508                example_model_name = snapshot.node.name
 509                break
 510
 511        return (
 512            "> [!TIP]\n"
 513            "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n"
 514            "> \n"
 515            f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n"
 516            ">\n"
 517            "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans"
 518        )
 519
 520    def get_plan_summary(self, plan: Plan) -> str:
 521        # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated
 522        # this is particularly important for the "Models needing backfill" list because
 523        # there is no easy way to tell this otherwise
 524        orig_verbosity = self._console.verbosity
 525        self._console.verbosity = Verbosity.VERY_VERBOSE
 526
 527        try:
 528            # Clear out any output that might exist from prior steps
 529            self._console.consume_captured_output()
 530            if plan.restatements:
 531                self._console._print("\n**Restating models**\n")
 532            else:
 533                self._console.show_environment_difference_summary(
 534                    context_diff=plan.context_diff,
 535                    no_diff=False,
 536                )
 537            if plan.context_diff.has_changes:
 538                self._console.show_model_difference_summary(
 539                    context_diff=plan.context_diff,
 540                    environment_naming_info=plan.environment_naming_info,
 541                    default_catalog=self._context.default_catalog,
 542                    no_diff=False,
 543                )
 544            difference_summary = self._console.consume_captured_output()
 545            self._console._show_missing_dates(plan, self._context.default_catalog)
 546            missing_dates = self._console.consume_captured_output()
 547
 548            plan_flags_section = (
 549                f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}"
 550                if plan.user_provided_flags
 551                else ""
 552            )
 553
 554            if not difference_summary and not missing_dates:
 555                return f"No changes to apply.{plan_flags_section}"
 556
 557            warnings_block = self._console.consume_captured_warnings()
 558            errors_block = self._console.consume_captured_errors()
 559
 560            return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}"
 561        except PlanError as e:
 562            logger.exception("Plan failed to generate")
 563            return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}"
 564        finally:
 565            self._console.verbosity = orig_verbosity
 566
 567    def get_pr_environment_summary(
 568        self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None
 569    ) -> str:
 570        heading = ""
 571        summary = ""
 572
 573        if conclusion.is_success:
 574            summary = self._get_pr_environment_summary_success()
 575        elif conclusion.is_action_required:
 576            heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:"
 577            summary = self._get_pr_environment_summary_action_required(exception)
 578        elif conclusion.is_failure:
 579            heading = (
 580                f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:"
 581            )
 582            summary = self._get_pr_environment_summary_failure(exception)
 583        elif conclusion.is_skipped:
 584            heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:"
 585            summary = self._get_pr_environment_summary_skipped(exception)
 586        else:
 587            heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}"
 588
 589        # note: we just add warnings here, errors will be covered by the "failure" conclusion
 590        if warnings := self._console.consume_captured_warnings():
 591            summary = f"{warnings}\n{summary}"
 592
 593        return f"{heading}\n\n{summary}".strip()
 594
 595    def _get_pr_environment_summary_success(self) -> str:
 596        prod_plan = self.prod_plan_with_gaps
 597
 598        if not prod_plan.has_changes:
 599            summary = "No models were modified in this PR.\n"
 600        else:
 601            intro = self._generate_pr_environment_summary_intro()
 602            summary = intro + self._generate_pr_environment_summary_list(prod_plan)
 603
 604        if prod_plan.user_provided_flags:
 605            summary += self._generate_plan_flags_section(prod_plan.user_provided_flags)
 606
 607        return summary
 608
 609    def _get_pr_environment_summary_skipped(self, exception: t.Optional[Exception] = None) -> str:
 610        if isinstance(exception, NoChangesPlanError):
 611            skip_reason = "No changes were detected compared to the prod environment."
 612        elif isinstance(exception, TestFailure):
 613            skip_reason = "Unit Test(s) Failed so skipping PR creation"
 614        else:
 615            skip_reason = "A prior stage failed resulting in skipping PR creation."
 616
 617        return skip_reason
 618
 619    def _get_pr_environment_summary_action_required(
 620        self, exception: t.Optional[Exception] = None
 621    ) -> str:
 622        plan = self.pr_plan_or_none
 623        if isinstance(exception, UncategorizedPlanError) and plan:
 624            failure_msg = f"The following models could not be categorized automatically:\n"
 625            for snapshot in plan.uncategorized:
 626                failure_msg += f"- {snapshot.name}\n"
 627            failure_msg += (
 628                f"\nRun `sqlmesh plan {self.pr_environment_name}` locally to apply these changes.\n\n"
 629                "If you would like the bot to automatically categorize changes, check the [documentation](https://sqlmesh.readthedocs.io/en/stable/integrations/github/) for more information."
 630            )
 631        else:
 632            failure_msg = "Please check the Actions Workflow logs for more information."
 633
 634        return failure_msg
 635
 636    def _get_pr_environment_summary_failure(self, exception: t.Optional[Exception] = None) -> str:
 637        console_output = self._console.consume_captured_output()
 638        failure_msg = ""
 639
 640        if isinstance(exception, PlanError):
 641            if exception.args and (msg := exception.args[0]) and isinstance(msg, str):
 642                failure_msg += f"*{msg}*\n"
 643            if console_output:
 644                failure_msg += f"\n{console_output}"
 645        elif isinstance(exception, (SQLMeshError, SqlglotError, ValueError)):
 646            # this logic is taken from the global error handler attached to the CLI, which uses `click.echo()` to output the message
 647            # so cant be re-used here because it bypasses the Console
 648            failure_msg = f"**Error:** {str(exception)}"
 649        elif exception:
 650            logger.debug(
 651                "Got unexpected error. Error Type: "
 652                + str(type(exception))
 653                + " Stack trace: "
 654                + traceback.format_exc()
 655            )
 656            failure_msg = f"This is an unexpected error.\n\n**Exception:**\n```\n{traceback.format_exc()}\n```"
 657
 658        if captured_errors := self._console.consume_captured_errors():
 659            failure_msg = f"{captured_errors}\n{failure_msg}"
 660
 661        if plan_flags := self.pr_plan_flags:
 662            failure_msg += f"\n\n{self._generate_plan_flags_section(plan_flags)}"
 663
 664        return failure_msg
 665
 666    def run_tests(self) -> t.Tuple[ModelTextTestResult, str]:
 667        """
 668        Run tests for the PR
 669        """
 670        return self._context._run_tests(verbosity=Verbosity.VERBOSE)
 671
 672    def run_linter(self) -> None:
 673        """
 674        Run linter for the PR
 675        """
 676        self._console.consume_captured_output()
 677        self._context.lint_models()
 678
 679    def _get_or_create_comment(self, header: str = BOT_HEADER_MSG) -> IssueComment:
 680        comment = seq_get(
 681            [comment for comment in self._issue.get_comments() if header in comment.body],
 682            0,
 683        )
 684        if not comment:
 685            logger.debug(f"Did not find comment so creating one with header: {header}")
 686            return self._issue.create_comment(header)
 687        logger.debug(f"Found comment with header: {header}")
 688        return comment
 689
 690    def _get_merge_state_status(self) -> MergeStateStatus:
 691        """
 692        This feature is currently in preview and therefore not available in the python module.
 693        So we query GraphQL directly instead.
 694        """
 695        headers = {
 696            "Authorization": f"Bearer {self._token}",
 697            "Accept": "application/vnd.github.merge-info-preview+json",
 698        }
 699        query = f"""{{
 700            repository(owner: "{self._event.pull_request_info.owner}", name: "{self._event.pull_request_info.repo}") {{
 701                pullRequest(number: {self._event.pull_request_info.pr_number}) {{
 702                    title
 703                    state
 704                    mergeStateStatus
 705                }}
 706            }}
 707        }}"""
 708        request = requests.post(
 709            os.environ["GITHUB_GRAPHQL_URL"],
 710            json={"query": query},
 711            headers=headers,
 712        )
 713        if request.status_code == 200:
 714            merge_status = MergeStateStatus(
 715                request.json()["data"]["repository"]["pullRequest"]["mergeStateStatus"].lower()
 716            )
 717            logger.debug(f"Merge state status: {merge_status.value}")
 718            return merge_status
 719        raise CICDBotError(f"Unable to get merge state status. Error: {request.text}")
 720
 721    def update_sqlmesh_comment_info(
 722        self, value: str, *, dedup_regex: t.Optional[str]
 723    ) -> t.Tuple[bool, IssueComment]:
 724        """
 725        Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then
 726        it creates one. It determines the comment to update by looking for a comment with the header. If a dedup
 727        regex is provided then it will check if the value already exists in the comment and if so it will not update
 728        """
 729        comment = self._get_or_create_comment()
 730        if dedup_regex:
 731            # If we find a match against the regex then we just return since the comment has already been posted
 732            if seq_get(re.findall(dedup_regex, comment.body), 0):
 733                return False, comment
 734        full_comment = f"{comment.body}\n{value}"
 735        body, *truncated = self._chunk_up_api_message(f"{full_comment}")
 736        if truncated:
 737            logger.warning(
 738                f"Comment body was too long so we truncated it. Full text: {full_comment}"
 739            )
 740        comment.edit(body=body)
 741        return True, comment
 742
 743    def update_pr_environment(self) -> None:
 744        """
 745        Creates a PR environment from the logic present in the PR. If the PR contains changes that are
 746        uncategorized, then an error will be raised.
 747        """
 748        self._console.consume_captured_output()  # clear output buffer
 749        self._context.apply(self.pr_plan)  # will raise if PR environment creation fails
 750
 751        # update PR info comment
 752        vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:"
 753        comment_value = f"{vde_title}\n  - `{self.pr_environment_name}`"
 754        if self.bot_config.enable_deploy_command:
 755            full_command = f"{self.bot_config.command_namespace or ''}/deploy"
 756            comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n  - `{full_command}`"
 757        dedup_regex = vde_title.replace("*", r"\*") + r".*"
 758        updated_comment, _ = self.update_sqlmesh_comment_info(
 759            value=comment_value,
 760            dedup_regex=dedup_regex,
 761        )
 762        if updated_comment:
 763            self._append_output("created_pr_environment", "true")
 764
 765    def deploy_to_prod(self) -> None:
 766        """
 767        Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise.
 768        """
 769        # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge.
 770        # The deploy can still happen if the workflow is configured to listen for `closed` events.
 771        if self._pull_request.merged and not self._event.is_pull_request_closed:
 772            raise CICDBotError(
 773                "PR is already merged and this event was triggered prior to the merge."
 774            )
 775        merge_status = self._get_merge_state_status()
 776        if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked:
 777            raise CICDBotError(
 778                "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. "
 779                "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration."
 780            )
 781        if merge_status.is_dirty:
 782            raise CICDBotError(
 783                "Merge commit cannot be cleanly created. Likely from a merge conflict. "
 784                "Please check PR and resolve any issues."
 785            )
 786        plan_summary = f"""<details>
 787  <summary>:ship: Prod Plan Being Applied</summary>
 788
 789{self.get_plan_summary(self.prod_plan)}
 790</details>
 791
 792"""
 793        if self.forward_only_plan:
 794            plan_summary = (
 795                f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}"
 796            )
 797
 798        self.update_sqlmesh_comment_info(
 799            value=plan_summary,
 800            dedup_regex=None,
 801        )
 802        self._context.apply(self.prod_plan)
 803
 804    def try_invalidate_pr_environment(self) -> None:
 805        """
 806        Marks the PR environment for garbage collection.
 807        """
 808        if self.bot_config.invalidate_environment_after_deploy:
 809            self._context.invalidate_environment(self.pr_environment_name)
 810
 811    def _update_check(
 812        self,
 813        name: str,
 814        status: GithubCheckStatus,
 815        title: str,
 816        conclusion: t.Optional[GithubCheckConclusion] = None,
 817        full_summary: t.Optional[str] = None,
 818    ) -> None:
 819        """
 820        Updates the status of the merge commit.
 821        """
 822        current_time = now()
 823        kwargs: t.Dict[str, t.Any] = {
 824            "name": name,
 825            # Note: The environment variable `GITHUB_SHA` would be the merge commit so that is why instead we
 826            # get the last commit on the PR.
 827            "head_sha": self._pull_request.head.sha,
 828            "status": status.value,
 829        }
 830        if status.is_in_progress:
 831            kwargs["started_at"] = current_time
 832        if status.is_completed:
 833            kwargs["completed_at"] = current_time
 834        if conclusion:
 835            kwargs["conclusion"] = conclusion.value
 836        full_summary = full_summary or title
 837        summary, text, *truncated = self._chunk_up_api_message(full_summary) + [None]
 838        if truncated and truncated[0] is not None:
 839            logger.warning(f"Summary was too long so we truncated it. Full text: {full_summary}")
 840        kwargs["output"] = {"title": title, "summary": summary}
 841        if text:
 842            kwargs["output"]["text"] = text
 843        logger.debug(f"Updating check with kwargs: {kwargs}")
 844
 845        if self.running_in_github_actions:
 846            # Only make the API call to update the checks if we are running within GitHub Actions
 847            # One very annoying limitation of the Pull Request Checks API is that its only available to GitHub Apps
 848            # and not personal access tokens, which makes it unable to be utilized during local development
 849            if name in self._check_run_mapping:
 850                logger.debug(f"Found check run in mapping so updating it. Name: {name}")
 851                check_run = self._check_run_mapping[name]
 852                check_run.edit(
 853                    **{
 854                        k: v
 855                        for k, v in kwargs.items()
 856                        if k not in ("name", "head_sha", "started_at")
 857                    }
 858                )
 859            else:
 860                logger.debug(f"Did not find check run in mapping so creating it. Name: {name}")
 861                self._check_run_mapping[name] = self._repo.create_check_run(**kwargs)
 862        else:
 863            # Output the summary using print() so the newlines are resolved and the result can easily
 864            # be disambiguated from the rest of the console output and copy+pasted into a Markdown renderer
 865            print(
 866                f"---CHECK OUTPUT START: {kwargs['output']['title']} ---\n{kwargs['output']['summary']}\n---CHECK OUTPUT END---\n"
 867            )
 868
 869        if conclusion:
 870            self._append_output(
 871                word_characters_only(name.replace("SQLMesh - ", "").lower()), conclusion.value
 872            )
 873
 874    def _update_check_handler(
 875        self,
 876        check_name: str,
 877        status: GithubCheckStatus,
 878        conclusion: t.Optional[GithubCheckConclusion],
 879        status_handler: t.Callable[[GithubCheckStatus], t.Tuple[str, t.Optional[str]]],
 880        conclusion_handler: t.Callable[
 881            [GithubCheckConclusion], t.Tuple[GithubCheckConclusion, str, t.Optional[str]]
 882        ],
 883    ) -> None:
 884        if conclusion:
 885            conclusion, title, summary = conclusion_handler(conclusion)
 886        else:
 887            title, summary = status_handler(status)
 888        self._update_check(
 889            name=check_name,
 890            status=status,
 891            title=title,
 892            conclusion=conclusion,
 893            full_summary=summary,
 894        )
 895
 896    def update_linter_check(
 897        self,
 898        status: GithubCheckStatus,
 899        conclusion: t.Optional[GithubCheckConclusion] = None,
 900    ) -> None:
 901        if not self._context.config.linter.enabled:
 902            return
 903
 904        def conclusion_handler(
 905            conclusion: GithubCheckConclusion,
 906        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 907            linter_summary = self._console.consume_captured_output() or "Linter Success"
 908
 909            title = "Linter results"
 910
 911            return conclusion, title, linter_summary
 912
 913        self._update_check_handler(
 914            check_name="SQLMesh - Linter",
 915            status=status,
 916            conclusion=conclusion,
 917            status_handler=lambda status: (
 918                {
 919                    GithubCheckStatus.IN_PROGRESS: "Running linter",
 920                    GithubCheckStatus.QUEUED: "Waiting to Run linter",
 921                }[status],
 922                None,
 923            ),
 924            conclusion_handler=conclusion_handler,
 925        )
 926
 927    def update_test_check(
 928        self,
 929        status: GithubCheckStatus,
 930        conclusion: t.Optional[GithubCheckConclusion] = None,
 931        result: t.Optional[ModelTextTestResult] = None,
 932        traceback: t.Optional[str] = None,
 933    ) -> None:
 934        """
 935        Updates the status of tests for code in the PR
 936        """
 937
 938        def conclusion_handler(
 939            conclusion: GithubCheckConclusion,
 940            result: t.Optional[ModelTextTestResult],
 941        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 942            if result:
 943                # Clear out console
 944                self._console.consume_captured_output()
 945                self._console.log_test_results(
 946                    result,
 947                    self._context.test_connection_config._engine_adapter.DIALECT,
 948                )
 949                test_summary = self._console.consume_captured_output()
 950                test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed"
 951                test_conclusion = (
 952                    GithubCheckConclusion.SUCCESS
 953                    if result.wasSuccessful()
 954                    else GithubCheckConclusion.FAILURE
 955                )
 956                return test_conclusion, test_title, test_summary
 957            if traceback:
 958                self._console._print(traceback)
 959
 960            test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed"
 961            return conclusion, test_title, traceback
 962
 963        self._update_check_handler(
 964            check_name="SQLMesh - Run Unit Tests",
 965            status=status,
 966            conclusion=conclusion,
 967            status_handler=lambda status: (
 968                {
 969                    GithubCheckStatus.IN_PROGRESS: "Running Tests",
 970                    GithubCheckStatus.QUEUED: "Waiting to Run Tests",
 971                }[status],
 972                None,
 973            ),
 974            conclusion_handler=functools.partial(conclusion_handler, result=result),
 975        )
 976
 977    def update_required_approval_check(
 978        self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None
 979    ) -> None:
 980        """
 981        Updates the status of the merge commit for the required approval.
 982        """
 983
 984        def conclusion_handler(
 985            conclusion: GithubCheckConclusion,
 986        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 987            test_summary = "**List of possible required approvers:**\n"
 988            for user in self._required_approvers:
 989                test_summary += f"- `{user.github_username or user.username}`\n"
 990
 991            title = (
 992                f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}"
 993                if conclusion.is_success
 994                else "Need a Required Approval"
 995            )
 996            return conclusion, title, test_summary
 997
 998        # If we get a skip that means required approvers is not configured therefore it does not need to be displayed
 999        if conclusion and conclusion.is_skipped:
1000            return
1001
1002        self._update_check_handler(
1003            check_name="SQLMesh - Has Required Approval",
1004            status=status,
1005            conclusion=conclusion,
1006            status_handler=lambda status: (
1007                {
1008                    GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers",
1009                    GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers",
1010                }[status],
1011                None,
1012            ),
1013            conclusion_handler=conclusion_handler,
1014        )
1015
1016    def update_pr_environment_check(
1017        self, status: GithubCheckStatus, exception: t.Optional[Exception] = None
1018    ) -> t.Optional[GithubCheckConclusion]:
1019        """
1020        Updates the status of the merge commit for the PR environment.
1021        """
1022        conclusion: t.Optional[GithubCheckConclusion] = None
1023        if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)):
1024            conclusion = GithubCheckConclusion.SKIPPED
1025        elif isinstance(exception, UncategorizedPlanError):
1026            conclusion = GithubCheckConclusion.ACTION_REQUIRED
1027        elif exception:
1028            conclusion = GithubCheckConclusion.FAILURE
1029        elif status.is_completed:
1030            conclusion = GithubCheckConclusion.SUCCESS
1031
1032        check_title_static = "PR Virtual Data Environment: "
1033        check_title = check_title_static + self.pr_environment_name
1034
1035        def conclusion_handler(
1036            conclusion: GithubCheckConclusion, exception: t.Optional[Exception]
1037        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1038            summary = self.get_pr_environment_summary(conclusion, exception)
1039            self._append_output("pr_environment_name", self.pr_environment_name)
1040            return conclusion, check_title, summary
1041
1042        self._update_check_handler(
1043            check_name="SQLMesh - PR Environment Synced",
1044            status=status,
1045            conclusion=conclusion,
1046            status_handler=lambda status: (
1047                check_title,
1048                {
1049                    GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`",
1050                    GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`",
1051                }[status],
1052            ),
1053            conclusion_handler=functools.partial(conclusion_handler, exception=exception),
1054        )
1055        return conclusion
1056
1057    def update_prod_plan_preview_check(
1058        self,
1059        status: GithubCheckStatus,
1060        conclusion: t.Optional[GithubCheckConclusion] = None,
1061        summary: t.Optional[str] = None,
1062    ) -> None:
1063        """
1064        Updates the status of the merge commit for the prod plan preview.
1065        """
1066
1067        def conclusion_handler(
1068            conclusion: GithubCheckConclusion, summary: t.Optional[str] = None
1069        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1070            conclusion_to_title = {
1071                GithubCheckConclusion.SUCCESS: "Prod Plan Preview",
1072                GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview",
1073                GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized",
1074                GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview",
1075            }
1076            title = conclusion_to_title.get(
1077                conclusion, f"Got an unexpected conclusion: {conclusion.value}"
1078            )
1079            if conclusion == GithubCheckConclusion.SUCCESS and summary:
1080                summary = (
1081                    f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n"
1082                    "These are the changes that would be deployed.\n\n"
1083                ) + summary
1084
1085            return conclusion, title, summary
1086
1087        self._update_check_handler(
1088            check_name="SQLMesh - Prod Plan Preview",
1089            status=status,
1090            conclusion=conclusion,
1091            status_handler=lambda status: (
1092                {
1093                    GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan",
1094                    GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan",
1095                }[status],
1096                None,
1097            ),
1098            conclusion_handler=functools.partial(conclusion_handler, summary=summary),
1099        )
1100
1101    def update_prod_environment_check(
1102        self,
1103        status: GithubCheckStatus,
1104        conclusion: t.Optional[GithubCheckConclusion] = None,
1105        skip_reason: t.Optional[str] = None,
1106        plan_error: t.Optional[PlanError] = None,
1107    ) -> None:
1108        """
1109        Updates the status of the merge commit for the prod environment.
1110        """
1111
1112        def conclusion_handler(
1113            conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None
1114        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1115            conclusion_to_title = {
1116                GithubCheckConclusion.SUCCESS: "Deployed to Prod",
1117                GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod",
1118                GithubCheckConclusion.SKIPPED: "Skipped deployment",
1119                GithubCheckConclusion.FAILURE: "Failed to deploy to prod",
1120                GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan",
1121            }
1122            title = (
1123                conclusion_to_title.get(conclusion)
1124                or f"Got an unexpected conclusion: {conclusion.value}"
1125            )
1126            if conclusion.is_skipped:
1127                summary = skip_reason
1128            elif conclusion.is_failure:
1129                captured_errors = self._console.consume_captured_errors()
1130                summary = (
1131                    captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```"
1132                )
1133            elif conclusion.is_action_required:
1134                if plan_error:
1135                    summary = f"**Plan error:**\n```\n{plan_error}\n```"
1136                else:
1137                    summary = "Got an action required conclusion but no plan error was provided. This is unexpected."
1138            else:
1139                summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan)
1140
1141            return conclusion, title, summary
1142
1143        self._update_check_handler(
1144            check_name="SQLMesh - Prod Environment Synced",
1145            status=status,
1146            conclusion=conclusion,
1147            status_handler=lambda status: (
1148                {
1149                    GithubCheckStatus.IN_PROGRESS: "Deploying to Prod",
1150                    GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod",
1151                }[status],
1152                None,
1153            ),
1154            conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason),
1155        )
1156
1157    def try_merge_pr(self) -> None:
1158        """
1159        Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not
1160        performed
1161        """
1162        if self.bot_config.merge_method:
1163            logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}")
1164            self._pull_request.merge(merge_method=self.bot_config.merge_method.value)
1165        else:
1166            logger.debug("No merge method defined so skipping merge")
1167
1168    def get_command_from_comment(self) -> BotCommand:
1169        """
1170        Gets the command from the comment
1171        """
1172        if not self._event.is_comment_added:
1173            logger.debug("Event is not a comment so returning invalid")
1174            return BotCommand.INVALID
1175        if self._event.pull_request_comment_body is None:
1176            raise CICDBotError("Unable to get comment body")
1177        logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}")
1178        return BotCommand.from_comment_body(
1179            self._event.pull_request_comment_body, self.bot_config.command_namespace
1180        )
1181
1182    def _chunk_up_api_message(self, message: str) -> t.List[str]:
1183        """
1184        Chunks up the message into `MAX_BYTE_LENGTH` byte chunks
1185        """
1186        message_encoded = message.encode("utf-8")
1187        return [
1188            message_encoded[i : i + self.MAX_BYTE_LENGTH].decode("utf-8", "ignore")
1189            for i in range(0, len(message_encoded), self.MAX_BYTE_LENGTH)
1190        ]
1191
1192    @property
1193    def running_in_github_actions(self) -> bool:
1194        return os.environ.get("GITHUB_ACTIONS", None) == "true"
1195
1196    @property
1197    def version_info(self) -> str:
1198        from sqlmesh.cli.main import _sqlmesh_version
1199
1200        return _sqlmesh_version()
1201
1202    def _generate_plan_flags_section(
1203        self, user_provided_flags: t.Dict[str, UserProvidedFlags]
1204    ) -> str:
1205        # collapsed section syntax:
1206        # https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/organizing-information-with-collapsed-sections#creating-a-collapsed-section
1207        section = "<details>\n\n<summary>Plan flags</summary>\n\n"
1208        for flag_name, flag_value in user_provided_flags.items():
1209            section += f"- `{flag_name}` = `{flag_value}`\n"
1210        section += "\n</details>"
1211
1212        return section
1213
1214    def _generate_pr_environment_summary_intro(self) -> str:
1215        note = ""
1216        subset_reasons = []
1217
1218        if self.bot_config.skip_pr_backfill:
1219            subset_reasons.append("`skip_pr_backfill` is enabled")
1220
1221        if default_pr_start := self.bot_config.default_pr_start:
1222            subset_reasons.append(f"`default_pr_start` is set to `{default_pr_start}`")
1223
1224        if subset_reasons:
1225            note = (
1226                "> [!IMPORTANT]\n"
1227                f"> This PR environment may only contain a subset of data because:\n"
1228                + "\n".join(f"> - {r}" for r in subset_reasons)
1229                + "\n"
1230                "> \n"
1231                "> This means that deploying to `prod` may not be a simple virtual update if there is still some data to load.\n"
1232                "> See `Dates not loaded in PR` below or the `Prod Plan Preview` check for more information.\n\n"
1233            )
1234
1235        return (
1236            f"Here is a summary of data that has been loaded into the PR environment `{self.pr_environment_name}` and could be deployed to `prod`.\n\n"
1237            + note
1238        )
1239
1240    def _generate_pr_environment_summary_list(self, plan: Plan) -> str:
1241        added_snapshot_ids = set(plan.context_diff.added)
1242        modified_snapshot_ids = set(
1243            s.snapshot_id for s, _ in plan.context_diff.modified_snapshots.values()
1244        )
1245        removed_snapshot_ids = set(plan.context_diff.removed_snapshots.keys())
1246
1247        # note: we sort these to get a deterministic order for the output tests
1248        table_records = sorted(
1249            [
1250                SnapshotSummaryRecord(snapshot_id=snapshot_id, plan=plan)
1251                for snapshot_id in (
1252                    added_snapshot_ids | modified_snapshot_ids | removed_snapshot_ids
1253                )
1254            ],
1255            key=lambda r: r.display_name,
1256        )
1257
1258        sections = [
1259            ("### Added", [r for r in table_records if r.is_added]),
1260            ("### Removed", [r for r in table_records if r.is_removed]),
1261            ("### Directly Modified", [r for r in table_records if r.is_directly_modified]),
1262            ("### Indirectly Modified", [r for r in table_records if r.is_indirectly_modified]),
1263            (
1264                "### Metadata Updated",
1265                [r for r in table_records if r.is_metadata_updated and not r.is_modified],
1266            ),
1267        ]
1268
1269        summary = ""
1270        for title, records in sections:
1271            if records:
1272                summary += f"\n{title}\n"
1273
1274            for record in records:
1275                summary += f"{record.as_markdown_list_item}\n"
1276
1277        return summary
GithubController( paths: Union[pathlib.Path, Iterable[pathlib.Path]], token: str, config: Union[sqlmesh.core.config.root.Config, str, NoneType] = None, event: Optional[GithubEvent] = None, client: Optional[github.MainClass.Github] = None, context: Optional[sqlmesh.core.context.Context] = None)
    def __init__(
        self,
        paths: t.Union[Path, t.Iterable[Path]],
        token: str,
        config: t.Optional[t.Union[Config, str]] = None,
        event: t.Optional[GithubEvent] = None,
        client: t.Optional[Github] = None,
        context: t.Optional[Context] = None,
    ) -> None:
        """
        Sets up the controller for the pull request that the GitHub event refers to.

        Args:
            paths: Passed to the `Context` that is created when no `context` is provided.
            token: The token used to authenticate the GitHub client that is created when no
                `client` is provided.
            config: Passed to the `Context` that is created when no `context` is provided.
            event: The GitHub event to handle. Defaults to `GithubEvent.from_env()`.
            client: The GitHub client to use. If not provided one is created using the
                `GITHUB_API_URL` environment variable (or the default base URL) and the token.
            context: The context to use. If not provided one is created from `paths` and `config`.

        Raises:
            CICDBotError: If the current console is not a `MarkdownConsole`.
        """
        # `Github` is only imported at module level for type checking, so import it for runtime use
        from github import Github

        logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}")

        self.config = config
        self._paths = paths
        self._token = token
        self._event = event or GithubEvent.from_env()
        logger.debug(f"Github event: {json.dumps(self._event.payload)}")
        # The plan builders start out unset
        self._pr_plan_builder: t.Optional[PlanBuilder] = None
        self._prod_plan_builder: t.Optional[PlanBuilder] = None
        self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None
        # Check runs by check name
        self._check_run_mapping: t.Dict[str, CheckRun] = {}

        if not isinstance(get_console(), MarkdownConsole):
            raise CICDBotError("Console must be a markdown console.")
        self._console = t.cast(MarkdownConsole, get_console())

        from github.Consts import DEFAULT_BASE_URL
        from github.Auth import Token

        self._client: Github = client or Github(
            base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token)
        )

        self._repo: Repository = self._client.get_repo(
            self._event.pull_request_info.full_repo_path, lazy=True
        )
        self._pull_request: PullRequest = self._repo.get_pull(
            self._event.pull_request_info.pr_number
        )
        # The issue is looked up with the PR number
        self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number)
        self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews()
        # TODO: The python module says that user names can be None and this is not currently handled
        # Logins of the users whose review state is "approved" (compared case-insensitively)
        self._approvers: t.Set[str] = {
            review.user.login or "UNKNOWN"
            for review in self._reviews
            if review.state.lower() == "approved"
        }
        logger.debug(f"Approvers: {', '.join(self._approvers)}")
        self._context: Context = context or Context(paths=self._paths, config=self.config)

        # Bot config needs the context to be initialized
        logger.debug(f"Bot config: {self.bot_config.json(indent=2)}")
BOT_HEADER_MSG = ':robot: **SQLMesh Bot Info** :robot:'
MAX_BYTE_LENGTH = 65535
config
deploy_command_enabled: bool
341    @property
342    def deploy_command_enabled(self) -> bool:
343        return self.bot_config.enable_deploy_command
is_comment_added: bool
345    @property
346    def is_comment_added(self) -> bool:
347        return self._event.is_comment_added
pr_environment_name: str
367    @property
368    def pr_environment_name(self) -> str:
369        return Environment.sanitize_name(
370            "_".join(
371                [
372                    self.bot_config.pr_environment_name or self._event.pull_request_info.repo,
373                    str(self._event.pull_request_info.pr_number),
374                ]
375            )
376        )
do_required_approval_check: bool
378    @property
379    def do_required_approval_check(self) -> bool:
380        """We want to skip required approval check if no users have this role"""
381        do_required_approval_check = bool(self._required_approvers)
382        logger.debug(f"Do required approval check: {do_required_approval_check}")
383        return do_required_approval_check

We want to skip required approval check if no users have this role

has_required_approval: bool
385    @property
386    def has_required_approval(self) -> bool:
387        """
388        Check if the PR has a required approver.
389
390        TODO: Allow defining requiring some number, or all, required approvers.
391        """
392        if not self._required_approvers or self._required_approvers_with_approval:
393            logger.debug("Has required Approval")
394            return True
395        logger.debug("Does not have required approval")
396        return False

Check if the PR has a required approver.

TODO: Allow defining requiring some number, or all, required approvers.

pr_plan: sqlmesh.core.plan.definition.Plan
398    @property
399    def pr_plan(self) -> Plan:
400        if not self._pr_plan_builder:
401            self._pr_plan_builder = self._context.plan_builder(
402                environment=self.pr_environment_name,
403                skip_tests=True,
404                skip_linter=True,
405                categorizer_config=self.bot_config.auto_categorize_changes,
406                start=self.bot_config.default_pr_start,
407                min_intervals=self.bot_config.pr_min_intervals,
408                skip_backfill=self.bot_config.skip_pr_backfill,
409                include_unmodified=self.bot_config.pr_include_unmodified,
410                forward_only=self.forward_only_plan,
411            )
412        assert self._pr_plan_builder
413        return self._pr_plan_builder.build()
pr_plan_or_none: Optional[sqlmesh.core.plan.definition.Plan]
415    @property
416    def pr_plan_or_none(self) -> t.Optional[Plan]:
417        try:
418            return self.pr_plan
419        except:
420            return None
pr_plan_flags: Optional[Dict[str, Union[datetime.date, datetime.datetime, str, int, float, bool, List[str]]]]
422    @property
423    def pr_plan_flags(self) -> t.Optional[t.Dict[str, UserProvidedFlags]]:
424        if pr_plan := self.pr_plan_or_none:
425            return pr_plan.user_provided_flags
426        if pr_plan_builder := self._pr_plan_builder:
427            return pr_plan_builder._user_provided_flags
428        return None
prod_plan: sqlmesh.core.plan.definition.Plan
430    @property
431    def prod_plan(self) -> Plan:
432        if not self._prod_plan_builder:
433            self._prod_plan_builder = self._context.plan_builder(
434                c.PROD,
435                no_gaps=True,
436                skip_tests=True,
437                skip_linter=True,
438                categorizer_config=self.bot_config.auto_categorize_changes,
439                run=self.bot_config.run_on_deploy_to_prod,
440                forward_only=self.forward_only_plan,
441            )
442        assert self._prod_plan_builder
443        return self._prod_plan_builder.build()
prod_plan_with_gaps: sqlmesh.core.plan.definition.Plan
445    @property
446    def prod_plan_with_gaps(self) -> Plan:
447        if not self._prod_plan_with_gaps_builder:
448            self._prod_plan_with_gaps_builder = self._context.plan_builder(
449                c.PROD,
450                # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data)
451                no_gaps=False,
452                skip_tests=True,
453                skip_linter=True,
454                categorizer_config=self.bot_config.auto_categorize_changes,
455                run=self.bot_config.run_on_deploy_to_prod,
456                forward_only=self.forward_only_plan,
457            )
458        assert self._prod_plan_with_gaps_builder
459        return self._prod_plan_with_gaps_builder.build()
461    @property
462    def bot_config(self) -> GithubCICDBotConfig:
463        bot_config = self._context.config.cicd_bot or GithubCICDBotConfig(
464            auto_categorize_changes=self._context.auto_categorize_changes
465        )
466        return bot_config
468    @property
469    def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]:
470        return self.prod_plan_with_gaps.modified_snapshots
removed_snapshots: Set[sqlmesh.core.snapshot.definition.SnapshotId]
472    @property
473    def removed_snapshots(self) -> t.Set[SnapshotId]:
474        return set(self.prod_plan_with_gaps.context_diff.removed_snapshots)
pr_targets_prod_branch: bool
476    @property
477    def pr_targets_prod_branch(self) -> bool:
478        return self._pull_request.base.ref in self.bot_config.prod_branch_names
forward_only_plan: bool
480    @property
481    def forward_only_plan(self) -> bool:
482        default = self._context.config.plan.forward_only
483        head_ref = self._pull_request.head.ref
484        if isinstance(head_ref, str):
485            return head_ref.endswith(self.bot_config.forward_only_branch_suffix) or default
486        return default
def get_forward_only_plan_post_deployment_tip(self, plan: sqlmesh.core.plan.definition.Plan) -> str:
500    def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str:
501        if not plan.forward_only:
502            return ""
503
504        example_model_name = "<model name>"
505        for snapshot_id in sorted(plan.snapshots):
506            snapshot = plan.snapshots[snapshot_id]
507            if snapshot.is_incremental:
508                example_model_name = snapshot.node.name
509                break
510
511        return (
512            "> [!TIP]\n"
513            "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n"
514            "> \n"
515            f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n"
516            ">\n"
517            "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans"
518        )
def get_plan_summary(self, plan: sqlmesh.core.plan.definition.Plan) -> str:
520    def get_plan_summary(self, plan: Plan) -> str:
521        # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated
522        # this is particularly important for the "Models needing backfill" list because
523        # there is no easy way to tell this otherwise
524        orig_verbosity = self._console.verbosity
525        self._console.verbosity = Verbosity.VERY_VERBOSE
526
527        try:
528            # Clear out any output that might exist from prior steps
529            self._console.consume_captured_output()
530            if plan.restatements:
531                self._console._print("\n**Restating models**\n")
532            else:
533                self._console.show_environment_difference_summary(
534                    context_diff=plan.context_diff,
535                    no_diff=False,
536                )
537            if plan.context_diff.has_changes:
538                self._console.show_model_difference_summary(
539                    context_diff=plan.context_diff,
540                    environment_naming_info=plan.environment_naming_info,
541                    default_catalog=self._context.default_catalog,
542                    no_diff=False,
543                )
544            difference_summary = self._console.consume_captured_output()
545            self._console._show_missing_dates(plan, self._context.default_catalog)
546            missing_dates = self._console.consume_captured_output()
547
548            plan_flags_section = (
549                f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}"
550                if plan.user_provided_flags
551                else ""
552            )
553
554            if not difference_summary and not missing_dates:
555                return f"No changes to apply.{plan_flags_section}"
556
557            warnings_block = self._console.consume_captured_warnings()
558            errors_block = self._console.consume_captured_errors()
559
560            return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}"
561        except PlanError as e:
562            logger.exception("Plan failed to generate")
563            return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}"
564        finally:
565            self._console.verbosity = orig_verbosity
def get_pr_environment_summary( self, conclusion: GithubCheckConclusion, exception: Optional[Exception] = None) -> str:
567    def get_pr_environment_summary(
568        self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None
569    ) -> str:
570        heading = ""
571        summary = ""
572
573        if conclusion.is_success:
574            summary = self._get_pr_environment_summary_success()
575        elif conclusion.is_action_required:
576            heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:"
577            summary = self._get_pr_environment_summary_action_required(exception)
578        elif conclusion.is_failure:
579            heading = (
580                f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:"
581            )
582            summary = self._get_pr_environment_summary_failure(exception)
583        elif conclusion.is_skipped:
584            heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:"
585            summary = self._get_pr_environment_summary_skipped(exception)
586        else:
587            heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}"
588
589        # note: we just add warnings here, errors will be covered by the "failure" conclusion
590        if warnings := self._console.consume_captured_warnings():
591            summary = f"{warnings}\n{summary}"
592
593        return f"{heading}\n\n{summary}".strip()
def run_tests(self) -> Tuple[sqlmesh.core.test.result.ModelTextTestResult, str]:
666    def run_tests(self) -> t.Tuple[ModelTextTestResult, str]:
667        """
668        Run tests for the PR
669        """
670        return self._context._run_tests(verbosity=Verbosity.VERBOSE)

Run tests for the PR

def run_linter(self) -> None:
672    def run_linter(self) -> None:
673        """
674        Run linter for the PR
675        """
676        self._console.consume_captured_output()
677        self._context.lint_models()

Run linter for the PR

def update_sqlmesh_comment_info( self, value: str, *, dedup_regex: Optional[str]) -> Tuple[bool, github.IssueComment.IssueComment]:
721    def update_sqlmesh_comment_info(
722        self, value: str, *, dedup_regex: t.Optional[str]
723    ) -> t.Tuple[bool, IssueComment]:
724        """
725        Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then
726        it creates one. It determines the comment to update by looking for a comment with the header. If a dedup
727        regex is provided then it will check if the value already exists in the comment and if so it will not update
728        """
729        comment = self._get_or_create_comment()
730        if dedup_regex:
731            # If we find a match against the regex then we just return since the comment has already been posted
732            if seq_get(re.findall(dedup_regex, comment.body), 0):
733                return False, comment
734        full_comment = f"{comment.body}\n{value}"
735        body, *truncated = self._chunk_up_api_message(f"{full_comment}")
736        if truncated:
737            logger.warning(
738                f"Comment body was too long so we truncated it. Full text: {full_comment}"
739            )
740        comment.edit(body=body)
741        return True, comment

Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then it creates one. It determines the comment to update by looking for a comment with the header. If a dedup regex is provided then it will check if the value already exists in the comment and if so it will not update

def update_pr_environment(self) -> None:
743    def update_pr_environment(self) -> None:
744        """
745        Creates a PR environment from the logic present in the PR. If the PR contains changes that are
746        uncategorized, then an error will be raised.
747        """
748        self._console.consume_captured_output()  # clear output buffer
749        self._context.apply(self.pr_plan)  # will raise if PR environment creation fails
750
751        # update PR info comment
752        vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:"
753        comment_value = f"{vde_title}\n  - `{self.pr_environment_name}`"
754        if self.bot_config.enable_deploy_command:
755            full_command = f"{self.bot_config.command_namespace or ''}/deploy"
756            comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n  - `{full_command}`"
757        dedup_regex = vde_title.replace("*", r"\*") + r".*"
758        updated_comment, _ = self.update_sqlmesh_comment_info(
759            value=comment_value,
760            dedup_regex=dedup_regex,
761        )
762        if updated_comment:
763            self._append_output("created_pr_environment", "true")

Creates a PR environment from the logic present in the PR. If the PR contains changes that are uncategorized, then an error will be raised.

def deploy_to_prod(self) -> None:
765    def deploy_to_prod(self) -> None:
766        """
767        Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise.
768        """
769        # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge.
770        # The deploy can still happen if the workflow is configured to listen for `closed` events.
771        if self._pull_request.merged and not self._event.is_pull_request_closed:
772            raise CICDBotError(
773                "PR is already merged and this event was triggered prior to the merge."
774            )
775        merge_status = self._get_merge_state_status()
776        if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked:
777            raise CICDBotError(
778                "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. "
779                "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration."
780            )
781        if merge_status.is_dirty:
782            raise CICDBotError(
783                "Merge commit cannot be cleanly created. Likely from a merge conflict. "
784                "Please check PR and resolve any issues."
785            )
786        plan_summary = f"""<details>
787  <summary>:ship: Prod Plan Being Applied</summary>
788
789{self.get_plan_summary(self.prod_plan)}
790</details>
791
792"""
793        if self.forward_only_plan:
794            plan_summary = (
795                f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}"
796            )
797
798        self.update_sqlmesh_comment_info(
799            value=plan_summary,
800            dedup_regex=None,
801        )
802        self._context.apply(self.prod_plan)

Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise.

def try_invalidate_pr_environment(self) -> None:
804    def try_invalidate_pr_environment(self) -> None:
805        """
806        Marks the PR environment for garbage collection.
807        """
808        if self.bot_config.invalidate_environment_after_deploy:
809            self._context.invalidate_environment(self.pr_environment_name)

Marks the PR environment for garbage collection.

def update_linter_check( self, status: GithubCheckStatus, conclusion: Optional[GithubCheckConclusion] = None) -> None:
896    def update_linter_check(
897        self,
898        status: GithubCheckStatus,
899        conclusion: t.Optional[GithubCheckConclusion] = None,
900    ) -> None:
901        if not self._context.config.linter.enabled:
902            return
903
904        def conclusion_handler(
905            conclusion: GithubCheckConclusion,
906        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
907            linter_summary = self._console.consume_captured_output() or "Linter Success"
908
909            title = "Linter results"
910
911            return conclusion, title, linter_summary
912
913        self._update_check_handler(
914            check_name="SQLMesh - Linter",
915            status=status,
916            conclusion=conclusion,
917            status_handler=lambda status: (
918                {
919                    GithubCheckStatus.IN_PROGRESS: "Running linter",
920                    GithubCheckStatus.QUEUED: "Waiting to Run linter",
921                }[status],
922                None,
923            ),
924            conclusion_handler=conclusion_handler,
925        )
def update_test_check( self, status: GithubCheckStatus, conclusion: Optional[GithubCheckConclusion] = None, result: Optional[sqlmesh.core.test.result.ModelTextTestResult] = None, traceback: Optional[str] = None) -> None:
927    def update_test_check(
928        self,
929        status: GithubCheckStatus,
930        conclusion: t.Optional[GithubCheckConclusion] = None,
931        result: t.Optional[ModelTextTestResult] = None,
932        traceback: t.Optional[str] = None,
933    ) -> None:
934        """
935        Updates the status of tests for code in the PR
936        """
937
938        def conclusion_handler(
939            conclusion: GithubCheckConclusion,
940            result: t.Optional[ModelTextTestResult],
941        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
942            if result:
943                # Clear out console
944                self._console.consume_captured_output()
945                self._console.log_test_results(
946                    result,
947                    self._context.test_connection_config._engine_adapter.DIALECT,
948                )
949                test_summary = self._console.consume_captured_output()
950                test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed"
951                test_conclusion = (
952                    GithubCheckConclusion.SUCCESS
953                    if result.wasSuccessful()
954                    else GithubCheckConclusion.FAILURE
955                )
956                return test_conclusion, test_title, test_summary
957            if traceback:
958                self._console._print(traceback)
959
960            test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed"
961            return conclusion, test_title, traceback
962
963        self._update_check_handler(
964            check_name="SQLMesh - Run Unit Tests",
965            status=status,
966            conclusion=conclusion,
967            status_handler=lambda status: (
968                {
969                    GithubCheckStatus.IN_PROGRESS: "Running Tests",
970                    GithubCheckStatus.QUEUED: "Waiting to Run Tests",
971                }[status],
972                None,
973            ),
974            conclusion_handler=functools.partial(conclusion_handler, result=result),
975        )

Updates the status of tests for code in the PR

def update_required_approval_check( self, status: GithubCheckStatus, conclusion: Optional[GithubCheckConclusion] = None) -> None:
 977    def update_required_approval_check(
 978        self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None
 979    ) -> None:
 980        """
 981        Updates the status of the merge commit for the required approval.
 982        """
 983
 984        def conclusion_handler(
 985            conclusion: GithubCheckConclusion,
 986        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
 987            test_summary = "**List of possible required approvers:**\n"
 988            for user in self._required_approvers:
 989                test_summary += f"- `{user.github_username or user.username}`\n"
 990
 991            title = (
 992                f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}"
 993                if conclusion.is_success
 994                else "Need a Required Approval"
 995            )
 996            return conclusion, title, test_summary
 997
 998        # If we get a skip that means required approvers is not configured therefore it does not need to be displayed
 999        if conclusion and conclusion.is_skipped:
1000            return
1001
1002        self._update_check_handler(
1003            check_name="SQLMesh - Has Required Approval",
1004            status=status,
1005            conclusion=conclusion,
1006            status_handler=lambda status: (
1007                {
1008                    GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers",
1009                    GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers",
1010                }[status],
1011                None,
1012            ),
1013            conclusion_handler=conclusion_handler,
1014        )

Updates the status of the merge commit for the required approval.

def update_pr_environment_check( self, status: GithubCheckStatus, exception: Optional[Exception] = None) -> Optional[GithubCheckConclusion]:
1016    def update_pr_environment_check(
1017        self, status: GithubCheckStatus, exception: t.Optional[Exception] = None
1018    ) -> t.Optional[GithubCheckConclusion]:
1019        """
1020        Updates the status of the merge commit for the PR environment.
1021        """
1022        conclusion: t.Optional[GithubCheckConclusion] = None
1023        if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)):
1024            conclusion = GithubCheckConclusion.SKIPPED
1025        elif isinstance(exception, UncategorizedPlanError):
1026            conclusion = GithubCheckConclusion.ACTION_REQUIRED
1027        elif exception:
1028            conclusion = GithubCheckConclusion.FAILURE
1029        elif status.is_completed:
1030            conclusion = GithubCheckConclusion.SUCCESS
1031
1032        check_title_static = "PR Virtual Data Environment: "
1033        check_title = check_title_static + self.pr_environment_name
1034
1035        def conclusion_handler(
1036            conclusion: GithubCheckConclusion, exception: t.Optional[Exception]
1037        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1038            summary = self.get_pr_environment_summary(conclusion, exception)
1039            self._append_output("pr_environment_name", self.pr_environment_name)
1040            return conclusion, check_title, summary
1041
1042        self._update_check_handler(
1043            check_name="SQLMesh - PR Environment Synced",
1044            status=status,
1045            conclusion=conclusion,
1046            status_handler=lambda status: (
1047                check_title,
1048                {
1049                    GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`",
1050                    GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`",
1051                }[status],
1052            ),
1053            conclusion_handler=functools.partial(conclusion_handler, exception=exception),
1054        )
1055        return conclusion

Updates the status of the merge commit for the PR environment.

def update_prod_plan_preview_check( self, status: GithubCheckStatus, conclusion: Optional[GithubCheckConclusion] = None, summary: Optional[str] = None) -> None:
1057    def update_prod_plan_preview_check(
1058        self,
1059        status: GithubCheckStatus,
1060        conclusion: t.Optional[GithubCheckConclusion] = None,
1061        summary: t.Optional[str] = None,
1062    ) -> None:
1063        """
1064        Updates the status of the merge commit for the prod plan preview.
1065        """
1066
1067        def conclusion_handler(
1068            conclusion: GithubCheckConclusion, summary: t.Optional[str] = None
1069        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1070            conclusion_to_title = {
1071                GithubCheckConclusion.SUCCESS: "Prod Plan Preview",
1072                GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview",
1073                GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized",
1074                GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview",
1075            }
1076            title = conclusion_to_title.get(
1077                conclusion, f"Got an unexpected conclusion: {conclusion.value}"
1078            )
1079            if conclusion == GithubCheckConclusion.SUCCESS and summary:
1080                summary = (
1081                    f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n"
1082                    "These are the changes that would be deployed.\n\n"
1083                ) + summary
1084
1085            return conclusion, title, summary
1086
1087        self._update_check_handler(
1088            check_name="SQLMesh - Prod Plan Preview",
1089            status=status,
1090            conclusion=conclusion,
1091            status_handler=lambda status: (
1092                {
1093                    GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan",
1094                    GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan",
1095                }[status],
1096                None,
1097            ),
1098            conclusion_handler=functools.partial(conclusion_handler, summary=summary),
1099        )

Updates the status of the merge commit for the prod plan preview.

def update_prod_environment_check( self, status: GithubCheckStatus, conclusion: Optional[GithubCheckConclusion] = None, skip_reason: Optional[str] = None, plan_error: Optional[sqlmesh.utils.errors.PlanError] = None) -> None:
1101    def update_prod_environment_check(
1102        self,
1103        status: GithubCheckStatus,
1104        conclusion: t.Optional[GithubCheckConclusion] = None,
1105        skip_reason: t.Optional[str] = None,
1106        plan_error: t.Optional[PlanError] = None,
1107    ) -> None:
1108        """
1109        Updates the status of the merge commit for the prod environment.
1110        """
1111
1112        def conclusion_handler(
1113            conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None
1114        ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]:
1115            conclusion_to_title = {
1116                GithubCheckConclusion.SUCCESS: "Deployed to Prod",
1117                GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod",
1118                GithubCheckConclusion.SKIPPED: "Skipped deployment",
1119                GithubCheckConclusion.FAILURE: "Failed to deploy to prod",
1120                GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan",
1121            }
1122            title = (
1123                conclusion_to_title.get(conclusion)
1124                or f"Got an unexpected conclusion: {conclusion.value}"
1125            )
1126            if conclusion.is_skipped:
1127                summary = skip_reason
1128            elif conclusion.is_failure:
1129                captured_errors = self._console.consume_captured_errors()
1130                summary = (
1131                    captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```"
1132                )
1133            elif conclusion.is_action_required:
1134                if plan_error:
1135                    summary = f"**Plan error:**\n```\n{plan_error}\n```"
1136                else:
1137                    summary = "Got an action required conclusion but no plan error was provided. This is unexpected."
1138            else:
1139                summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan)
1140
1141            return conclusion, title, summary
1142
1143        self._update_check_handler(
1144            check_name="SQLMesh - Prod Environment Synced",
1145            status=status,
1146            conclusion=conclusion,
1147            status_handler=lambda status: (
1148                {
1149                    GithubCheckStatus.IN_PROGRESS: "Deploying to Prod",
1150                    GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod",
1151                }[status],
1152                None,
1153            ),
1154            conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason),
1155        )

Updates the status of the merge commit for the prod environment.

def try_merge_pr(self) -> None:
1157    def try_merge_pr(self) -> None:
1158        """
1159        Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not
1160        performed
1161        """
1162        if self.bot_config.merge_method:
1163            logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}")
1164            self._pull_request.merge(merge_method=self.bot_config.merge_method.value)
1165        else:
1166            logger.debug("No merge method defined so skipping merge")

Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not performed

def get_command_from_comment(self) -> BotCommand:
1168    def get_command_from_comment(self) -> BotCommand:
1169        """
1170        Gets the command from the comment
1171        """
1172        if not self._event.is_comment_added:
1173            logger.debug("Event is not a comment so returning invalid")
1174            return BotCommand.INVALID
1175        if self._event.pull_request_comment_body is None:
1176            raise CICDBotError("Unable to get comment body")
1177        logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}")
1178        return BotCommand.from_comment_body(
1179            self._event.pull_request_comment_body, self.bot_config.command_namespace
1180        )

Gets the command from the comment

running_in_github_actions: bool
1192    @property
1193    def running_in_github_actions(self) -> bool:
1194        return os.environ.get("GITHUB_ACTIONS", None) == "true"
version_info: str
1196    @property
1197    def version_info(self) -> str:
1198        from sqlmesh.cli.main import _sqlmesh_version
1199
1200        return _sqlmesh_version()
@dataclass
class SnapshotSummaryRecord:
@dataclass
class SnapshotSummaryRecord:
    """
    View over a single snapshot of a plan, identified by `snapshot_id`.

    Exposes flags describing how the snapshot changed in the plan plus pre-rendered strings
    (display name, interval descriptions, a markdown list item) derived from the plan.
    """

    snapshot_id: SnapshotId
    plan: Plan

    @property
    def snapshot(self) -> Snapshot:
        """The snapshot stored in the plan; raises ValueError when the snapshot was removed."""
        if not self.is_removed:
            return self.plan.snapshots[self.snapshot_id]
        raise ValueError("Removed snapshots only have SnapshotTableInfo available")

    @cached_property
    def snapshot_table_info(self) -> SnapshotTableInfo:
        """Table info taken from the plan's modified snapshots if removed, else from its snapshots."""
        source = self.plan.modified_snapshots if self.is_removed else self.plan.snapshots
        return source[self.snapshot_id].table_info

    @property
    def display_name(self) -> str:
        """Display name from the table info; the node dialect is only passed for non-removed snapshots."""
        if self.is_removed:
            dialect = None
        else:
            dialect = self.snapshot.node.dialect
        table_info = self.snapshot_table_info
        return table_info.display_name(
            self.plan.environment_naming_info, default_catalog=None, dialect=dialect
        )

    @property
    def change_category(self) -> str:
        """Change category label: breaking for removed snapshots, "Uncategorized" when none is set."""
        if self.is_removed:
            return SNAPSHOT_CHANGE_CATEGORY_STR[SnapshotChangeCategory.BREAKING]

        category = self.snapshot.change_category
        if not category:
            return "Uncategorized"
        return SNAPSHOT_CHANGE_CATEGORY_STR[category]

    @property
    def is_added(self) -> bool:
        """Whether the snapshot id is among the context diff's added entries."""
        added = self.plan.context_diff.added
        return self.snapshot_id in added

    @property
    def is_removed(self) -> bool:
        """Whether the snapshot id is among the context diff's removed snapshots."""
        removed = self.plan.context_diff.removed_snapshots
        return self.snapshot_id in removed

    @property
    def is_dev_preview(self) -> bool:
        """Whether the plan's deployability index reports this snapshot as not deployable."""
        deployable = self.plan.deployability_index.is_deployable(self.snapshot_id)
        return not deployable

    @property
    def is_directly_modified(self) -> bool:
        """Result of the context diff's `directly_modified` check for this snapshot's name."""
        diff = self.plan.context_diff
        return diff.directly_modified(self.snapshot_table_info.name)

    @property
    def is_indirectly_modified(self) -> bool:
        """Result of the context diff's `indirectly_modified` check for this snapshot's name."""
        diff = self.plan.context_diff
        return diff.indirectly_modified(self.snapshot_table_info.name)

    @property
    def is_modified(self) -> bool:
        """Whether the snapshot is directly or indirectly modified."""
        direct = self.is_directly_modified
        return direct if direct else self.is_indirectly_modified

    @property
    def is_metadata_updated(self) -> bool:
        """Result of the context diff's `metadata_updated` check for this snapshot's name."""
        diff = self.plan.context_diff
        return diff.metadata_updated(self.snapshot_table_info.name)

    @property
    def is_incremental(self) -> bool:
        """The `is_incremental` flag of the snapshot's table info."""
        table_info = self.snapshot_table_info
        return table_info.is_incremental

    @property
    def modification_type(self) -> str:
        """Label of the first matching modification flag, or "Unknown" when none matches."""
        # Checked in order; each flag is only evaluated if the earlier ones were falsy.
        labelled_flags = (
            ("is_directly_modified", "Directly modified"),
            ("is_indirectly_modified", "Indirectly modified"),
            ("is_metadata_updated", "Metadata updated"),
        )
        for flag_name, label in labelled_flags:
            if getattr(self, flag_name):
                return label
        return "Unknown"

    @property
    def loaded_intervals(self) -> SnapshotIntervals:
        """Intervals of the snapshot (dev intervals if forward-only); raises ValueError if removed."""
        if self.is_removed:
            raise ValueError("Removed snapshots dont have loaded intervals available")

        if self.snapshot.is_forward_only:
            intervals = self.snapshot.dev_intervals
        else:
            intervals = self.snapshot.intervals
        return SnapshotIntervals(snapshot_id=self.snapshot_id, intervals=intervals)

    @property
    def loaded_intervals_rendered(self) -> str:
        """Formatted loaded intervals, or the literal "REMOVED" for removed snapshots."""
        return "REMOVED" if self.is_removed else self._format_intervals(self.loaded_intervals)

    @property
    def missing_intervals(self) -> t.Optional[SnapshotIntervals]:
        """First entry of the plan's missing intervals with this snapshot id, or None."""
        for candidate in self.plan.missing_intervals:
            if candidate.snapshot_id == self.snapshot_id:
                return candidate
        return None

    @property
    def missing_intervals_formatted(self) -> str:
        """Formatted missing intervals, or "N/A" for removed snapshots or when none are missing."""
        if self.is_removed:
            return "N/A"

        intervals = self.missing_intervals
        return self._format_intervals(intervals) if intervals else "N/A"

    @property
    def as_markdown_list_item(self) -> str:
        """Markdown list item for this snapshot; removed snapshots get a single line without newline."""
        if self.is_removed:
            return f"- `{self.display_name}` ({self.change_category})"

        incremental = self.is_incremental
        kind_suffix = ""
        if not incremental:
            from sqlmesh.core.console import _format_missing_intervals

            # Re-uses the CLI text such as '[recreate view]' and '[full refresh]' so both stay in sync.
            # Per the original note, the intervals passed here are not what gets rendered.
            how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals)
            if how_applied:
                kind_suffix = f" [{how_applied}]"

        parts = [f"- `{self.display_name}` ({self.change_category})\n"]

        kind_name = self.snapshot_table_info.model_kind_name
        if kind_name:
            parts.append(f"  **Kind:** {kind_name}{kind_suffix}\n")

        if incremental:
            # Detailed interval information is only emitted for incremental models.
            parts.append(f"  **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n")
            if self.missing_intervals:
                parts.append(f"  **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n")

        return "".join(parts)

    def _format_intervals(self, intervals: SnapshotIntervals) -> str:
        """Format `intervals` with the node's interval unit, appending a preview marker for dev previews."""
        if self.is_dev_preview:
            suffix = " (**preview**)"
        else:
            suffix = ""
        rendered = intervals.format_intervals(self.snapshot.node.interval_unit)
        return f"{rendered}{suffix}"
1285    @property
1286    def snapshot(self) -> Snapshot:
1287        if self.is_removed:
1288            raise ValueError("Removed snapshots only have SnapshotTableInfo available")
1289        return self.plan.snapshots[self.snapshot_id]
snapshot_table_info: sqlmesh.core.snapshot.definition.SnapshotTableInfo
1291    @cached_property
1292    def snapshot_table_info(self) -> SnapshotTableInfo:
1293        if self.is_removed:
1294            return self.plan.modified_snapshots[self.snapshot_id].table_info
1295        return self.plan.snapshots[self.snapshot_id].table_info
display_name: str
1297    @property
1298    def display_name(self) -> str:
1299        dialect = None if self.is_removed else self.snapshot.node.dialect
1300        return self.snapshot_table_info.display_name(
1301            self.plan.environment_naming_info, default_catalog=None, dialect=dialect
1302        )
change_category: str
1304    @property
1305    def change_category(self) -> str:
1306        if self.is_removed:
1307            return SNAPSHOT_CHANGE_CATEGORY_STR[SnapshotChangeCategory.BREAKING]
1308
1309        if change_category := self.snapshot.change_category:
1310            return SNAPSHOT_CHANGE_CATEGORY_STR[change_category]
1311
1312        return "Uncategorized"
is_added: bool
1314    @property
1315    def is_added(self) -> bool:
1316        return self.snapshot_id in self.plan.context_diff.added
is_removed: bool
1318    @property
1319    def is_removed(self) -> bool:
1320        return self.snapshot_id in self.plan.context_diff.removed_snapshots
is_dev_preview: bool
1322    @property
1323    def is_dev_preview(self) -> bool:
1324        return not self.plan.deployability_index.is_deployable(self.snapshot_id)
is_directly_modified: bool
1326    @property
1327    def is_directly_modified(self) -> bool:
1328        return self.plan.context_diff.directly_modified(self.snapshot_table_info.name)
is_indirectly_modified: bool
1330    @property
1331    def is_indirectly_modified(self) -> bool:
1332        return self.plan.context_diff.indirectly_modified(self.snapshot_table_info.name)
is_modified: bool
1334    @property
1335    def is_modified(self) -> bool:
1336        return self.is_directly_modified or self.is_indirectly_modified
is_metadata_updated: bool
1338    @property
1339    def is_metadata_updated(self) -> bool:
1340        return self.plan.context_diff.metadata_updated(self.snapshot_table_info.name)
is_incremental: bool
1342    @property
1343    def is_incremental(self) -> bool:
1344        return self.snapshot_table_info.is_incremental
modification_type: str
1346    @property
1347    def modification_type(self) -> str:
1348        if self.is_directly_modified:
1349            return "Directly modified"
1350        if self.is_indirectly_modified:
1351            return "Indirectly modified"
1352        if self.is_metadata_updated:
1353            return "Metadata updated"
1354
1355        return "Unknown"
loaded_intervals: sqlmesh.core.plan.definition.SnapshotIntervals
1357    @property
1358    def loaded_intervals(self) -> SnapshotIntervals:
1359        if self.is_removed:
1360            raise ValueError("Removed snapshots dont have loaded intervals available")
1361
1362        return SnapshotIntervals(
1363            snapshot_id=self.snapshot_id,
1364            intervals=(
1365                self.snapshot.dev_intervals
1366                if self.snapshot.is_forward_only
1367                else self.snapshot.intervals
1368            ),
1369        )
loaded_intervals_rendered: str
1371    @property
1372    def loaded_intervals_rendered(self) -> str:
1373        if self.is_removed:
1374            return "REMOVED"
1375
1376        return self._format_intervals(self.loaded_intervals)
missing_intervals: Optional[sqlmesh.core.plan.definition.SnapshotIntervals]
1378    @property
1379    def missing_intervals(self) -> t.Optional[SnapshotIntervals]:
1380        return next(
1381            (si for si in self.plan.missing_intervals if si.snapshot_id == self.snapshot_id),
1382            None,
1383        )
missing_intervals_formatted: str
1385    @property
1386    def missing_intervals_formatted(self) -> str:
1387        if not self.is_removed and (intervals := self.missing_intervals):
1388            return self._format_intervals(intervals)
1389
1390        return "N/A"
as_markdown_list_item: str
1392    @property
1393    def as_markdown_list_item(self) -> str:
1394        if self.is_removed:
1395            return f"- `{self.display_name}` ({self.change_category})"
1396
1397        how_applied = ""
1398
1399        if not self.is_incremental:
1400            from sqlmesh.core.console import _format_missing_intervals
1401
1402            # note: this is to re-use the '[recreate view]' and '[full refresh]' text and keep it in sync with updates to the CLI
1403            # it doesnt actually use the passed intervals, those are handled differently
1404            how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals)
1405
1406        how_applied_str = f" [{how_applied}]" if how_applied else ""
1407
1408        item = f"- `{self.display_name}` ({self.change_category})\n"
1409
1410        if self.snapshot_table_info.model_kind_name:
1411            item += f"  **Kind:** {self.snapshot_table_info.model_kind_name}{how_applied_str}\n"
1412
1413        if self.is_incremental:
1414            # in-depth interval info is only relevant for incremental models
1415            item += f"  **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n"
1416            if self.missing_intervals:
1417                item += f"  **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n"
1418
1419        return item