sqlmesh.integrations.github.cicd.controller
1from __future__ import annotations 2 3import functools 4import json 5import logging 6import os 7import pathlib 8import re 9import traceback 10import typing as t 11from enum import Enum 12from pathlib import Path 13from dataclasses import dataclass 14from functools import cached_property 15 16import requests 17from sqlglot.helper import seq_get 18 19from sqlmesh.core import constants as c 20from sqlmesh.core.console import SNAPSHOT_CHANGE_CATEGORY_STR, get_console, MarkdownConsole 21from sqlmesh.core.context import Context 22from sqlmesh.core.test.result import ModelTextTestResult 23from sqlmesh.core.environment import Environment 24from sqlmesh.core.plan import Plan, PlanBuilder, SnapshotIntervals 25from sqlmesh.core.plan.definition import UserProvidedFlags 26from sqlmesh.core.snapshot.definition import ( 27 Snapshot, 28 SnapshotChangeCategory, 29 SnapshotId, 30 SnapshotTableInfo, 31) 32from sqlglot.errors import SqlglotError 33from sqlmesh.core.user import User 34from sqlmesh.core.config import Config 35from sqlmesh.integrations.github.cicd.config import GithubCICDBotConfig 36from sqlmesh.utils import word_characters_only, Verbosity 37from sqlmesh.utils.date import now 38from sqlmesh.utils.errors import ( 39 CICDBotError, 40 NoChangesPlanError, 41 PlanError, 42 UncategorizedPlanError, 43 LinterError, 44 SQLMeshError, 45) 46from sqlmesh.utils.pydantic import PydanticModel 47 48if t.TYPE_CHECKING: 49 from github import Github 50 from github.CheckRun import CheckRun 51 from github.Issue import Issue 52 from github.IssueComment import IssueComment 53 from github.PullRequest import PullRequest 54 from github.PullRequestReview import PullRequestReview 55 from github.Repository import Repository 56 57logger = logging.getLogger(__name__) 58 59 60class TestFailure(Exception): 61 pass 62 63 64class PullRequestInfo(PydanticModel): 65 """Contains information related to a pull request that can be used to construct other objects/URLs""" 66 67 owner: str 68 repo: str 69 pr_number: int 70 71 @property 72 def full_repo_path(self) -> str: 73 return "/".join([self.owner, self.repo]) 74 75 @classmethod 76 def create_from_pull_request_url(cls, pull_request_url: str) -> PullRequestInfo: 77 owner, repo, _, pr_number = pull_request_url.split("/")[-4:] 78 return cls( 79 owner=owner, 80 repo=repo, 81 pr_number=int(pr_number), 82 ) 83 84 85class GithubCheckStatus(str, Enum): 86 QUEUED = "queued" 87 IN_PROGRESS = "in_progress" 88 COMPLETED = "completed" 89 90 @property 91 def is_queued(self) -> bool: 92 return self == GithubCheckStatus.QUEUED 93 94 @property 95 def is_in_progress(self) -> bool: 96 return self == GithubCheckStatus.IN_PROGRESS 97 98 @property 99 def is_completed(self) -> bool: 100 return self == GithubCheckStatus.COMPLETED 101 102 103class GithubCheckConclusion(str, Enum): 104 SUCCESS = "success" 105 FAILURE = "failure" 106 NEUTRAL = "neutral" 107 CANCELLED = "cancelled" 108 TIMED_OUT = "timed_out" 109 ACTION_REQUIRED = "action_required" 110 SKIPPED = "skipped" 111 112 @property 113 def is_success(self) -> bool: 114 return self == GithubCheckConclusion.SUCCESS 115 116 @property 117 def is_failure(self) -> bool: 118 return self == GithubCheckConclusion.FAILURE 119 120 @property 121 def is_neutral(self) -> bool: 122 return self == GithubCheckConclusion.NEUTRAL 123 124 @property 125 def is_cancelled(self) -> bool: 126 return self == GithubCheckConclusion.CANCELLED 127 128 @property 129 def is_timed_out(self) -> bool: 130 return self == GithubCheckConclusion.TIMED_OUT 131 132 @property 133 def is_action_required(self) -> bool: 134 return self == GithubCheckConclusion.ACTION_REQUIRED 135 136 @property 137 def is_skipped(self) -> bool: 138 return self == GithubCheckConclusion.SKIPPED 139 140 141class MergeStateStatus(str, Enum): 142 """ 143 https://docs.github.com/en/graphql/reference/enums#mergestatestatus 144 """ 145 146 BEHIND = "behind" 147 BLOCKED = "blocked" 148 CLEAN = "clean" 149 DIRTY = "dirty" 150 DRAFT = "draft" 151 HAS_HOOKS = "has_hooks" 152 UNKNOWN = "unknown" 153 UNSTABLE = "unstable" 154 155 @property 156 def is_behind(self) -> bool: 157 return self == MergeStateStatus.BEHIND 158 159 @property 160 def is_blocked(self) -> bool: 161 return self == MergeStateStatus.BLOCKED 162 163 @property 164 def is_clean(self) -> bool: 165 return self == MergeStateStatus.CLEAN 166 167 @property 168 def is_dirty(self) -> bool: 169 return self == MergeStateStatus.DIRTY 170 171 @property 172 def is_draft(self) -> bool: 173 return self == MergeStateStatus.DRAFT 174 175 @property 176 def is_has_hooks(self) -> bool: 177 return self == MergeStateStatus.HAS_HOOKS 178 179 @property 180 def is_unknown(self) -> bool: 181 return self == MergeStateStatus.UNKNOWN 182 183 @property 184 def is_unstable(self) -> bool: 185 return self == MergeStateStatus.UNSTABLE 186 187 188class BotCommand(Enum): 189 INVALID = 1 190 DEPLOY_PROD = 2 191 192 @classmethod 193 def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand: 194 body = body.strip() 195 namespace = namespace.strip() if namespace else "" 196 input_to_command = { 197 namespace + "/deploy": cls.DEPLOY_PROD, 198 } 199 return input_to_command.get(body, cls.INVALID) 200 201 @property 202 def is_invalid(self) -> bool: 203 return self == self.INVALID 204 205 @property 206 def is_deploy_prod(self) -> bool: 207 return self == self.DEPLOY_PROD 208 209 210class GithubEvent: 211 """ 212 Takes a Github Actions event payload and provides a simple interface to access 213 """ 214 215 def __init__(self, payload: t.Dict[str, t.Any]) -> None: 216 self.payload = payload 217 self._pull_request_info: t.Optional[PullRequestInfo] = None 218 219 @classmethod 220 def from_obj(cls, obj: t.Dict[str, t.Any]) -> GithubEvent: 221 return cls(payload=obj) 222 223 @classmethod 224 def from_path(cls, path: t.Union[str, pathlib.Path]) -> GithubEvent: 225 with open(pathlib.Path(path), "r", encoding="utf-8") as f: 226 return cls.from_obj(json.load(f)) 227 228 @classmethod 229 def from_env(cls) -> GithubEvent: 230 return cls.from_path(os.environ["GITHUB_EVENT_PATH"]) 231 232 @property 233 def is_review(self) -> bool: 234 return bool(self.payload.get("review")) 235 236 @property 237 def is_comment(self) -> bool: 238 comment = self.payload.get("comment") 239 if not comment: 240 return False 241 if not comment.get("body"): 242 return False 243 return True 244 245 @property 246 def is_comment_added(self) -> bool: 247 return self.is_comment and self.payload.get("action") != "deleted" 248 249 @property 250 def is_pull_request(self) -> bool: 251 return bool(self.payload.get("pull_request")) 252 253 @property 254 def is_pull_request_closed(self) -> bool: 255 return self.is_pull_request and self.payload.get("action") == "closed" 256 257 @property 258 def pull_request_url(self) -> str: 259 if self.is_review: 260 return self.payload["review"]["pull_request_url"] 261 if self.is_comment: 262 return self.payload["issue"]["pull_request"]["url"] 263 if self.is_pull_request: 264 return self.payload["pull_request"]["_links"]["self"]["href"] 265 raise CICDBotError("Unable to determine pull request url") 266 267 @property 268 def pull_request_info(self) -> PullRequestInfo: 269 if not self._pull_request_info: 270 self._pull_request_info = PullRequestInfo.create_from_pull_request_url( 271 self.pull_request_url 272 ) 273 return self._pull_request_info 274 275 @property 276 def pull_request_comment_body(self) -> t.Optional[str]: 277 if self.is_comment_added: 278 return self.payload["comment"]["body"] 279 return None 280 281 282class GithubController: 283 BOT_HEADER_MSG = ":robot: **SQLMesh Bot Info** :robot:" 284 MAX_BYTE_LENGTH = 65535 285 286 def __init__( 287 self, 288 paths: t.Union[Path, t.Iterable[Path]], 289 token: str, 290 config: t.Optional[t.Union[Config, str]] = None, 291 event: t.Optional[GithubEvent] = None, 292 client: t.Optional[Github] = None, 293 context: t.Optional[Context] = None, 294 ) -> None: 295 from github import Github 296 297 logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}") 298 299 self.config = config 300 self._paths = paths 301 self._token = token 302 self._event = event or GithubEvent.from_env() 303 logger.debug(f"Github event: {json.dumps(self._event.payload)}") 304 self._pr_plan_builder: t.Optional[PlanBuilder] = None 305 self._prod_plan_builder: t.Optional[PlanBuilder] = None 306 self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None 307 self._check_run_mapping: t.Dict[str, CheckRun] = {} 308 309 if not isinstance(get_console(), MarkdownConsole): 310 raise CICDBotError("Console must be a markdown console.") 311 self._console = t.cast(MarkdownConsole, get_console()) 312 313 from github.Consts import DEFAULT_BASE_URL 314 from github.Auth import Token 315 316 self._client: Github = client or Github( 317 base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token) 318 ) 319 320 self._repo: Repository = self._client.get_repo( 321 self._event.pull_request_info.full_repo_path, lazy=True 322 ) 323 self._pull_request: PullRequest = self._repo.get_pull( 324 self._event.pull_request_info.pr_number 325 ) 326 self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number) 327 self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews() 328 # TODO: The python module says that user names can be None and this is not currently handled 329 self._approvers: t.Set[str] = { 330 review.user.login or "UNKNOWN" 331 for review in self._reviews 332 if review.state.lower() == "approved" 333 } 334 logger.debug(f"Approvers: {', '.join(self._approvers)}") 335 self._context: Context = context or Context(paths=self._paths, config=self.config) 336 337 # Bot config needs the context to be initialized 338 logger.debug(f"Bot config: {self.bot_config.json(indent=2)}") 339 340 @property 341 def deploy_command_enabled(self) -> bool: 342 return self.bot_config.enable_deploy_command 343 344 @property 345 def is_comment_added(self) -> bool: 346 return self._event.is_comment_added 347 348 @property 349 def _required_approvers(self) -> t.List[User]: 350 required_approvers = [ 351 user 352 for user in self._context.users 353 if user.is_required_approver and user.github_username 354 ] 355 logger.debug( 356 f"Required approvers: {', '.join(user.github_username for user in required_approvers if user.github_username)}" 357 ) 358 return required_approvers 359 360 @property 361 def _required_approvers_with_approval(self) -> t.List[User]: 362 return [ 363 user for user in self._required_approvers if user.github_username in self._approvers 364 ] 365 366 @property 367 def pr_environment_name(self) -> str: 368 return Environment.sanitize_name( 369 "_".join( 370 [ 371 self.bot_config.pr_environment_name or self._event.pull_request_info.repo, 372 str(self._event.pull_request_info.pr_number), 373 ] 374 ) 375 ) 376 377 @property 378 def do_required_approval_check(self) -> bool: 379 """We want to skip required approval check if no users have this role""" 380 do_required_approval_check = bool(self._required_approvers) 381 logger.debug(f"Do required approval check: {do_required_approval_check}") 382 return do_required_approval_check 383 384 @property 385 def has_required_approval(self) -> bool: 386 """ 387 Check if the PR has a required approver. 388 389 TODO: Allow defining requiring some number, or all, required approvers. 390 """ 391 if not self._required_approvers or self._required_approvers_with_approval: 392 logger.debug("Has required Approval") 393 return True 394 logger.debug("Does not have required approval") 395 return False 396 397 @property 398 def pr_plan(self) -> Plan: 399 if not self._pr_plan_builder: 400 self._pr_plan_builder = self._context.plan_builder( 401 environment=self.pr_environment_name, 402 skip_tests=True, 403 skip_linter=True, 404 categorizer_config=self.bot_config.auto_categorize_changes, 405 start=self.bot_config.default_pr_start, 406 min_intervals=self.bot_config.pr_min_intervals, 407 skip_backfill=self.bot_config.skip_pr_backfill, 408 include_unmodified=self.bot_config.pr_include_unmodified, 409 forward_only=self.forward_only_plan, 410 ) 411 assert self._pr_plan_builder 412 return self._pr_plan_builder.build() 413 414 @property 415 def pr_plan_or_none(self) -> t.Optional[Plan]: 416 try: 417 return self.pr_plan 418 except: 419 return None 420 421 @property 422 def pr_plan_flags(self) -> t.Optional[t.Dict[str, UserProvidedFlags]]: 423 if pr_plan := self.pr_plan_or_none: 424 return pr_plan.user_provided_flags 425 if pr_plan_builder := self._pr_plan_builder: 426 return pr_plan_builder._user_provided_flags 427 return None 428 429 @property 430 def prod_plan(self) -> Plan: 431 if not self._prod_plan_builder: 432 self._prod_plan_builder = self._context.plan_builder( 433 c.PROD, 434 no_gaps=True, 435 skip_tests=True, 436 skip_linter=True, 437 categorizer_config=self.bot_config.auto_categorize_changes, 438 run=self.bot_config.run_on_deploy_to_prod, 439 forward_only=self.forward_only_plan, 440 ) 441 assert self._prod_plan_builder 442 return self._prod_plan_builder.build() 443 444 @property 445 def prod_plan_with_gaps(self) -> Plan: 446 if not self._prod_plan_with_gaps_builder: 447 self._prod_plan_with_gaps_builder = self._context.plan_builder( 448 c.PROD, 449 # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data) 450 no_gaps=False, 451 skip_tests=True, 452 skip_linter=True, 453 categorizer_config=self.bot_config.auto_categorize_changes, 454 run=self.bot_config.run_on_deploy_to_prod, 455 forward_only=self.forward_only_plan, 456 ) 457 assert self._prod_plan_with_gaps_builder 458 return self._prod_plan_with_gaps_builder.build() 459 460 @property 461 def bot_config(self) -> GithubCICDBotConfig: 462 bot_config = self._context.config.cicd_bot or GithubCICDBotConfig( 463 auto_categorize_changes=self._context.auto_categorize_changes 464 ) 465 return bot_config 466 467 @property 468 def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]: 469 return self.prod_plan_with_gaps.modified_snapshots 470 471 @property 472 def removed_snapshots(self) -> t.Set[SnapshotId]: 473 return set(self.prod_plan_with_gaps.context_diff.removed_snapshots) 474 475 @property 476 def pr_targets_prod_branch(self) -> bool: 477 return self._pull_request.base.ref in self.bot_config.prod_branch_names 478 479 @property 480 def forward_only_plan(self) -> bool: 481 default = self._context.config.plan.forward_only 482 head_ref = self._pull_request.head.ref 483 if isinstance(head_ref, str): 484 return head_ref.endswith(self.bot_config.forward_only_branch_suffix) or default 485 return default 486 487 @classmethod 488 def _append_output(cls, key: str, value: str) -> None: 489 """ 490 Appends the given key/value to output so they can be read by following steps 491 """ 492 logger.debug(f"Setting output. Key: {key}, Value: {value}") 493 494 # GitHub Actions sets this environment variable 495 if output_file := os.environ.get("GITHUB_OUTPUT"): 496 with open(output_file, "a", encoding="utf-8") as fh: 497 print(f"{key}={value}", file=fh) 498 499 def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str: 500 if not plan.forward_only: 501 return "" 502 503 example_model_name = "<model name>" 504 for snapshot_id in sorted(plan.snapshots): 505 snapshot = plan.snapshots[snapshot_id] 506 if snapshot.is_incremental: 507 example_model_name = snapshot.node.name 508 break 509 510 return ( 511 "> [!TIP]\n" 512 "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n" 513 "> \n" 514 f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n" 515 ">\n" 516 "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans" 517 ) 518 519 def get_plan_summary(self, plan: Plan) -> str: 520 # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated 521 # this is particularly important for the "Models needing backfill" list because 522 # there is no easy way to tell this otherwise 523 orig_verbosity = self._console.verbosity 524 self._console.verbosity = Verbosity.VERY_VERBOSE 525 526 try: 527 # Clear out any output that might exist from prior steps 528 self._console.consume_captured_output() 529 if plan.restatements: 530 self._console._print("\n**Restating models**\n") 531 else: 532 self._console.show_environment_difference_summary( 533 context_diff=plan.context_diff, 534 no_diff=False, 535 ) 536 if plan.context_diff.has_changes: 537 self._console.show_model_difference_summary( 538 context_diff=plan.context_diff, 539 environment_naming_info=plan.environment_naming_info, 540 default_catalog=self._context.default_catalog, 541 no_diff=False, 542 ) 543 difference_summary = self._console.consume_captured_output() 544 self._console._show_missing_dates(plan, self._context.default_catalog) 545 missing_dates = self._console.consume_captured_output() 546 547 plan_flags_section = ( 548 f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}" 549 if plan.user_provided_flags 550 else "" 551 ) 552 553 if not difference_summary and not missing_dates: 554 return f"No changes to apply.{plan_flags_section}" 555 556 warnings_block = self._console.consume_captured_warnings() 557 errors_block = self._console.consume_captured_errors() 558 559 return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}" 560 except PlanError as e: 561 logger.exception("Plan failed to generate") 562 return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}" 563 finally: 564 self._console.verbosity = orig_verbosity 565 566 def get_pr_environment_summary( 567 self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None 568 ) -> str: 569 heading = "" 570 summary = "" 571 572 if conclusion.is_success: 573 summary = self._get_pr_environment_summary_success() 574 elif conclusion.is_action_required: 575 heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:" 576 summary = self._get_pr_environment_summary_action_required(exception) 577 elif conclusion.is_failure: 578 heading = ( 579 f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:" 580 ) 581 summary = self._get_pr_environment_summary_failure(exception) 582 elif conclusion.is_skipped: 583 heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:" 584 summary = self._get_pr_environment_summary_skipped(exception) 585 else: 586 heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}" 587 588 # note: we just add warnings here, errors will be covered by the "failure" conclusion 589 if warnings := self._console.consume_captured_warnings(): 590 summary = f"{warnings}\n{summary}" 591 592 return f"{heading}\n\n{summary}".strip() 593 594 def _get_pr_environment_summary_success(self) -> str: 595 prod_plan = self.prod_plan_with_gaps 596 597 if not prod_plan.has_changes: 598 summary = "No models were modified in this PR.\n" 599 else: 600 intro = self._generate_pr_environment_summary_intro() 601 summary = intro + self._generate_pr_environment_summary_list(prod_plan) 602 603 if prod_plan.user_provided_flags: 604 summary += self._generate_plan_flags_section(prod_plan.user_provided_flags) 605 606 return summary 607 608 def _get_pr_environment_summary_skipped(self, exception: t.Optional[Exception] = None) -> str: 609 if isinstance(exception, NoChangesPlanError): 610 skip_reason = "No changes were detected compared to the prod environment." 611 elif isinstance(exception, TestFailure): 612 skip_reason = "Unit Test(s) Failed so skipping PR creation" 613 else: 614 skip_reason = "A prior stage failed resulting in skipping PR creation." 615 616 return skip_reason 617 618 def _get_pr_environment_summary_action_required( 619 self, exception: t.Optional[Exception] = None 620 ) -> str: 621 plan = self.pr_plan_or_none 622 if isinstance(exception, UncategorizedPlanError) and plan: 623 failure_msg = f"The following models could not be categorized automatically:\n" 624 for snapshot in plan.uncategorized: 625 failure_msg += f"- {snapshot.name}\n" 626 failure_msg += ( 627 f"\nRun `sqlmesh plan {self.pr_environment_name}` locally to apply these changes.\n\n" 628 "If you would like the bot to automatically categorize changes, check the [documentation](https://sqlmesh.readthedocs.io/en/stable/integrations/github/) for more information." 629 ) 630 else: 631 failure_msg = "Please check the Actions Workflow logs for more information." 632 633 return failure_msg 634 635 def _get_pr_environment_summary_failure(self, exception: t.Optional[Exception] = None) -> str: 636 console_output = self._console.consume_captured_output() 637 failure_msg = "" 638 639 if isinstance(exception, PlanError): 640 if exception.args and (msg := exception.args[0]) and isinstance(msg, str): 641 failure_msg += f"*{msg}*\n" 642 if console_output: 643 failure_msg += f"\n{console_output}" 644 elif isinstance(exception, (SQLMeshError, SqlglotError, ValueError)): 645 # this logic is taken from the global error handler attached to the CLI, which uses `click.echo()` to output the message 646 # so cant be re-used here because it bypasses the Console 647 failure_msg = f"**Error:** {str(exception)}" 648 elif exception: 649 logger.debug( 650 "Got unexpected error. Error Type: " 651 + str(type(exception)) 652 + " Stack trace: " 653 + traceback.format_exc() 654 ) 655 failure_msg = f"This is an unexpected error.\n\n**Exception:**\n```\n{traceback.format_exc()}\n```" 656 657 if captured_errors := self._console.consume_captured_errors(): 658 failure_msg = f"{captured_errors}\n{failure_msg}" 659 660 if plan_flags := self.pr_plan_flags: 661 failure_msg += f"\n\n{self._generate_plan_flags_section(plan_flags)}" 662 663 return failure_msg 664 665 def run_tests(self) -> t.Tuple[ModelTextTestResult, str]: 666 """ 667 Run tests for the PR 668 """ 669 return self._context._run_tests(verbosity=Verbosity.VERBOSE) 670 671 def run_linter(self) -> None: 672 """ 673 Run linter for the PR 674 """ 675 self._console.consume_captured_output() 676 self._context.lint_models() 677 678 def _get_or_create_comment(self, header: str = BOT_HEADER_MSG) -> IssueComment: 679 comment = seq_get( 680 [comment for comment in self._issue.get_comments() if header in comment.body], 681 0, 682 ) 683 if not comment: 684 logger.debug(f"Did not find comment so creating one with header: {header}") 685 return self._issue.create_comment(header) 686 logger.debug(f"Found comment with header: {header}") 687 return comment 688 689 def _get_merge_state_status(self) -> MergeStateStatus: 690 """ 691 This feature is currently in preview and therefore not available in the python module. 692 So we query GraphQL directly instead. 693 """ 694 headers = { 695 "Authorization": f"Bearer {self._token}", 696 "Accept": "application/vnd.github.merge-info-preview+json", 697 } 698 query = f"""{{ 699 repository(owner: "{self._event.pull_request_info.owner}", name: "{self._event.pull_request_info.repo}") {{ 700 pullRequest(number: {self._event.pull_request_info.pr_number}) {{ 701 title 702 state 703 mergeStateStatus 704 }} 705 }} 706 }}""" 707 request = requests.post( 708 os.environ["GITHUB_GRAPHQL_URL"], 709 json={"query": query}, 710 headers=headers, 711 ) 712 if request.status_code == 200: 713 merge_status = MergeStateStatus( 714 request.json()["data"]["repository"]["pullRequest"]["mergeStateStatus"].lower() 715 ) 716 logger.debug(f"Merge state status: {merge_status.value}") 717 return merge_status 718 raise CICDBotError(f"Unable to get merge state status. Error: {request.text}") 719 720 def update_sqlmesh_comment_info( 721 self, value: str, *, dedup_regex: t.Optional[str] 722 ) -> t.Tuple[bool, IssueComment]: 723 """ 724 Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then 725 it creates one. It determines the comment to update by looking for a comment with the header. If a dedup 726 regex is provided then it will check if the value already exists in the comment and if so it will not update 727 """ 728 comment = self._get_or_create_comment() 729 if dedup_regex: 730 # If we find a match against the regex then we just return since the comment has already been posted 731 if seq_get(re.findall(dedup_regex, comment.body), 0): 732 return False, comment 733 full_comment = f"{comment.body}\n{value}" 734 body, *truncated = self._chunk_up_api_message(f"{full_comment}") 735 if truncated: 736 logger.warning( 737 f"Comment body was too long so we truncated it. Full text: {full_comment}" 738 ) 739 comment.edit(body=body) 740 return True, comment 741 742 def update_pr_environment(self) -> None: 743 """ 744 Creates a PR environment from the logic present in the PR. If the PR contains changes that are 745 uncategorized, then an error will be raised. 746 """ 747 self._console.consume_captured_output() # clear output buffer 748 self._context.apply(self.pr_plan) # will raise if PR environment creation fails 749 750 # update PR info comment 751 vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:" 752 comment_value = f"{vde_title}\n - `{self.pr_environment_name}`" 753 if self.bot_config.enable_deploy_command: 754 full_command = f"{self.bot_config.command_namespace or ''}/deploy" 755 comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n - `{full_command}`" 756 dedup_regex = vde_title.replace("*", r"\*") + r".*" 757 updated_comment, _ = self.update_sqlmesh_comment_info( 758 value=comment_value, 759 dedup_regex=dedup_regex, 760 ) 761 if updated_comment: 762 self._append_output("created_pr_environment", "true") 763 764 def deploy_to_prod(self) -> None: 765 """ 766 Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise. 767 """ 768 # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge. 769 # The deploy can still happen if the workflow is configured to listen for `closed` events. 770 if self._pull_request.merged and not self._event.is_pull_request_closed: 771 raise CICDBotError( 772 "PR is already merged and this event was triggered prior to the merge." 773 ) 774 merge_status = self._get_merge_state_status() 775 if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked: 776 raise CICDBotError( 777 "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. " 778 "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration." 779 ) 780 if merge_status.is_dirty: 781 raise CICDBotError( 782 "Merge commit cannot be cleanly created. Likely from a merge conflict. " 783 "Please check PR and resolve any issues." 784 ) 785 plan_summary = f"""<details> 786 <summary>:ship: Prod Plan Being Applied</summary> 787 788{self.get_plan_summary(self.prod_plan)} 789</details> 790 791""" 792 if self.forward_only_plan: 793 plan_summary = ( 794 f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}" 795 ) 796 797 self.update_sqlmesh_comment_info( 798 value=plan_summary, 799 dedup_regex=None, 800 ) 801 self._context.apply(self.prod_plan) 802 803 def try_invalidate_pr_environment(self) -> None: 804 """ 805 Marks the PR environment for garbage collection. 806 """ 807 if self.bot_config.invalidate_environment_after_deploy: 808 self._context.invalidate_environment(self.pr_environment_name) 809 810 def _update_check( 811 self, 812 name: str, 813 status: GithubCheckStatus, 814 title: str, 815 conclusion: t.Optional[GithubCheckConclusion] = None, 816 full_summary: t.Optional[str] = None, 817 ) -> None: 818 """ 819 Updates the status of the merge commit. 820 """ 821 current_time = now() 822 kwargs: t.Dict[str, t.Any] = { 823 "name": name, 824 # Note: The environment variable `GITHUB_SHA` would be the merge commit so that is why instead we 825 # get the last commit on the PR. 826 "head_sha": self._pull_request.head.sha, 827 "status": status.value, 828 } 829 if status.is_in_progress: 830 kwargs["started_at"] = current_time 831 if status.is_completed: 832 kwargs["completed_at"] = current_time 833 if conclusion: 834 kwargs["conclusion"] = conclusion.value 835 full_summary = full_summary or title 836 summary, text, *truncated = self._chunk_up_api_message(full_summary) + [None] 837 if truncated and truncated[0] is not None: 838 logger.warning(f"Summary was too long so we truncated it. Full text: {full_summary}") 839 kwargs["output"] = {"title": title, "summary": summary} 840 if text: 841 kwargs["output"]["text"] = text 842 logger.debug(f"Updating check with kwargs: {kwargs}") 843 844 if self.running_in_github_actions: 845 # Only make the API call to update the checks if we are running within GitHub Actions 846 # One very annoying limitation of the Pull Request Checks API is that its only available to GitHub Apps 847 # and not personal access tokens, which makes it unable to be utilized during local development 848 if name in self._check_run_mapping: 849 logger.debug(f"Found check run in mapping so updating it. Name: {name}") 850 check_run = self._check_run_mapping[name] 851 check_run.edit( 852 **{ 853 k: v 854 for k, v in kwargs.items() 855 if k not in ("name", "head_sha", "started_at") 856 } 857 ) 858 else: 859 logger.debug(f"Did not find check run in mapping so creating it. Name: {name}") 860 self._check_run_mapping[name] = self._repo.create_check_run(**kwargs) 861 else: 862 # Output the summary using print() so the newlines are resolved and the result can easily 863 # be disambiguated from the rest of the console output and copy+pasted into a Markdown renderer 864 print( 865 f"---CHECK OUTPUT START: {kwargs['output']['title']} ---\n{kwargs['output']['summary']}\n---CHECK OUTPUT END---\n" 866 ) 867 868 if conclusion: 869 self._append_output( 870 word_characters_only(name.replace("SQLMesh - ", "").lower()), conclusion.value 871 ) 872 873 def _update_check_handler( 874 self, 875 check_name: str, 876 status: GithubCheckStatus, 877 conclusion: t.Optional[GithubCheckConclusion], 878 status_handler: t.Callable[[GithubCheckStatus], t.Tuple[str, t.Optional[str]]], 879 conclusion_handler: t.Callable[ 880 [GithubCheckConclusion], t.Tuple[GithubCheckConclusion, str, t.Optional[str]] 881 ], 882 ) -> None: 883 if conclusion: 884 conclusion, title, summary = conclusion_handler(conclusion) 885 else: 886 title, summary = status_handler(status) 887 self._update_check( 888 name=check_name, 889 status=status, 890 title=title, 891 conclusion=conclusion, 892 full_summary=summary, 893 ) 894 895 def update_linter_check( 896 self, 897 status: GithubCheckStatus, 898 conclusion: t.Optional[GithubCheckConclusion] = None, 899 ) -> None: 900 if not self._context.config.linter.enabled: 901 return 902 903 def conclusion_handler( 904 conclusion: GithubCheckConclusion, 905 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 906 linter_summary = self._console.consume_captured_output() or "Linter Success" 907 908 title = "Linter results" 909 910 return conclusion, title, linter_summary 911 912 self._update_check_handler( 913 check_name="SQLMesh - Linter", 914 status=status, 915 conclusion=conclusion, 916 status_handler=lambda status: ( 917 { 918 GithubCheckStatus.IN_PROGRESS: "Running linter", 919 GithubCheckStatus.QUEUED: "Waiting to Run linter", 920 }[status], 921 None, 922 ), 923 conclusion_handler=conclusion_handler, 924 ) 925 926 def update_test_check( 927 self, 928 status: GithubCheckStatus, 929 conclusion: t.Optional[GithubCheckConclusion] = None, 930 result: t.Optional[ModelTextTestResult] = None, 931 traceback: t.Optional[str] = None, 932 ) -> None: 933 """ 934 Updates the status of tests for code in the PR 935 """ 936 937 def conclusion_handler( 938 conclusion: GithubCheckConclusion, 939 result: t.Optional[ModelTextTestResult], 940 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 941 if result: 942 # Clear out console 943 self._console.consume_captured_output() 944 self._console.log_test_results( 945 result, 946 self._context.test_connection_config._engine_adapter.DIALECT, 947 ) 948 test_summary = self._console.consume_captured_output() 949 test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed" 950 test_conclusion = ( 951 GithubCheckConclusion.SUCCESS 952 if result.wasSuccessful() 953 else GithubCheckConclusion.FAILURE 954 ) 955 return test_conclusion, test_title, test_summary 956 if traceback: 957 self._console._print(traceback) 958 959 test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed" 960 return conclusion, test_title, traceback 961 962 self._update_check_handler( 963 check_name="SQLMesh - Run Unit Tests", 964 status=status, 965 conclusion=conclusion, 966 status_handler=lambda status: ( 967 { 968 GithubCheckStatus.IN_PROGRESS: "Running Tests", 969 GithubCheckStatus.QUEUED: "Waiting to Run Tests", 970 }[status], 971 None, 972 ), 973 conclusion_handler=functools.partial(conclusion_handler, result=result), 974 ) 975 976 def update_required_approval_check( 977 self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None 978 ) -> None: 979 """ 980 Updates the status of the merge commit for the required approval. 981 """ 982 983 def conclusion_handler( 984 conclusion: GithubCheckConclusion, 985 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 986 test_summary = "**List of possible required approvers:**\n" 987 for user in self._required_approvers: 988 test_summary += f"- `{user.github_username or user.username}`\n" 989 990 title = ( 991 f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}" 992 if conclusion.is_success 993 else "Need a Required Approval" 994 ) 995 return conclusion, title, test_summary 996 997 # If we get a skip that means required approvers is not configured therefore it does not need to be displayed 998 if conclusion and conclusion.is_skipped: 999 return 1000 1001 self._update_check_handler( 1002 check_name="SQLMesh - Has Required Approval", 1003 status=status, 1004 conclusion=conclusion, 1005 status_handler=lambda status: ( 1006 { 1007 GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers", 1008 GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers", 1009 }[status], 1010 None, 1011 ), 1012 conclusion_handler=conclusion_handler, 1013 ) 1014 1015 def update_pr_environment_check( 1016 self, status: GithubCheckStatus, exception: t.Optional[Exception] = None 1017 ) -> t.Optional[GithubCheckConclusion]: 1018 """ 1019 Updates the status of the merge commit for the PR environment. 1020 """ 1021 conclusion: t.Optional[GithubCheckConclusion] = None 1022 if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)): 1023 conclusion = GithubCheckConclusion.SKIPPED 1024 elif isinstance(exception, UncategorizedPlanError): 1025 conclusion = GithubCheckConclusion.ACTION_REQUIRED 1026 elif exception: 1027 conclusion = GithubCheckConclusion.FAILURE 1028 elif status.is_completed: 1029 conclusion = GithubCheckConclusion.SUCCESS 1030 1031 check_title_static = "PR Virtual Data Environment: " 1032 check_title = check_title_static + self.pr_environment_name 1033 1034 def conclusion_handler( 1035 conclusion: GithubCheckConclusion, exception: t.Optional[Exception] 1036 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1037 summary = self.get_pr_environment_summary(conclusion, exception) 1038 self._append_output("pr_environment_name", self.pr_environment_name) 1039 return conclusion, check_title, summary 1040 1041 self._update_check_handler( 1042 check_name="SQLMesh - PR Environment Synced", 1043 status=status, 1044 conclusion=conclusion, 1045 status_handler=lambda status: ( 1046 check_title, 1047 { 1048 GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`", 1049 GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`", 1050 }[status], 1051 ), 1052 conclusion_handler=functools.partial(conclusion_handler, exception=exception), 1053 ) 1054 return conclusion 1055 1056 def update_prod_plan_preview_check( 1057 self, 1058 status: GithubCheckStatus, 1059 conclusion: t.Optional[GithubCheckConclusion] = None, 1060 summary: t.Optional[str] = None, 1061 ) -> None: 1062 """ 1063 Updates the status of the merge commit for the prod plan preview. 1064 """ 1065 1066 def conclusion_handler( 1067 conclusion: GithubCheckConclusion, summary: t.Optional[str] = None 1068 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1069 conclusion_to_title = { 1070 GithubCheckConclusion.SUCCESS: "Prod Plan Preview", 1071 GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview", 1072 GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized", 1073 GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview", 1074 } 1075 title = conclusion_to_title.get( 1076 conclusion, f"Got an unexpected conclusion: {conclusion.value}" 1077 ) 1078 if conclusion == GithubCheckConclusion.SUCCESS and summary: 1079 summary = ( 1080 f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n" 1081 "These are the changes that would be deployed.\n\n" 1082 ) + summary 1083 1084 return conclusion, title, summary 1085 1086 self._update_check_handler( 1087 check_name="SQLMesh - Prod Plan Preview", 1088 status=status, 1089 conclusion=conclusion, 1090 status_handler=lambda status: ( 1091 { 1092 GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan", 1093 GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan", 1094 }[status], 1095 None, 1096 ), 1097 conclusion_handler=functools.partial(conclusion_handler, summary=summary), 1098 ) 1099 1100 def update_prod_environment_check( 1101 self, 1102 status: GithubCheckStatus, 1103 conclusion: t.Optional[GithubCheckConclusion] = None, 1104 skip_reason: t.Optional[str] = None, 1105 plan_error: t.Optional[PlanError] = None, 1106 ) -> None: 1107 """ 1108 Updates the status of the merge commit for the prod environment. 1109 """ 1110 1111 def conclusion_handler( 1112 conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None 1113 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1114 conclusion_to_title = { 1115 GithubCheckConclusion.SUCCESS: "Deployed to Prod", 1116 GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod", 1117 GithubCheckConclusion.SKIPPED: "Skipped deployment", 1118 GithubCheckConclusion.FAILURE: "Failed to deploy to prod", 1119 GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan", 1120 } 1121 title = ( 1122 conclusion_to_title.get(conclusion) 1123 or f"Got an unexpected conclusion: {conclusion.value}" 1124 ) 1125 if conclusion.is_skipped: 1126 summary = skip_reason 1127 elif conclusion.is_failure: 1128 captured_errors = self._console.consume_captured_errors() 1129 summary = ( 1130 captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```" 1131 ) 1132 elif conclusion.is_action_required: 1133 if plan_error: 1134 summary = f"**Plan error:**\n```\n{plan_error}\n```" 1135 else: 1136 summary = "Got an action required conclusion but no plan error was provided. This is unexpected." 1137 else: 1138 summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan) 1139 1140 return conclusion, title, summary 1141 1142 self._update_check_handler( 1143 check_name="SQLMesh - Prod Environment Synced", 1144 status=status, 1145 conclusion=conclusion, 1146 status_handler=lambda status: ( 1147 { 1148 GithubCheckStatus.IN_PROGRESS: "Deploying to Prod", 1149 GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod", 1150 }[status], 1151 None, 1152 ), 1153 conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason), 1154 ) 1155 1156 def try_merge_pr(self) -> None: 1157 """ 1158 Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not 1159 performed 1160 """ 1161 if self.bot_config.merge_method: 1162 logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}") 1163 self._pull_request.merge(merge_method=self.bot_config.merge_method.value) 1164 else: 1165 logger.debug("No merge method defined so skipping merge") 1166 1167 def get_command_from_comment(self) -> BotCommand: 1168 """ 1169 Gets the command from the comment 1170 """ 1171 if not self._event.is_comment_added: 1172 logger.debug("Event is not a comment so returning invalid") 1173 return BotCommand.INVALID 1174 if self._event.pull_request_comment_body is None: 1175 raise CICDBotError("Unable to get comment body") 1176 logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}") 1177 return BotCommand.from_comment_body( 1178 self._event.pull_request_comment_body, self.bot_config.command_namespace 1179 ) 1180 1181 def _chunk_up_api_message(self, message: str) -> t.List[str]: 1182 """ 1183 Chunks up the message into `MAX_BYTE_LENGTH` byte chunks 1184 """ 1185 message_encoded = message.encode("utf-8") 1186 return [ 1187 message_encoded[i : i + self.MAX_BYTE_LENGTH].decode("utf-8", "ignore") 1188 for i in range(0, len(message_encoded), self.MAX_BYTE_LENGTH) 1189 ] 1190 1191 @property 1192 def running_in_github_actions(self) -> bool: 1193 return os.environ.get("GITHUB_ACTIONS", None) == "true" 1194 1195 @property 1196 def version_info(self) -> str: 1197 from sqlmesh.cli.main import _sqlmesh_version 1198 1199 return _sqlmesh_version() 1200 1201 def _generate_plan_flags_section( 1202 self, user_provided_flags: t.Dict[str, UserProvidedFlags] 1203 ) -> str: 1204 # collapsed section syntax: 1205 # https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/organizing-information-with-collapsed-sections#creating-a-collapsed-section 1206 section = "<details>\n\n<summary>Plan flags</summary>\n\n" 1207 for flag_name, flag_value in user_provided_flags.items(): 1208 section += f"- `{flag_name}` = `{flag_value}`\n" 1209 section += "\n</details>" 1210 1211 return section 1212 1213 def _generate_pr_environment_summary_intro(self) -> str: 1214 note = "" 1215 subset_reasons = [] 1216 1217 if self.bot_config.skip_pr_backfill: 1218 subset_reasons.append("`skip_pr_backfill` is enabled") 1219 1220 if default_pr_start := self.bot_config.default_pr_start: 1221 subset_reasons.append(f"`default_pr_start` is set to `{default_pr_start}`") 1222 1223 if subset_reasons: 1224 note = ( 1225 "> [!IMPORTANT]\n" 1226 f"> This PR environment may only contain a subset of data because:\n" 1227 + "\n".join(f"> - {r}" for r in subset_reasons) 1228 + "\n" 1229 "> \n" 1230 "> This means that deploying to `prod` may not be a simple virtual update if there is still some data to load.\n" 1231 "> See `Dates not loaded in PR` below or the `Prod Plan Preview` check for more information.\n\n" 1232 ) 1233 1234 return ( 1235 f"Here is a summary of data that has been loaded into the PR environment `{self.pr_environment_name}` and could be deployed to `prod`.\n\n" 1236 + note 1237 ) 1238 1239 def _generate_pr_environment_summary_list(self, plan: Plan) -> str: 1240 added_snapshot_ids = set(plan.context_diff.added) 1241 modified_snapshot_ids = set( 1242 s.snapshot_id for s, _ in plan.context_diff.modified_snapshots.values() 1243 ) 1244 removed_snapshot_ids = set(plan.context_diff.removed_snapshots.keys()) 1245 1246 # note: we sort these to get a deterministic order for the output tests 1247 table_records = sorted( 1248 [ 1249 SnapshotSummaryRecord(snapshot_id=snapshot_id, plan=plan) 1250 for snapshot_id in ( 1251 added_snapshot_ids | modified_snapshot_ids | removed_snapshot_ids 1252 ) 1253 ], 1254 key=lambda r: r.display_name, 1255 ) 1256 1257 sections = [ 1258 ("### Added", [r for r in table_records if r.is_added]), 1259 ("### Removed", [r for r in table_records if r.is_removed]), 1260 ("### Directly Modified", [r for r in table_records if r.is_directly_modified]), 1261 ("### Indirectly Modified", [r for r in table_records if r.is_indirectly_modified]), 1262 ( 1263 "### Metadata Updated", 1264 [r for r in table_records if r.is_metadata_updated and not r.is_modified], 1265 ), 1266 ] 1267 1268 summary = "" 1269 for title, records in sections: 1270 if records: 1271 summary += f"\n{title}\n" 1272 1273 for record in records: 1274 summary += f"{record.as_markdown_list_item}\n" 1275 1276 return summary 1277 1278 1279@dataclass 1280class SnapshotSummaryRecord: 1281 snapshot_id: SnapshotId 1282 plan: Plan 1283 1284 @property 1285 def snapshot(self) -> Snapshot: 1286 if self.is_removed: 1287 raise ValueError("Removed snapshots only have SnapshotTableInfo available") 1288 return self.plan.snapshots[self.snapshot_id] 1289 1290 @cached_property 1291 def snapshot_table_info(self) -> SnapshotTableInfo: 1292 if self.is_removed: 1293 return self.plan.modified_snapshots[self.snapshot_id].table_info 1294 return self.plan.snapshots[self.snapshot_id].table_info 1295 1296 @property 1297 def display_name(self) -> str: 1298 dialect = None if self.is_removed else self.snapshot.node.dialect 1299 return self.snapshot_table_info.display_name( 1300 self.plan.environment_naming_info, default_catalog=None, dialect=dialect 1301 ) 1302 1303 @property 1304 def change_category(self) -> str: 1305 if self.is_removed: 1306 return SNAPSHOT_CHANGE_CATEGORY_STR[SnapshotChangeCategory.BREAKING] 1307 1308 if change_category := self.snapshot.change_category: 1309 return SNAPSHOT_CHANGE_CATEGORY_STR[change_category] 1310 1311 return "Uncategorized" 1312 1313 @property 1314 def is_added(self) -> bool: 1315 return self.snapshot_id in self.plan.context_diff.added 1316 1317 @property 1318 def is_removed(self) -> bool: 1319 return self.snapshot_id in self.plan.context_diff.removed_snapshots 1320 1321 @property 1322 def is_dev_preview(self) -> bool: 1323 return not self.plan.deployability_index.is_deployable(self.snapshot_id) 1324 1325 @property 1326 def is_directly_modified(self) -> bool: 1327 return self.plan.context_diff.directly_modified(self.snapshot_table_info.name) 1328 1329 @property 1330 def is_indirectly_modified(self) -> bool: 1331 return self.plan.context_diff.indirectly_modified(self.snapshot_table_info.name) 1332 1333 @property 1334 def is_modified(self) -> bool: 1335 return self.is_directly_modified or self.is_indirectly_modified 1336 1337 @property 1338 def is_metadata_updated(self) -> bool: 1339 return self.plan.context_diff.metadata_updated(self.snapshot_table_info.name) 1340 1341 @property 1342 def is_incremental(self) -> bool: 1343 return self.snapshot_table_info.is_incremental 1344 1345 @property 1346 def modification_type(self) -> str: 1347 if self.is_directly_modified: 1348 return "Directly modified" 1349 if self.is_indirectly_modified: 1350 return "Indirectly modified" 1351 if self.is_metadata_updated: 1352 return "Metadata updated" 1353 1354 return "Unknown" 1355 1356 @property 1357 def loaded_intervals(self) -> SnapshotIntervals: 1358 if self.is_removed: 1359 raise ValueError("Removed snapshots dont have loaded intervals available") 1360 1361 return SnapshotIntervals( 1362 snapshot_id=self.snapshot_id, 1363 intervals=( 1364 self.snapshot.dev_intervals 1365 if self.snapshot.is_forward_only 1366 else self.snapshot.intervals 1367 ), 1368 ) 1369 1370 @property 1371 def loaded_intervals_rendered(self) -> str: 1372 if self.is_removed: 1373 return "REMOVED" 1374 1375 return self._format_intervals(self.loaded_intervals) 1376 1377 @property 1378 def missing_intervals(self) -> t.Optional[SnapshotIntervals]: 1379 return next( 1380 (si for si in self.plan.missing_intervals if si.snapshot_id == self.snapshot_id), 1381 None, 1382 ) 1383 1384 @property 1385 def missing_intervals_formatted(self) -> str: 1386 if not self.is_removed and (intervals := self.missing_intervals): 1387 return self._format_intervals(intervals) 1388 1389 return "N/A" 1390 1391 @property 1392 def as_markdown_list_item(self) -> str: 1393 if self.is_removed: 1394 return f"- `{self.display_name}` ({self.change_category})" 1395 1396 how_applied = "" 1397 1398 if not self.is_incremental: 1399 from sqlmesh.core.console import _format_missing_intervals 1400 1401 # note: this is to re-use the '[recreate view]' and '[full refresh]' text and keep it in sync with updates to the CLI 1402 # it doesnt actually use the passed intervals, those are handled differently 1403 how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals) 1404 1405 how_applied_str = f" [{how_applied}]" if how_applied else "" 1406 1407 item = f"- `{self.display_name}` ({self.change_category})\n" 1408 1409 if self.snapshot_table_info.model_kind_name: 1410 item += f" **Kind:** {self.snapshot_table_info.model_kind_name}{how_applied_str}\n" 1411 1412 if self.is_incremental: 1413 # in-depth interval info is only relevant for incremental models 1414 item += f" **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n" 1415 if self.missing_intervals: 1416 item += f" **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n" 1417 1418 return item 1419 1420 def _format_intervals(self, intervals: SnapshotIntervals) -> str: 1421 preview_modifier = " (**preview**)" if self.is_dev_preview else "" 1422 return f"{intervals.format_intervals(self.snapshot.node.interval_unit)}{preview_modifier}"
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
65class PullRequestInfo(PydanticModel): 66 """Contains information related to a pull request that can be used to construct other objects/URLs""" 67 68 owner: str 69 repo: str 70 pr_number: int 71 72 @property 73 def full_repo_path(self) -> str: 74 return "/".join([self.owner, self.repo]) 75 76 @classmethod 77 def create_from_pull_request_url(cls, pull_request_url: str) -> PullRequestInfo: 78 owner, repo, _, pr_number = pull_request_url.split("/")[-4:] 79 return cls( 80 owner=owner, 81 repo=repo, 82 pr_number=int(pr_number), 83 )
Contains information related to a pull request that can be used to construct other objects/URLs
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
86class GithubCheckStatus(str, Enum): 87 QUEUED = "queued" 88 IN_PROGRESS = "in_progress" 89 COMPLETED = "completed" 90 91 @property 92 def is_queued(self) -> bool: 93 return self == GithubCheckStatus.QUEUED 94 95 @property 96 def is_in_progress(self) -> bool: 97 return self == GithubCheckStatus.IN_PROGRESS 98 99 @property 100 def is_completed(self) -> bool: 101 return self == GithubCheckStatus.COMPLETED
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
104class GithubCheckConclusion(str, Enum): 105 SUCCESS = "success" 106 FAILURE = "failure" 107 NEUTRAL = "neutral" 108 CANCELLED = "cancelled" 109 TIMED_OUT = "timed_out" 110 ACTION_REQUIRED = "action_required" 111 SKIPPED = "skipped" 112 113 @property 114 def is_success(self) -> bool: 115 return self == GithubCheckConclusion.SUCCESS 116 117 @property 118 def is_failure(self) -> bool: 119 return self == GithubCheckConclusion.FAILURE 120 121 @property 122 def is_neutral(self) -> bool: 123 return self == GithubCheckConclusion.NEUTRAL 124 125 @property 126 def is_cancelled(self) -> bool: 127 return self == GithubCheckConclusion.CANCELLED 128 129 @property 130 def is_timed_out(self) -> bool: 131 return self == GithubCheckConclusion.TIMED_OUT 132 133 @property 134 def is_action_required(self) -> bool: 135 return self == GithubCheckConclusion.ACTION_REQUIRED 136 137 @property 138 def is_skipped(self) -> bool: 139 return self == GithubCheckConclusion.SKIPPED
An enumeration.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
142class MergeStateStatus(str, Enum): 143 """ 144 https://docs.github.com/en/graphql/reference/enums#mergestatestatus 145 """ 146 147 BEHIND = "behind" 148 BLOCKED = "blocked" 149 CLEAN = "clean" 150 DIRTY = "dirty" 151 DRAFT = "draft" 152 HAS_HOOKS = "has_hooks" 153 UNKNOWN = "unknown" 154 UNSTABLE = "unstable" 155 156 @property 157 def is_behind(self) -> bool: 158 return self == MergeStateStatus.BEHIND 159 160 @property 161 def is_blocked(self) -> bool: 162 return self == MergeStateStatus.BLOCKED 163 164 @property 165 def is_clean(self) -> bool: 166 return self == MergeStateStatus.CLEAN 167 168 @property 169 def is_dirty(self) -> bool: 170 return self == MergeStateStatus.DIRTY 171 172 @property 173 def is_draft(self) -> bool: 174 return self == MergeStateStatus.DRAFT 175 176 @property 177 def is_has_hooks(self) -> bool: 178 return self == MergeStateStatus.HAS_HOOKS 179 180 @property 181 def is_unknown(self) -> bool: 182 return self == MergeStateStatus.UNKNOWN 183 184 @property 185 def is_unstable(self) -> bool: 186 return self == MergeStateStatus.UNSTABLE
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
189class BotCommand(Enum): 190 INVALID = 1 191 DEPLOY_PROD = 2 192 193 @classmethod 194 def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand: 195 body = body.strip() 196 namespace = namespace.strip() if namespace else "" 197 input_to_command = { 198 namespace + "/deploy": cls.DEPLOY_PROD, 199 } 200 return input_to_command.get(body, cls.INVALID) 201 202 @property 203 def is_invalid(self) -> bool: 204 return self == self.INVALID 205 206 @property 207 def is_deploy_prod(self) -> bool: 208 return self == self.DEPLOY_PROD
An enumeration.
193 @classmethod 194 def from_comment_body(cls, body: str, namespace: t.Optional[str] = None) -> BotCommand: 195 body = body.strip() 196 namespace = namespace.strip() if namespace else "" 197 input_to_command = { 198 namespace + "/deploy": cls.DEPLOY_PROD, 199 } 200 return input_to_command.get(body, cls.INVALID)
Inherited Members
- enum.Enum
- name
- value
211class GithubEvent: 212 """ 213 Takes a Github Actions event payload and provides a simple interface to access 214 """ 215 216 def __init__(self, payload: t.Dict[str, t.Any]) -> None: 217 self.payload = payload 218 self._pull_request_info: t.Optional[PullRequestInfo] = None 219 220 @classmethod 221 def from_obj(cls, obj: t.Dict[str, t.Any]) -> GithubEvent: 222 return cls(payload=obj) 223 224 @classmethod 225 def from_path(cls, path: t.Union[str, pathlib.Path]) -> GithubEvent: 226 with open(pathlib.Path(path), "r", encoding="utf-8") as f: 227 return cls.from_obj(json.load(f)) 228 229 @classmethod 230 def from_env(cls) -> GithubEvent: 231 return cls.from_path(os.environ["GITHUB_EVENT_PATH"]) 232 233 @property 234 def is_review(self) -> bool: 235 return bool(self.payload.get("review")) 236 237 @property 238 def is_comment(self) -> bool: 239 comment = self.payload.get("comment") 240 if not comment: 241 return False 242 if not comment.get("body"): 243 return False 244 return True 245 246 @property 247 def is_comment_added(self) -> bool: 248 return self.is_comment and self.payload.get("action") != "deleted" 249 250 @property 251 def is_pull_request(self) -> bool: 252 return bool(self.payload.get("pull_request")) 253 254 @property 255 def is_pull_request_closed(self) -> bool: 256 return self.is_pull_request and self.payload.get("action") == "closed" 257 258 @property 259 def pull_request_url(self) -> str: 260 if self.is_review: 261 return self.payload["review"]["pull_request_url"] 262 if self.is_comment: 263 return self.payload["issue"]["pull_request"]["url"] 264 if self.is_pull_request: 265 return self.payload["pull_request"]["_links"]["self"]["href"] 266 raise CICDBotError("Unable to determine pull request url") 267 268 @property 269 def pull_request_info(self) -> PullRequestInfo: 270 if not self._pull_request_info: 271 self._pull_request_info = PullRequestInfo.create_from_pull_request_url( 272 self.pull_request_url 273 ) 274 return self._pull_request_info 275 276 @property 277 def pull_request_comment_body(self) -> t.Optional[str]: 278 if self.is_comment_added: 279 return self.payload["comment"]["body"] 280 return None
Takes a Github Actions event payload and provides a simple interface to access
258 @property 259 def pull_request_url(self) -> str: 260 if self.is_review: 261 return self.payload["review"]["pull_request_url"] 262 if self.is_comment: 263 return self.payload["issue"]["pull_request"]["url"] 264 if self.is_pull_request: 265 return self.payload["pull_request"]["_links"]["self"]["href"] 266 raise CICDBotError("Unable to determine pull request url")
283class GithubController: 284 BOT_HEADER_MSG = ":robot: **SQLMesh Bot Info** :robot:" 285 MAX_BYTE_LENGTH = 65535 286 287 def __init__( 288 self, 289 paths: t.Union[Path, t.Iterable[Path]], 290 token: str, 291 config: t.Optional[t.Union[Config, str]] = None, 292 event: t.Optional[GithubEvent] = None, 293 client: t.Optional[Github] = None, 294 context: t.Optional[Context] = None, 295 ) -> None: 296 from github import Github 297 298 logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}") 299 300 self.config = config 301 self._paths = paths 302 self._token = token 303 self._event = event or GithubEvent.from_env() 304 logger.debug(f"Github event: {json.dumps(self._event.payload)}") 305 self._pr_plan_builder: t.Optional[PlanBuilder] = None 306 self._prod_plan_builder: t.Optional[PlanBuilder] = None 307 self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None 308 self._check_run_mapping: t.Dict[str, CheckRun] = {} 309 310 if not isinstance(get_console(), MarkdownConsole): 311 raise CICDBotError("Console must be a markdown console.") 312 self._console = t.cast(MarkdownConsole, get_console()) 313 314 from github.Consts import DEFAULT_BASE_URL 315 from github.Auth import Token 316 317 self._client: Github = client or Github( 318 base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token) 319 ) 320 321 self._repo: Repository = self._client.get_repo( 322 self._event.pull_request_info.full_repo_path, lazy=True 323 ) 324 self._pull_request: PullRequest = self._repo.get_pull( 325 self._event.pull_request_info.pr_number 326 ) 327 self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number) 328 self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews() 329 # TODO: The python module says that user names can be None and this is not currently handled 330 self._approvers: t.Set[str] = { 331 review.user.login or "UNKNOWN" 332 for review in self._reviews 333 if review.state.lower() == "approved" 334 } 335 logger.debug(f"Approvers: {', '.join(self._approvers)}") 336 self._context: Context = context or Context(paths=self._paths, config=self.config) 337 338 # Bot config needs the context to be initialized 339 logger.debug(f"Bot config: {self.bot_config.json(indent=2)}") 340 341 @property 342 def deploy_command_enabled(self) -> bool: 343 return self.bot_config.enable_deploy_command 344 345 @property 346 def is_comment_added(self) -> bool: 347 return self._event.is_comment_added 348 349 @property 350 def _required_approvers(self) -> t.List[User]: 351 required_approvers = [ 352 user 353 for user in self._context.users 354 if user.is_required_approver and user.github_username 355 ] 356 logger.debug( 357 f"Required approvers: {', '.join(user.github_username for user in required_approvers if user.github_username)}" 358 ) 359 return required_approvers 360 361 @property 362 def _required_approvers_with_approval(self) -> t.List[User]: 363 return [ 364 user for user in self._required_approvers if user.github_username in self._approvers 365 ] 366 367 @property 368 def pr_environment_name(self) -> str: 369 return Environment.sanitize_name( 370 "_".join( 371 [ 372 self.bot_config.pr_environment_name or self._event.pull_request_info.repo, 373 str(self._event.pull_request_info.pr_number), 374 ] 375 ) 376 ) 377 378 @property 379 def do_required_approval_check(self) -> bool: 380 """We want to skip required approval check if no users have this role""" 381 do_required_approval_check = bool(self._required_approvers) 382 logger.debug(f"Do required approval check: {do_required_approval_check}") 383 return do_required_approval_check 384 385 @property 386 def has_required_approval(self) -> bool: 387 """ 388 Check if the PR has a required approver. 389 390 TODO: Allow defining requiring some number, or all, required approvers. 391 """ 392 if not self._required_approvers or self._required_approvers_with_approval: 393 logger.debug("Has required Approval") 394 return True 395 logger.debug("Does not have required approval") 396 return False 397 398 @property 399 def pr_plan(self) -> Plan: 400 if not self._pr_plan_builder: 401 self._pr_plan_builder = self._context.plan_builder( 402 environment=self.pr_environment_name, 403 skip_tests=True, 404 skip_linter=True, 405 categorizer_config=self.bot_config.auto_categorize_changes, 406 start=self.bot_config.default_pr_start, 407 min_intervals=self.bot_config.pr_min_intervals, 408 skip_backfill=self.bot_config.skip_pr_backfill, 409 include_unmodified=self.bot_config.pr_include_unmodified, 410 forward_only=self.forward_only_plan, 411 ) 412 assert self._pr_plan_builder 413 return self._pr_plan_builder.build() 414 415 @property 416 def pr_plan_or_none(self) -> t.Optional[Plan]: 417 try: 418 return self.pr_plan 419 except: 420 return None 421 422 @property 423 def pr_plan_flags(self) -> t.Optional[t.Dict[str, UserProvidedFlags]]: 424 if pr_plan := self.pr_plan_or_none: 425 return pr_plan.user_provided_flags 426 if pr_plan_builder := self._pr_plan_builder: 427 return pr_plan_builder._user_provided_flags 428 return None 429 430 @property 431 def prod_plan(self) -> Plan: 432 if not self._prod_plan_builder: 433 self._prod_plan_builder = self._context.plan_builder( 434 c.PROD, 435 no_gaps=True, 436 skip_tests=True, 437 skip_linter=True, 438 categorizer_config=self.bot_config.auto_categorize_changes, 439 run=self.bot_config.run_on_deploy_to_prod, 440 forward_only=self.forward_only_plan, 441 ) 442 assert self._prod_plan_builder 443 return self._prod_plan_builder.build() 444 445 @property 446 def prod_plan_with_gaps(self) -> Plan: 447 if not self._prod_plan_with_gaps_builder: 448 self._prod_plan_with_gaps_builder = self._context.plan_builder( 449 c.PROD, 450 # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data) 451 no_gaps=False, 452 skip_tests=True, 453 skip_linter=True, 454 categorizer_config=self.bot_config.auto_categorize_changes, 455 run=self.bot_config.run_on_deploy_to_prod, 456 forward_only=self.forward_only_plan, 457 ) 458 assert self._prod_plan_with_gaps_builder 459 return self._prod_plan_with_gaps_builder.build() 460 461 @property 462 def bot_config(self) -> GithubCICDBotConfig: 463 bot_config = self._context.config.cicd_bot or GithubCICDBotConfig( 464 auto_categorize_changes=self._context.auto_categorize_changes 465 ) 466 return bot_config 467 468 @property 469 def modified_snapshots(self) -> t.Dict[SnapshotId, t.Union[Snapshot, SnapshotTableInfo]]: 470 return self.prod_plan_with_gaps.modified_snapshots 471 472 @property 473 def removed_snapshots(self) -> t.Set[SnapshotId]: 474 return set(self.prod_plan_with_gaps.context_diff.removed_snapshots) 475 476 @property 477 def pr_targets_prod_branch(self) -> bool: 478 return self._pull_request.base.ref in self.bot_config.prod_branch_names 479 480 @property 481 def forward_only_plan(self) -> bool: 482 default = self._context.config.plan.forward_only 483 head_ref = self._pull_request.head.ref 484 if isinstance(head_ref, str): 485 return head_ref.endswith(self.bot_config.forward_only_branch_suffix) or default 486 return default 487 488 @classmethod 489 def _append_output(cls, key: str, value: str) -> None: 490 """ 491 Appends the given key/value to output so they can be read by following steps 492 """ 493 logger.debug(f"Setting output. Key: {key}, Value: {value}") 494 495 # GitHub Actions sets this environment variable 496 if output_file := os.environ.get("GITHUB_OUTPUT"): 497 with open(output_file, "a", encoding="utf-8") as fh: 498 print(f"{key}={value}", file=fh) 499 500 def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str: 501 if not plan.forward_only: 502 return "" 503 504 example_model_name = "<model name>" 505 for snapshot_id in sorted(plan.snapshots): 506 snapshot = plan.snapshots[snapshot_id] 507 if snapshot.is_incremental: 508 example_model_name = snapshot.node.name 509 break 510 511 return ( 512 "> [!TIP]\n" 513 "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n" 514 "> \n" 515 f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n" 516 ">\n" 517 "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans" 518 ) 519 520 def get_plan_summary(self, plan: Plan) -> str: 521 # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated 522 # this is particularly important for the "Models needing backfill" list because 523 # there is no easy way to tell this otherwise 524 orig_verbosity = self._console.verbosity 525 self._console.verbosity = Verbosity.VERY_VERBOSE 526 527 try: 528 # Clear out any output that might exist from prior steps 529 self._console.consume_captured_output() 530 if plan.restatements: 531 self._console._print("\n**Restating models**\n") 532 else: 533 self._console.show_environment_difference_summary( 534 context_diff=plan.context_diff, 535 no_diff=False, 536 ) 537 if plan.context_diff.has_changes: 538 self._console.show_model_difference_summary( 539 context_diff=plan.context_diff, 540 environment_naming_info=plan.environment_naming_info, 541 default_catalog=self._context.default_catalog, 542 no_diff=False, 543 ) 544 difference_summary = self._console.consume_captured_output() 545 self._console._show_missing_dates(plan, self._context.default_catalog) 546 missing_dates = self._console.consume_captured_output() 547 548 plan_flags_section = ( 549 f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}" 550 if plan.user_provided_flags 551 else "" 552 ) 553 554 if not difference_summary and not missing_dates: 555 return f"No changes to apply.{plan_flags_section}" 556 557 warnings_block = self._console.consume_captured_warnings() 558 errors_block = self._console.consume_captured_errors() 559 560 return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}" 561 except PlanError as e: 562 logger.exception("Plan failed to generate") 563 return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}" 564 finally: 565 self._console.verbosity = orig_verbosity 566 567 def get_pr_environment_summary( 568 self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None 569 ) -> str: 570 heading = "" 571 summary = "" 572 573 if conclusion.is_success: 574 summary = self._get_pr_environment_summary_success() 575 elif conclusion.is_action_required: 576 heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:" 577 summary = self._get_pr_environment_summary_action_required(exception) 578 elif conclusion.is_failure: 579 heading = ( 580 f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:" 581 ) 582 summary = self._get_pr_environment_summary_failure(exception) 583 elif conclusion.is_skipped: 584 heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:" 585 summary = self._get_pr_environment_summary_skipped(exception) 586 else: 587 heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}" 588 589 # note: we just add warnings here, errors will be covered by the "failure" conclusion 590 if warnings := self._console.consume_captured_warnings(): 591 summary = f"{warnings}\n{summary}" 592 593 return f"{heading}\n\n{summary}".strip() 594 595 def _get_pr_environment_summary_success(self) -> str: 596 prod_plan = self.prod_plan_with_gaps 597 598 if not prod_plan.has_changes: 599 summary = "No models were modified in this PR.\n" 600 else: 601 intro = self._generate_pr_environment_summary_intro() 602 summary = intro + self._generate_pr_environment_summary_list(prod_plan) 603 604 if prod_plan.user_provided_flags: 605 summary += self._generate_plan_flags_section(prod_plan.user_provided_flags) 606 607 return summary 608 609 def _get_pr_environment_summary_skipped(self, exception: t.Optional[Exception] = None) -> str: 610 if isinstance(exception, NoChangesPlanError): 611 skip_reason = "No changes were detected compared to the prod environment." 612 elif isinstance(exception, TestFailure): 613 skip_reason = "Unit Test(s) Failed so skipping PR creation" 614 else: 615 skip_reason = "A prior stage failed resulting in skipping PR creation." 616 617 return skip_reason 618 619 def _get_pr_environment_summary_action_required( 620 self, exception: t.Optional[Exception] = None 621 ) -> str: 622 plan = self.pr_plan_or_none 623 if isinstance(exception, UncategorizedPlanError) and plan: 624 failure_msg = f"The following models could not be categorized automatically:\n" 625 for snapshot in plan.uncategorized: 626 failure_msg += f"- {snapshot.name}\n" 627 failure_msg += ( 628 f"\nRun `sqlmesh plan {self.pr_environment_name}` locally to apply these changes.\n\n" 629 "If you would like the bot to automatically categorize changes, check the [documentation](https://sqlmesh.readthedocs.io/en/stable/integrations/github/) for more information." 630 ) 631 else: 632 failure_msg = "Please check the Actions Workflow logs for more information." 633 634 return failure_msg 635 636 def _get_pr_environment_summary_failure(self, exception: t.Optional[Exception] = None) -> str: 637 console_output = self._console.consume_captured_output() 638 failure_msg = "" 639 640 if isinstance(exception, PlanError): 641 if exception.args and (msg := exception.args[0]) and isinstance(msg, str): 642 failure_msg += f"*{msg}*\n" 643 if console_output: 644 failure_msg += f"\n{console_output}" 645 elif isinstance(exception, (SQLMeshError, SqlglotError, ValueError)): 646 # this logic is taken from the global error handler attached to the CLI, which uses `click.echo()` to output the message 647 # so cant be re-used here because it bypasses the Console 648 failure_msg = f"**Error:** {str(exception)}" 649 elif exception: 650 logger.debug( 651 "Got unexpected error. Error Type: " 652 + str(type(exception)) 653 + " Stack trace: " 654 + traceback.format_exc() 655 ) 656 failure_msg = f"This is an unexpected error.\n\n**Exception:**\n```\n{traceback.format_exc()}\n```" 657 658 if captured_errors := self._console.consume_captured_errors(): 659 failure_msg = f"{captured_errors}\n{failure_msg}" 660 661 if plan_flags := self.pr_plan_flags: 662 failure_msg += f"\n\n{self._generate_plan_flags_section(plan_flags)}" 663 664 return failure_msg 665 666 def run_tests(self) -> t.Tuple[ModelTextTestResult, str]: 667 """ 668 Run tests for the PR 669 """ 670 return self._context._run_tests(verbosity=Verbosity.VERBOSE) 671 672 def run_linter(self) -> None: 673 """ 674 Run linter for the PR 675 """ 676 self._console.consume_captured_output() 677 self._context.lint_models() 678 679 def _get_or_create_comment(self, header: str = BOT_HEADER_MSG) -> IssueComment: 680 comment = seq_get( 681 [comment for comment in self._issue.get_comments() if header in comment.body], 682 0, 683 ) 684 if not comment: 685 logger.debug(f"Did not find comment so creating one with header: {header}") 686 return self._issue.create_comment(header) 687 logger.debug(f"Found comment with header: {header}") 688 return comment 689 690 def _get_merge_state_status(self) -> MergeStateStatus: 691 """ 692 This feature is currently in preview and therefore not available in the python module. 693 So we query GraphQL directly instead. 694 """ 695 headers = { 696 "Authorization": f"Bearer {self._token}", 697 "Accept": "application/vnd.github.merge-info-preview+json", 698 } 699 query = f"""{{ 700 repository(owner: "{self._event.pull_request_info.owner}", name: "{self._event.pull_request_info.repo}") {{ 701 pullRequest(number: {self._event.pull_request_info.pr_number}) {{ 702 title 703 state 704 mergeStateStatus 705 }} 706 }} 707 }}""" 708 request = requests.post( 709 os.environ["GITHUB_GRAPHQL_URL"], 710 json={"query": query}, 711 headers=headers, 712 ) 713 if request.status_code == 200: 714 merge_status = MergeStateStatus( 715 request.json()["data"]["repository"]["pullRequest"]["mergeStateStatus"].lower() 716 ) 717 logger.debug(f"Merge state status: {merge_status.value}") 718 return merge_status 719 raise CICDBotError(f"Unable to get merge state status. Error: {request.text}") 720 721 def update_sqlmesh_comment_info( 722 self, value: str, *, dedup_regex: t.Optional[str] 723 ) -> t.Tuple[bool, IssueComment]: 724 """ 725 Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then 726 it creates one. It determines the comment to update by looking for a comment with the header. If a dedup 727 regex is provided then it will check if the value already exists in the comment and if so it will not update 728 """ 729 comment = self._get_or_create_comment() 730 if dedup_regex: 731 # If we find a match against the regex then we just return since the comment has already been posted 732 if seq_get(re.findall(dedup_regex, comment.body), 0): 733 return False, comment 734 full_comment = f"{comment.body}\n{value}" 735 body, *truncated = self._chunk_up_api_message(f"{full_comment}") 736 if truncated: 737 logger.warning( 738 f"Comment body was too long so we truncated it. Full text: {full_comment}" 739 ) 740 comment.edit(body=body) 741 return True, comment 742 743 def update_pr_environment(self) -> None: 744 """ 745 Creates a PR environment from the logic present in the PR. If the PR contains changes that are 746 uncategorized, then an error will be raised. 747 """ 748 self._console.consume_captured_output() # clear output buffer 749 self._context.apply(self.pr_plan) # will raise if PR environment creation fails 750 751 # update PR info comment 752 vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:" 753 comment_value = f"{vde_title}\n - `{self.pr_environment_name}`" 754 if self.bot_config.enable_deploy_command: 755 full_command = f"{self.bot_config.command_namespace or ''}/deploy" 756 comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n - `{full_command}`" 757 dedup_regex = vde_title.replace("*", r"\*") + r".*" 758 updated_comment, _ = self.update_sqlmesh_comment_info( 759 value=comment_value, 760 dedup_regex=dedup_regex, 761 ) 762 if updated_comment: 763 self._append_output("created_pr_environment", "true") 764 765 def deploy_to_prod(self) -> None: 766 """ 767 Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise. 768 """ 769 # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge. 770 # The deploy can still happen if the workflow is configured to listen for `closed` events. 771 if self._pull_request.merged and not self._event.is_pull_request_closed: 772 raise CICDBotError( 773 "PR is already merged and this event was triggered prior to the merge." 774 ) 775 merge_status = self._get_merge_state_status() 776 if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked: 777 raise CICDBotError( 778 "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. " 779 "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration." 780 ) 781 if merge_status.is_dirty: 782 raise CICDBotError( 783 "Merge commit cannot be cleanly created. Likely from a merge conflict. " 784 "Please check PR and resolve any issues." 785 ) 786 plan_summary = f"""<details> 787 <summary>:ship: Prod Plan Being Applied</summary> 788 789{self.get_plan_summary(self.prod_plan)} 790</details> 791 792""" 793 if self.forward_only_plan: 794 plan_summary = ( 795 f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}" 796 ) 797 798 self.update_sqlmesh_comment_info( 799 value=plan_summary, 800 dedup_regex=None, 801 ) 802 self._context.apply(self.prod_plan) 803 804 def try_invalidate_pr_environment(self) -> None: 805 """ 806 Marks the PR environment for garbage collection. 807 """ 808 if self.bot_config.invalidate_environment_after_deploy: 809 self._context.invalidate_environment(self.pr_environment_name) 810 811 def _update_check( 812 self, 813 name: str, 814 status: GithubCheckStatus, 815 title: str, 816 conclusion: t.Optional[GithubCheckConclusion] = None, 817 full_summary: t.Optional[str] = None, 818 ) -> None: 819 """ 820 Updates the status of the merge commit. 821 """ 822 current_time = now() 823 kwargs: t.Dict[str, t.Any] = { 824 "name": name, 825 # Note: The environment variable `GITHUB_SHA` would be the merge commit so that is why instead we 826 # get the last commit on the PR. 827 "head_sha": self._pull_request.head.sha, 828 "status": status.value, 829 } 830 if status.is_in_progress: 831 kwargs["started_at"] = current_time 832 if status.is_completed: 833 kwargs["completed_at"] = current_time 834 if conclusion: 835 kwargs["conclusion"] = conclusion.value 836 full_summary = full_summary or title 837 summary, text, *truncated = self._chunk_up_api_message(full_summary) + [None] 838 if truncated and truncated[0] is not None: 839 logger.warning(f"Summary was too long so we truncated it. Full text: {full_summary}") 840 kwargs["output"] = {"title": title, "summary": summary} 841 if text: 842 kwargs["output"]["text"] = text 843 logger.debug(f"Updating check with kwargs: {kwargs}") 844 845 if self.running_in_github_actions: 846 # Only make the API call to update the checks if we are running within GitHub Actions 847 # One very annoying limitation of the Pull Request Checks API is that its only available to GitHub Apps 848 # and not personal access tokens, which makes it unable to be utilized during local development 849 if name in self._check_run_mapping: 850 logger.debug(f"Found check run in mapping so updating it. Name: {name}") 851 check_run = self._check_run_mapping[name] 852 check_run.edit( 853 **{ 854 k: v 855 for k, v in kwargs.items() 856 if k not in ("name", "head_sha", "started_at") 857 } 858 ) 859 else: 860 logger.debug(f"Did not find check run in mapping so creating it. Name: {name}") 861 self._check_run_mapping[name] = self._repo.create_check_run(**kwargs) 862 else: 863 # Output the summary using print() so the newlines are resolved and the result can easily 864 # be disambiguated from the rest of the console output and copy+pasted into a Markdown renderer 865 print( 866 f"---CHECK OUTPUT START: {kwargs['output']['title']} ---\n{kwargs['output']['summary']}\n---CHECK OUTPUT END---\n" 867 ) 868 869 if conclusion: 870 self._append_output( 871 word_characters_only(name.replace("SQLMesh - ", "").lower()), conclusion.value 872 ) 873 874 def _update_check_handler( 875 self, 876 check_name: str, 877 status: GithubCheckStatus, 878 conclusion: t.Optional[GithubCheckConclusion], 879 status_handler: t.Callable[[GithubCheckStatus], t.Tuple[str, t.Optional[str]]], 880 conclusion_handler: t.Callable[ 881 [GithubCheckConclusion], t.Tuple[GithubCheckConclusion, str, t.Optional[str]] 882 ], 883 ) -> None: 884 if conclusion: 885 conclusion, title, summary = conclusion_handler(conclusion) 886 else: 887 title, summary = status_handler(status) 888 self._update_check( 889 name=check_name, 890 status=status, 891 title=title, 892 conclusion=conclusion, 893 full_summary=summary, 894 ) 895 896 def update_linter_check( 897 self, 898 status: GithubCheckStatus, 899 conclusion: t.Optional[GithubCheckConclusion] = None, 900 ) -> None: 901 if not self._context.config.linter.enabled: 902 return 903 904 def conclusion_handler( 905 conclusion: GithubCheckConclusion, 906 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 907 linter_summary = self._console.consume_captured_output() or "Linter Success" 908 909 title = "Linter results" 910 911 return conclusion, title, linter_summary 912 913 self._update_check_handler( 914 check_name="SQLMesh - Linter", 915 status=status, 916 conclusion=conclusion, 917 status_handler=lambda status: ( 918 { 919 GithubCheckStatus.IN_PROGRESS: "Running linter", 920 GithubCheckStatus.QUEUED: "Waiting to Run linter", 921 }[status], 922 None, 923 ), 924 conclusion_handler=conclusion_handler, 925 ) 926 927 def update_test_check( 928 self, 929 status: GithubCheckStatus, 930 conclusion: t.Optional[GithubCheckConclusion] = None, 931 result: t.Optional[ModelTextTestResult] = None, 932 traceback: t.Optional[str] = None, 933 ) -> None: 934 """ 935 Updates the status of tests for code in the PR 936 """ 937 938 def conclusion_handler( 939 conclusion: GithubCheckConclusion, 940 result: t.Optional[ModelTextTestResult], 941 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 942 if result: 943 # Clear out console 944 self._console.consume_captured_output() 945 self._console.log_test_results( 946 result, 947 self._context.test_connection_config._engine_adapter.DIALECT, 948 ) 949 test_summary = self._console.consume_captured_output() 950 test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed" 951 test_conclusion = ( 952 GithubCheckConclusion.SUCCESS 953 if result.wasSuccessful() 954 else GithubCheckConclusion.FAILURE 955 ) 956 return test_conclusion, test_title, test_summary 957 if traceback: 958 self._console._print(traceback) 959 960 test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed" 961 return conclusion, test_title, traceback 962 963 self._update_check_handler( 964 check_name="SQLMesh - Run Unit Tests", 965 status=status, 966 conclusion=conclusion, 967 status_handler=lambda status: ( 968 { 969 GithubCheckStatus.IN_PROGRESS: "Running Tests", 970 GithubCheckStatus.QUEUED: "Waiting to Run Tests", 971 }[status], 972 None, 973 ), 974 conclusion_handler=functools.partial(conclusion_handler, result=result), 975 ) 976 977 def update_required_approval_check( 978 self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None 979 ) -> None: 980 """ 981 Updates the status of the merge commit for the required approval. 982 """ 983 984 def conclusion_handler( 985 conclusion: GithubCheckConclusion, 986 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 987 test_summary = "**List of possible required approvers:**\n" 988 for user in self._required_approvers: 989 test_summary += f"- `{user.github_username or user.username}`\n" 990 991 title = ( 992 f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}" 993 if conclusion.is_success 994 else "Need a Required Approval" 995 ) 996 return conclusion, title, test_summary 997 998 # If we get a skip that means required approvers is not configured therefore it does not need to be displayed 999 if conclusion and conclusion.is_skipped: 1000 return 1001 1002 self._update_check_handler( 1003 check_name="SQLMesh - Has Required Approval", 1004 status=status, 1005 conclusion=conclusion, 1006 status_handler=lambda status: ( 1007 { 1008 GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers", 1009 GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers", 1010 }[status], 1011 None, 1012 ), 1013 conclusion_handler=conclusion_handler, 1014 ) 1015 1016 def update_pr_environment_check( 1017 self, status: GithubCheckStatus, exception: t.Optional[Exception] = None 1018 ) -> t.Optional[GithubCheckConclusion]: 1019 """ 1020 Updates the status of the merge commit for the PR environment. 1021 """ 1022 conclusion: t.Optional[GithubCheckConclusion] = None 1023 if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)): 1024 conclusion = GithubCheckConclusion.SKIPPED 1025 elif isinstance(exception, UncategorizedPlanError): 1026 conclusion = GithubCheckConclusion.ACTION_REQUIRED 1027 elif exception: 1028 conclusion = GithubCheckConclusion.FAILURE 1029 elif status.is_completed: 1030 conclusion = GithubCheckConclusion.SUCCESS 1031 1032 check_title_static = "PR Virtual Data Environment: " 1033 check_title = check_title_static + self.pr_environment_name 1034 1035 def conclusion_handler( 1036 conclusion: GithubCheckConclusion, exception: t.Optional[Exception] 1037 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1038 summary = self.get_pr_environment_summary(conclusion, exception) 1039 self._append_output("pr_environment_name", self.pr_environment_name) 1040 return conclusion, check_title, summary 1041 1042 self._update_check_handler( 1043 check_name="SQLMesh - PR Environment Synced", 1044 status=status, 1045 conclusion=conclusion, 1046 status_handler=lambda status: ( 1047 check_title, 1048 { 1049 GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`", 1050 GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`", 1051 }[status], 1052 ), 1053 conclusion_handler=functools.partial(conclusion_handler, exception=exception), 1054 ) 1055 return conclusion 1056 1057 def update_prod_plan_preview_check( 1058 self, 1059 status: GithubCheckStatus, 1060 conclusion: t.Optional[GithubCheckConclusion] = None, 1061 summary: t.Optional[str] = None, 1062 ) -> None: 1063 """ 1064 Updates the status of the merge commit for the prod plan preview. 1065 """ 1066 1067 def conclusion_handler( 1068 conclusion: GithubCheckConclusion, summary: t.Optional[str] = None 1069 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1070 conclusion_to_title = { 1071 GithubCheckConclusion.SUCCESS: "Prod Plan Preview", 1072 GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview", 1073 GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized", 1074 GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview", 1075 } 1076 title = conclusion_to_title.get( 1077 conclusion, f"Got an unexpected conclusion: {conclusion.value}" 1078 ) 1079 if conclusion == GithubCheckConclusion.SUCCESS and summary: 1080 summary = ( 1081 f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n" 1082 "These are the changes that would be deployed.\n\n" 1083 ) + summary 1084 1085 return conclusion, title, summary 1086 1087 self._update_check_handler( 1088 check_name="SQLMesh - Prod Plan Preview", 1089 status=status, 1090 conclusion=conclusion, 1091 status_handler=lambda status: ( 1092 { 1093 GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan", 1094 GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan", 1095 }[status], 1096 None, 1097 ), 1098 conclusion_handler=functools.partial(conclusion_handler, summary=summary), 1099 ) 1100 1101 def update_prod_environment_check( 1102 self, 1103 status: GithubCheckStatus, 1104 conclusion: t.Optional[GithubCheckConclusion] = None, 1105 skip_reason: t.Optional[str] = None, 1106 plan_error: t.Optional[PlanError] = None, 1107 ) -> None: 1108 """ 1109 Updates the status of the merge commit for the prod environment. 1110 """ 1111 1112 def conclusion_handler( 1113 conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None 1114 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1115 conclusion_to_title = { 1116 GithubCheckConclusion.SUCCESS: "Deployed to Prod", 1117 GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod", 1118 GithubCheckConclusion.SKIPPED: "Skipped deployment", 1119 GithubCheckConclusion.FAILURE: "Failed to deploy to prod", 1120 GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan", 1121 } 1122 title = ( 1123 conclusion_to_title.get(conclusion) 1124 or f"Got an unexpected conclusion: {conclusion.value}" 1125 ) 1126 if conclusion.is_skipped: 1127 summary = skip_reason 1128 elif conclusion.is_failure: 1129 captured_errors = self._console.consume_captured_errors() 1130 summary = ( 1131 captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```" 1132 ) 1133 elif conclusion.is_action_required: 1134 if plan_error: 1135 summary = f"**Plan error:**\n```\n{plan_error}\n```" 1136 else: 1137 summary = "Got an action required conclusion but no plan error was provided. This is unexpected." 1138 else: 1139 summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan) 1140 1141 return conclusion, title, summary 1142 1143 self._update_check_handler( 1144 check_name="SQLMesh - Prod Environment Synced", 1145 status=status, 1146 conclusion=conclusion, 1147 status_handler=lambda status: ( 1148 { 1149 GithubCheckStatus.IN_PROGRESS: "Deploying to Prod", 1150 GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod", 1151 }[status], 1152 None, 1153 ), 1154 conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason), 1155 ) 1156 1157 def try_merge_pr(self) -> None: 1158 """ 1159 Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not 1160 performed 1161 """ 1162 if self.bot_config.merge_method: 1163 logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}") 1164 self._pull_request.merge(merge_method=self.bot_config.merge_method.value) 1165 else: 1166 logger.debug("No merge method defined so skipping merge") 1167 1168 def get_command_from_comment(self) -> BotCommand: 1169 """ 1170 Gets the command from the comment 1171 """ 1172 if not self._event.is_comment_added: 1173 logger.debug("Event is not a comment so returning invalid") 1174 return BotCommand.INVALID 1175 if self._event.pull_request_comment_body is None: 1176 raise CICDBotError("Unable to get comment body") 1177 logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}") 1178 return BotCommand.from_comment_body( 1179 self._event.pull_request_comment_body, self.bot_config.command_namespace 1180 ) 1181 1182 def _chunk_up_api_message(self, message: str) -> t.List[str]: 1183 """ 1184 Chunks up the message into `MAX_BYTE_LENGTH` byte chunks 1185 """ 1186 message_encoded = message.encode("utf-8") 1187 return [ 1188 message_encoded[i : i + self.MAX_BYTE_LENGTH].decode("utf-8", "ignore") 1189 for i in range(0, len(message_encoded), self.MAX_BYTE_LENGTH) 1190 ] 1191 1192 @property 1193 def running_in_github_actions(self) -> bool: 1194 return os.environ.get("GITHUB_ACTIONS", None) == "true" 1195 1196 @property 1197 def version_info(self) -> str: 1198 from sqlmesh.cli.main import _sqlmesh_version 1199 1200 return _sqlmesh_version() 1201 1202 def _generate_plan_flags_section( 1203 self, user_provided_flags: t.Dict[str, UserProvidedFlags] 1204 ) -> str: 1205 # collapsed section syntax: 1206 # https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/organizing-information-with-collapsed-sections#creating-a-collapsed-section 1207 section = "<details>\n\n<summary>Plan flags</summary>\n\n" 1208 for flag_name, flag_value in user_provided_flags.items(): 1209 section += f"- `{flag_name}` = `{flag_value}`\n" 1210 section += "\n</details>" 1211 1212 return section 1213 1214 def _generate_pr_environment_summary_intro(self) -> str: 1215 note = "" 1216 subset_reasons = [] 1217 1218 if self.bot_config.skip_pr_backfill: 1219 subset_reasons.append("`skip_pr_backfill` is enabled") 1220 1221 if default_pr_start := self.bot_config.default_pr_start: 1222 subset_reasons.append(f"`default_pr_start` is set to `{default_pr_start}`") 1223 1224 if subset_reasons: 1225 note = ( 1226 "> [!IMPORTANT]\n" 1227 f"> This PR environment may only contain a subset of data because:\n" 1228 + "\n".join(f"> - {r}" for r in subset_reasons) 1229 + "\n" 1230 "> \n" 1231 "> This means that deploying to `prod` may not be a simple virtual update if there is still some data to load.\n" 1232 "> See `Dates not loaded in PR` below or the `Prod Plan Preview` check for more information.\n\n" 1233 ) 1234 1235 return ( 1236 f"Here is a summary of data that has been loaded into the PR environment `{self.pr_environment_name}` and could be deployed to `prod`.\n\n" 1237 + note 1238 ) 1239 1240 def _generate_pr_environment_summary_list(self, plan: Plan) -> str: 1241 added_snapshot_ids = set(plan.context_diff.added) 1242 modified_snapshot_ids = set( 1243 s.snapshot_id for s, _ in plan.context_diff.modified_snapshots.values() 1244 ) 1245 removed_snapshot_ids = set(plan.context_diff.removed_snapshots.keys()) 1246 1247 # note: we sort these to get a deterministic order for the output tests 1248 table_records = sorted( 1249 [ 1250 SnapshotSummaryRecord(snapshot_id=snapshot_id, plan=plan) 1251 for snapshot_id in ( 1252 added_snapshot_ids | modified_snapshot_ids | removed_snapshot_ids 1253 ) 1254 ], 1255 key=lambda r: r.display_name, 1256 ) 1257 1258 sections = [ 1259 ("### Added", [r for r in table_records if r.is_added]), 1260 ("### Removed", [r for r in table_records if r.is_removed]), 1261 ("### Directly Modified", [r for r in table_records if r.is_directly_modified]), 1262 ("### Indirectly Modified", [r for r in table_records if r.is_indirectly_modified]), 1263 ( 1264 "### Metadata Updated", 1265 [r for r in table_records if r.is_metadata_updated and not r.is_modified], 1266 ), 1267 ] 1268 1269 summary = "" 1270 for title, records in sections: 1271 if records: 1272 summary += f"\n{title}\n" 1273 1274 for record in records: 1275 summary += f"{record.as_markdown_list_item}\n" 1276 1277 return summary
287 def __init__( 288 self, 289 paths: t.Union[Path, t.Iterable[Path]], 290 token: str, 291 config: t.Optional[t.Union[Config, str]] = None, 292 event: t.Optional[GithubEvent] = None, 293 client: t.Optional[Github] = None, 294 context: t.Optional[Context] = None, 295 ) -> None: 296 from github import Github 297 298 logger.debug(f"Initializing GithubController with paths: {paths} and config: {config}") 299 300 self.config = config 301 self._paths = paths 302 self._token = token 303 self._event = event or GithubEvent.from_env() 304 logger.debug(f"Github event: {json.dumps(self._event.payload)}") 305 self._pr_plan_builder: t.Optional[PlanBuilder] = None 306 self._prod_plan_builder: t.Optional[PlanBuilder] = None 307 self._prod_plan_with_gaps_builder: t.Optional[PlanBuilder] = None 308 self._check_run_mapping: t.Dict[str, CheckRun] = {} 309 310 if not isinstance(get_console(), MarkdownConsole): 311 raise CICDBotError("Console must be a markdown console.") 312 self._console = t.cast(MarkdownConsole, get_console()) 313 314 from github.Consts import DEFAULT_BASE_URL 315 from github.Auth import Token 316 317 self._client: Github = client or Github( 318 base_url=os.environ.get("GITHUB_API_URL", DEFAULT_BASE_URL), auth=Token(self._token) 319 ) 320 321 self._repo: Repository = self._client.get_repo( 322 self._event.pull_request_info.full_repo_path, lazy=True 323 ) 324 self._pull_request: PullRequest = self._repo.get_pull( 325 self._event.pull_request_info.pr_number 326 ) 327 self._issue: Issue = self._repo.get_issue(self._event.pull_request_info.pr_number) 328 self._reviews: t.Iterable[PullRequestReview] = self._pull_request.get_reviews() 329 # TODO: The python module says that user names can be None and this is not currently handled 330 self._approvers: t.Set[str] = { 331 review.user.login or "UNKNOWN" 332 for review in self._reviews 333 if review.state.lower() == "approved" 334 } 335 logger.debug(f"Approvers: {', '.join(self._approvers)}") 336 self._context: Context = context or Context(paths=self._paths, config=self.config) 337 338 # Bot config needs the context to be initialized 339 logger.debug(f"Bot config: {self.bot_config.json(indent=2)}")
378 @property 379 def do_required_approval_check(self) -> bool: 380 """We want to skip required approval check if no users have this role""" 381 do_required_approval_check = bool(self._required_approvers) 382 logger.debug(f"Do required approval check: {do_required_approval_check}") 383 return do_required_approval_check
We want to skip required approval check if no users have this role
385 @property 386 def has_required_approval(self) -> bool: 387 """ 388 Check if the PR has a required approver. 389 390 TODO: Allow defining requiring some number, or all, required approvers. 391 """ 392 if not self._required_approvers or self._required_approvers_with_approval: 393 logger.debug("Has required Approval") 394 return True 395 logger.debug("Does not have required approval") 396 return False
Check if the PR has a required approver.
TODO: Allow defining requiring some number, or all, required approvers.
398 @property 399 def pr_plan(self) -> Plan: 400 if not self._pr_plan_builder: 401 self._pr_plan_builder = self._context.plan_builder( 402 environment=self.pr_environment_name, 403 skip_tests=True, 404 skip_linter=True, 405 categorizer_config=self.bot_config.auto_categorize_changes, 406 start=self.bot_config.default_pr_start, 407 min_intervals=self.bot_config.pr_min_intervals, 408 skip_backfill=self.bot_config.skip_pr_backfill, 409 include_unmodified=self.bot_config.pr_include_unmodified, 410 forward_only=self.forward_only_plan, 411 ) 412 assert self._pr_plan_builder 413 return self._pr_plan_builder.build()
430 @property 431 def prod_plan(self) -> Plan: 432 if not self._prod_plan_builder: 433 self._prod_plan_builder = self._context.plan_builder( 434 c.PROD, 435 no_gaps=True, 436 skip_tests=True, 437 skip_linter=True, 438 categorizer_config=self.bot_config.auto_categorize_changes, 439 run=self.bot_config.run_on_deploy_to_prod, 440 forward_only=self.forward_only_plan, 441 ) 442 assert self._prod_plan_builder 443 return self._prod_plan_builder.build()
445 @property 446 def prod_plan_with_gaps(self) -> Plan: 447 if not self._prod_plan_with_gaps_builder: 448 self._prod_plan_with_gaps_builder = self._context.plan_builder( 449 c.PROD, 450 # this is required to highlight any data gaps between this PR environment and prod (since PR environments may only contain a subset of data) 451 no_gaps=False, 452 skip_tests=True, 453 skip_linter=True, 454 categorizer_config=self.bot_config.auto_categorize_changes, 455 run=self.bot_config.run_on_deploy_to_prod, 456 forward_only=self.forward_only_plan, 457 ) 458 assert self._prod_plan_with_gaps_builder 459 return self._prod_plan_with_gaps_builder.build()
500 def get_forward_only_plan_post_deployment_tip(self, plan: Plan) -> str: 501 if not plan.forward_only: 502 return "" 503 504 example_model_name = "<model name>" 505 for snapshot_id in sorted(plan.snapshots): 506 snapshot = plan.snapshots[snapshot_id] 507 if snapshot.is_incremental: 508 example_model_name = snapshot.node.name 509 break 510 511 return ( 512 "> [!TIP]\n" 513 "> In order to see this forward-only plan retroactively apply to historical intervals on the production model, run the below for date ranges in scope:\n" 514 "> \n" 515 f"> `$ sqlmesh plan --restate-model {example_model_name} --start YYYY-MM-DD --end YYYY-MM-DD`\n" 516 ">\n" 517 "> Learn more: https://sqlmesh.readthedocs.io/en/stable/concepts/plans/?h=restate#restatement-plans" 518 )
520 def get_plan_summary(self, plan: Plan) -> str: 521 # use Verbosity.VERY_VERBOSE to prevent the list of models from being truncated 522 # this is particularly important for the "Models needing backfill" list because 523 # there is no easy way to tell this otherwise 524 orig_verbosity = self._console.verbosity 525 self._console.verbosity = Verbosity.VERY_VERBOSE 526 527 try: 528 # Clear out any output that might exist from prior steps 529 self._console.consume_captured_output() 530 if plan.restatements: 531 self._console._print("\n**Restating models**\n") 532 else: 533 self._console.show_environment_difference_summary( 534 context_diff=plan.context_diff, 535 no_diff=False, 536 ) 537 if plan.context_diff.has_changes: 538 self._console.show_model_difference_summary( 539 context_diff=plan.context_diff, 540 environment_naming_info=plan.environment_naming_info, 541 default_catalog=self._context.default_catalog, 542 no_diff=False, 543 ) 544 difference_summary = self._console.consume_captured_output() 545 self._console._show_missing_dates(plan, self._context.default_catalog) 546 missing_dates = self._console.consume_captured_output() 547 548 plan_flags_section = ( 549 f"\n\n{self._generate_plan_flags_section(plan.user_provided_flags)}" 550 if plan.user_provided_flags 551 else "" 552 ) 553 554 if not difference_summary and not missing_dates: 555 return f"No changes to apply.{plan_flags_section}" 556 557 warnings_block = self._console.consume_captured_warnings() 558 errors_block = self._console.consume_captured_errors() 559 560 return f"{warnings_block}{errors_block}{difference_summary}\n{missing_dates}{plan_flags_section}" 561 except PlanError as e: 562 logger.exception("Plan failed to generate") 563 return f"Plan failed to generate. Check for pending or unresolved changes. Error: {e}" 564 finally: 565 self._console.verbosity = orig_verbosity
567 def get_pr_environment_summary( 568 self, conclusion: GithubCheckConclusion, exception: t.Optional[Exception] = None 569 ) -> str: 570 heading = "" 571 summary = "" 572 573 if conclusion.is_success: 574 summary = self._get_pr_environment_summary_success() 575 elif conclusion.is_action_required: 576 heading = f":warning: Action Required to create or update PR Environment `{self.pr_environment_name}` :warning:" 577 summary = self._get_pr_environment_summary_action_required(exception) 578 elif conclusion.is_failure: 579 heading = ( 580 f":x: Failed to create or update PR Environment `{self.pr_environment_name}` :x:" 581 ) 582 summary = self._get_pr_environment_summary_failure(exception) 583 elif conclusion.is_skipped: 584 heading = f":next_track_button: Skipped creating or updating PR Environment `{self.pr_environment_name}` :next_track_button:" 585 summary = self._get_pr_environment_summary_skipped(exception) 586 else: 587 heading = f":interrobang: Got an unexpected conclusion: {conclusion.value}" 588 589 # note: we just add warnings here, errors will be covered by the "failure" conclusion 590 if warnings := self._console.consume_captured_warnings(): 591 summary = f"{warnings}\n{summary}" 592 593 return f"{heading}\n\n{summary}".strip()
666 def run_tests(self) -> t.Tuple[ModelTextTestResult, str]: 667 """ 668 Run tests for the PR 669 """ 670 return self._context._run_tests(verbosity=Verbosity.VERBOSE)
Run tests for the PR
672 def run_linter(self) -> None: 673 """ 674 Run linter for the PR 675 """ 676 self._console.consume_captured_output() 677 self._context.lint_models()
Run linter for the PR
721 def update_sqlmesh_comment_info( 722 self, value: str, *, dedup_regex: t.Optional[str] 723 ) -> t.Tuple[bool, IssueComment]: 724 """ 725 Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then 726 it creates one. It determines the comment to update by looking for a comment with the header. If a dedup 727 regex is provided then it will check if the value already exists in the comment and if so it will not update 728 """ 729 comment = self._get_or_create_comment() 730 if dedup_regex: 731 # If we find a match against the regex then we just return since the comment has already been posted 732 if seq_get(re.findall(dedup_regex, comment.body), 0): 733 return False, comment 734 full_comment = f"{comment.body}\n{value}" 735 body, *truncated = self._chunk_up_api_message(f"{full_comment}") 736 if truncated: 737 logger.warning( 738 f"Comment body was too long so we truncated it. Full text: {full_comment}" 739 ) 740 comment.edit(body=body) 741 return True, comment
Update the SQLMesh PR Comment for the given lookup key with the given value. If a comment does not exist then it creates one. It determines the comment to update by looking for a comment with the header. If a dedup regex is provided then it will check if the value already exists in the comment and if so it will not update
743 def update_pr_environment(self) -> None: 744 """ 745 Creates a PR environment from the logic present in the PR. If the PR contains changes that are 746 uncategorized, then an error will be raised. 747 """ 748 self._console.consume_captured_output() # clear output buffer 749 self._context.apply(self.pr_plan) # will raise if PR environment creation fails 750 751 # update PR info comment 752 vde_title = "- :eyes: To **review** this PR's changes, use virtual data environment:" 753 comment_value = f"{vde_title}\n - `{self.pr_environment_name}`" 754 if self.bot_config.enable_deploy_command: 755 full_command = f"{self.bot_config.command_namespace or ''}/deploy" 756 comment_value += f"\n- :arrow_forward: To **apply** this PR's plan to prod, comment:\n - `{full_command}`" 757 dedup_regex = vde_title.replace("*", r"\*") + r".*" 758 updated_comment, _ = self.update_sqlmesh_comment_info( 759 value=comment_value, 760 dedup_regex=dedup_regex, 761 ) 762 if updated_comment: 763 self._append_output("created_pr_environment", "true")
Creates a PR environment from the logic present in the PR. If the PR contains changes that are uncategorized, then an error will be raised.
765 def deploy_to_prod(self) -> None: 766 """ 767 Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise. 768 """ 769 # If the PR is already merged then we will not deploy to prod if this event was triggered prior to the merge. 770 # The deploy can still happen if the workflow is configured to listen for `closed` events. 771 if self._pull_request.merged and not self._event.is_pull_request_closed: 772 raise CICDBotError( 773 "PR is already merged and this event was triggered prior to the merge." 774 ) 775 merge_status = self._get_merge_state_status() 776 if self.bot_config.check_if_blocked_on_deploy_to_prod and merge_status.is_blocked: 777 raise CICDBotError( 778 "Branch protection or ruleset requirement is likely not satisfied, e.g. missing CODEOWNERS approval. " 779 "Please check PR and resolve any issues. To disable this check, set `check_if_blocked_on_deploy_to_prod` to false in the bot configuration." 780 ) 781 if merge_status.is_dirty: 782 raise CICDBotError( 783 "Merge commit cannot be cleanly created. Likely from a merge conflict. " 784 "Please check PR and resolve any issues." 785 ) 786 plan_summary = f"""<details> 787 <summary>:ship: Prod Plan Being Applied</summary> 788 789{self.get_plan_summary(self.prod_plan)} 790</details> 791 792""" 793 if self.forward_only_plan: 794 plan_summary = ( 795 f"{self.get_forward_only_plan_post_deployment_tip(self.prod_plan)}\n{plan_summary}" 796 ) 797 798 self.update_sqlmesh_comment_info( 799 value=plan_summary, 800 dedup_regex=None, 801 ) 802 self._context.apply(self.prod_plan)
Attempts to deploy a plan to prod. If the plan is not up-to-date or has gaps then it will raise.
804 def try_invalidate_pr_environment(self) -> None: 805 """ 806 Marks the PR environment for garbage collection. 807 """ 808 if self.bot_config.invalidate_environment_after_deploy: 809 self._context.invalidate_environment(self.pr_environment_name)
Marks the PR environment for garbage collection.
896 def update_linter_check( 897 self, 898 status: GithubCheckStatus, 899 conclusion: t.Optional[GithubCheckConclusion] = None, 900 ) -> None: 901 if not self._context.config.linter.enabled: 902 return 903 904 def conclusion_handler( 905 conclusion: GithubCheckConclusion, 906 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 907 linter_summary = self._console.consume_captured_output() or "Linter Success" 908 909 title = "Linter results" 910 911 return conclusion, title, linter_summary 912 913 self._update_check_handler( 914 check_name="SQLMesh - Linter", 915 status=status, 916 conclusion=conclusion, 917 status_handler=lambda status: ( 918 { 919 GithubCheckStatus.IN_PROGRESS: "Running linter", 920 GithubCheckStatus.QUEUED: "Waiting to Run linter", 921 }[status], 922 None, 923 ), 924 conclusion_handler=conclusion_handler, 925 )
927 def update_test_check( 928 self, 929 status: GithubCheckStatus, 930 conclusion: t.Optional[GithubCheckConclusion] = None, 931 result: t.Optional[ModelTextTestResult] = None, 932 traceback: t.Optional[str] = None, 933 ) -> None: 934 """ 935 Updates the status of tests for code in the PR 936 """ 937 938 def conclusion_handler( 939 conclusion: GithubCheckConclusion, 940 result: t.Optional[ModelTextTestResult], 941 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 942 if result: 943 # Clear out console 944 self._console.consume_captured_output() 945 self._console.log_test_results( 946 result, 947 self._context.test_connection_config._engine_adapter.DIALECT, 948 ) 949 test_summary = self._console.consume_captured_output() 950 test_title = "Tests Passed" if result.wasSuccessful() else "Tests Failed" 951 test_conclusion = ( 952 GithubCheckConclusion.SUCCESS 953 if result.wasSuccessful() 954 else GithubCheckConclusion.FAILURE 955 ) 956 return test_conclusion, test_title, test_summary 957 if traceback: 958 self._console._print(traceback) 959 960 test_title = "Skipped Tests" if conclusion.is_skipped else "Tests Failed" 961 return conclusion, test_title, traceback 962 963 self._update_check_handler( 964 check_name="SQLMesh - Run Unit Tests", 965 status=status, 966 conclusion=conclusion, 967 status_handler=lambda status: ( 968 { 969 GithubCheckStatus.IN_PROGRESS: "Running Tests", 970 GithubCheckStatus.QUEUED: "Waiting to Run Tests", 971 }[status], 972 None, 973 ), 974 conclusion_handler=functools.partial(conclusion_handler, result=result), 975 )
Updates the status of tests for code in the PR
977 def update_required_approval_check( 978 self, status: GithubCheckStatus, conclusion: t.Optional[GithubCheckConclusion] = None 979 ) -> None: 980 """ 981 Updates the status of the merge commit for the required approval. 982 """ 983 984 def conclusion_handler( 985 conclusion: GithubCheckConclusion, 986 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 987 test_summary = "**List of possible required approvers:**\n" 988 for user in self._required_approvers: 989 test_summary += f"- `{user.github_username or user.username}`\n" 990 991 title = ( 992 f"Obtained approval from required approvers: {', '.join([user.github_username or user.username for user in self._required_approvers_with_approval])}" 993 if conclusion.is_success 994 else "Need a Required Approval" 995 ) 996 return conclusion, title, test_summary 997 998 # If we get a skip that means required approvers is not configured therefore it does not need to be displayed 999 if conclusion and conclusion.is_skipped: 1000 return 1001 1002 self._update_check_handler( 1003 check_name="SQLMesh - Has Required Approval", 1004 status=status, 1005 conclusion=conclusion, 1006 status_handler=lambda status: ( 1007 { 1008 GithubCheckStatus.IN_PROGRESS: "Checking if we have required Approvers", 1009 GithubCheckStatus.QUEUED: "Waiting to Check if we have required Approvers", 1010 }[status], 1011 None, 1012 ), 1013 conclusion_handler=conclusion_handler, 1014 )
Updates the status of the merge commit for the required approval.
1016 def update_pr_environment_check( 1017 self, status: GithubCheckStatus, exception: t.Optional[Exception] = None 1018 ) -> t.Optional[GithubCheckConclusion]: 1019 """ 1020 Updates the status of the merge commit for the PR environment. 1021 """ 1022 conclusion: t.Optional[GithubCheckConclusion] = None 1023 if isinstance(exception, (NoChangesPlanError, TestFailure, LinterError)): 1024 conclusion = GithubCheckConclusion.SKIPPED 1025 elif isinstance(exception, UncategorizedPlanError): 1026 conclusion = GithubCheckConclusion.ACTION_REQUIRED 1027 elif exception: 1028 conclusion = GithubCheckConclusion.FAILURE 1029 elif status.is_completed: 1030 conclusion = GithubCheckConclusion.SUCCESS 1031 1032 check_title_static = "PR Virtual Data Environment: " 1033 check_title = check_title_static + self.pr_environment_name 1034 1035 def conclusion_handler( 1036 conclusion: GithubCheckConclusion, exception: t.Optional[Exception] 1037 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1038 summary = self.get_pr_environment_summary(conclusion, exception) 1039 self._append_output("pr_environment_name", self.pr_environment_name) 1040 return conclusion, check_title, summary 1041 1042 self._update_check_handler( 1043 check_name="SQLMesh - PR Environment Synced", 1044 status=status, 1045 conclusion=conclusion, 1046 status_handler=lambda status: ( 1047 check_title, 1048 { 1049 GithubCheckStatus.QUEUED: f":pause_button: Waiting to create or update PR Environment `{self.pr_environment_name}`", 1050 GithubCheckStatus.IN_PROGRESS: f":rocket: Creating or Updating PR Environment `{self.pr_environment_name}`", 1051 }[status], 1052 ), 1053 conclusion_handler=functools.partial(conclusion_handler, exception=exception), 1054 ) 1055 return conclusion
Updates the status of the merge commit for the PR environment.
1057 def update_prod_plan_preview_check( 1058 self, 1059 status: GithubCheckStatus, 1060 conclusion: t.Optional[GithubCheckConclusion] = None, 1061 summary: t.Optional[str] = None, 1062 ) -> None: 1063 """ 1064 Updates the status of the merge commit for the prod plan preview. 1065 """ 1066 1067 def conclusion_handler( 1068 conclusion: GithubCheckConclusion, summary: t.Optional[str] = None 1069 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1070 conclusion_to_title = { 1071 GithubCheckConclusion.SUCCESS: "Prod Plan Preview", 1072 GithubCheckConclusion.CANCELLED: "Cancelled generating prod plan preview", 1073 GithubCheckConclusion.SKIPPED: "Skipped generating prod plan preview since PR was not synchronized", 1074 GithubCheckConclusion.FAILURE: "Failed to generate prod plan preview", 1075 } 1076 title = conclusion_to_title.get( 1077 conclusion, f"Got an unexpected conclusion: {conclusion.value}" 1078 ) 1079 if conclusion == GithubCheckConclusion.SUCCESS and summary: 1080 summary = ( 1081 f"This is a preview that shows the differences between this PR environment `{self.pr_environment_name}` and `prod`.\n\n" 1082 "These are the changes that would be deployed.\n\n" 1083 ) + summary 1084 1085 return conclusion, title, summary 1086 1087 self._update_check_handler( 1088 check_name="SQLMesh - Prod Plan Preview", 1089 status=status, 1090 conclusion=conclusion, 1091 status_handler=lambda status: ( 1092 { 1093 GithubCheckStatus.IN_PROGRESS: "Generating Prod Plan", 1094 GithubCheckStatus.QUEUED: "Waiting to Generate Prod Plan", 1095 }[status], 1096 None, 1097 ), 1098 conclusion_handler=functools.partial(conclusion_handler, summary=summary), 1099 )
Updates the status of the merge commit for the prod plan preview.
1101 def update_prod_environment_check( 1102 self, 1103 status: GithubCheckStatus, 1104 conclusion: t.Optional[GithubCheckConclusion] = None, 1105 skip_reason: t.Optional[str] = None, 1106 plan_error: t.Optional[PlanError] = None, 1107 ) -> None: 1108 """ 1109 Updates the status of the merge commit for the prod environment. 1110 """ 1111 1112 def conclusion_handler( 1113 conclusion: GithubCheckConclusion, skip_reason: t.Optional[str] = None 1114 ) -> t.Tuple[GithubCheckConclusion, str, t.Optional[str]]: 1115 conclusion_to_title = { 1116 GithubCheckConclusion.SUCCESS: "Deployed to Prod", 1117 GithubCheckConclusion.CANCELLED: "Cancelled deploying to prod", 1118 GithubCheckConclusion.SKIPPED: "Skipped deployment", 1119 GithubCheckConclusion.FAILURE: "Failed to deploy to prod", 1120 GithubCheckConclusion.ACTION_REQUIRED: "Failed due to error applying plan", 1121 } 1122 title = ( 1123 conclusion_to_title.get(conclusion) 1124 or f"Got an unexpected conclusion: {conclusion.value}" 1125 ) 1126 if conclusion.is_skipped: 1127 summary = skip_reason 1128 elif conclusion.is_failure: 1129 captured_errors = self._console.consume_captured_errors() 1130 summary = ( 1131 captured_errors or f"{title}\n\n**Error:**\n```\n{traceback.format_exc()}\n```" 1132 ) 1133 elif conclusion.is_action_required: 1134 if plan_error: 1135 summary = f"**Plan error:**\n```\n{plan_error}\n```" 1136 else: 1137 summary = "Got an action required conclusion but no plan error was provided. This is unexpected." 1138 else: 1139 summary = "**Generated Prod Plan**\n" + self.get_plan_summary(self.prod_plan) 1140 1141 return conclusion, title, summary 1142 1143 self._update_check_handler( 1144 check_name="SQLMesh - Prod Environment Synced", 1145 status=status, 1146 conclusion=conclusion, 1147 status_handler=lambda status: ( 1148 { 1149 GithubCheckStatus.IN_PROGRESS: "Deploying to Prod", 1150 GithubCheckStatus.QUEUED: "Waiting to see if we can deploy to prod", 1151 }[status], 1152 None, 1153 ), 1154 conclusion_handler=functools.partial(conclusion_handler, skip_reason=skip_reason), 1155 )
Updates the status of the merge commit for the prod environment.
1157 def try_merge_pr(self) -> None: 1158 """ 1159 Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not 1160 performed 1161 """ 1162 if self.bot_config.merge_method: 1163 logger.debug(f"Merging PR with merge method: {self.bot_config.merge_method.value}") 1164 self._pull_request.merge(merge_method=self.bot_config.merge_method.value) 1165 else: 1166 logger.debug("No merge method defined so skipping merge")
Merges the PR using the merge method defined in the bot config. If one is not defined then a merge is not performed
1168 def get_command_from_comment(self) -> BotCommand: 1169 """ 1170 Gets the command from the comment 1171 """ 1172 if not self._event.is_comment_added: 1173 logger.debug("Event is not a comment so returning invalid") 1174 return BotCommand.INVALID 1175 if self._event.pull_request_comment_body is None: 1176 raise CICDBotError("Unable to get comment body") 1177 logger.debug(f"Getting command from comment body: {self._event.pull_request_comment_body}") 1178 return BotCommand.from_comment_body( 1179 self._event.pull_request_comment_body, self.bot_config.command_namespace 1180 )
Gets the command from the comment
1280@dataclass 1281class SnapshotSummaryRecord: 1282 snapshot_id: SnapshotId 1283 plan: Plan 1284 1285 @property 1286 def snapshot(self) -> Snapshot: 1287 if self.is_removed: 1288 raise ValueError("Removed snapshots only have SnapshotTableInfo available") 1289 return self.plan.snapshots[self.snapshot_id] 1290 1291 @cached_property 1292 def snapshot_table_info(self) -> SnapshotTableInfo: 1293 if self.is_removed: 1294 return self.plan.modified_snapshots[self.snapshot_id].table_info 1295 return self.plan.snapshots[self.snapshot_id].table_info 1296 1297 @property 1298 def display_name(self) -> str: 1299 dialect = None if self.is_removed else self.snapshot.node.dialect 1300 return self.snapshot_table_info.display_name( 1301 self.plan.environment_naming_info, default_catalog=None, dialect=dialect 1302 ) 1303 1304 @property 1305 def change_category(self) -> str: 1306 if self.is_removed: 1307 return SNAPSHOT_CHANGE_CATEGORY_STR[SnapshotChangeCategory.BREAKING] 1308 1309 if change_category := self.snapshot.change_category: 1310 return SNAPSHOT_CHANGE_CATEGORY_STR[change_category] 1311 1312 return "Uncategorized" 1313 1314 @property 1315 def is_added(self) -> bool: 1316 return self.snapshot_id in self.plan.context_diff.added 1317 1318 @property 1319 def is_removed(self) -> bool: 1320 return self.snapshot_id in self.plan.context_diff.removed_snapshots 1321 1322 @property 1323 def is_dev_preview(self) -> bool: 1324 return not self.plan.deployability_index.is_deployable(self.snapshot_id) 1325 1326 @property 1327 def is_directly_modified(self) -> bool: 1328 return self.plan.context_diff.directly_modified(self.snapshot_table_info.name) 1329 1330 @property 1331 def is_indirectly_modified(self) -> bool: 1332 return self.plan.context_diff.indirectly_modified(self.snapshot_table_info.name) 1333 1334 @property 1335 def is_modified(self) -> bool: 1336 return self.is_directly_modified or self.is_indirectly_modified 1337 1338 @property 1339 def is_metadata_updated(self) -> bool: 1340 return self.plan.context_diff.metadata_updated(self.snapshot_table_info.name) 1341 1342 @property 1343 def is_incremental(self) -> bool: 1344 return self.snapshot_table_info.is_incremental 1345 1346 @property 1347 def modification_type(self) -> str: 1348 if self.is_directly_modified: 1349 return "Directly modified" 1350 if self.is_indirectly_modified: 1351 return "Indirectly modified" 1352 if self.is_metadata_updated: 1353 return "Metadata updated" 1354 1355 return "Unknown" 1356 1357 @property 1358 def loaded_intervals(self) -> SnapshotIntervals: 1359 if self.is_removed: 1360 raise ValueError("Removed snapshots dont have loaded intervals available") 1361 1362 return SnapshotIntervals( 1363 snapshot_id=self.snapshot_id, 1364 intervals=( 1365 self.snapshot.dev_intervals 1366 if self.snapshot.is_forward_only 1367 else self.snapshot.intervals 1368 ), 1369 ) 1370 1371 @property 1372 def loaded_intervals_rendered(self) -> str: 1373 if self.is_removed: 1374 return "REMOVED" 1375 1376 return self._format_intervals(self.loaded_intervals) 1377 1378 @property 1379 def missing_intervals(self) -> t.Optional[SnapshotIntervals]: 1380 return next( 1381 (si for si in self.plan.missing_intervals if si.snapshot_id == self.snapshot_id), 1382 None, 1383 ) 1384 1385 @property 1386 def missing_intervals_formatted(self) -> str: 1387 if not self.is_removed and (intervals := self.missing_intervals): 1388 return self._format_intervals(intervals) 1389 1390 return "N/A" 1391 1392 @property 1393 def as_markdown_list_item(self) -> str: 1394 if self.is_removed: 1395 return f"- `{self.display_name}` ({self.change_category})" 1396 1397 how_applied = "" 1398 1399 if not self.is_incremental: 1400 from sqlmesh.core.console import _format_missing_intervals 1401 1402 # note: this is to re-use the '[recreate view]' and '[full refresh]' text and keep it in sync with updates to the CLI 1403 # it doesnt actually use the passed intervals, those are handled differently 1404 how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals) 1405 1406 how_applied_str = f" [{how_applied}]" if how_applied else "" 1407 1408 item = f"- `{self.display_name}` ({self.change_category})\n" 1409 1410 if self.snapshot_table_info.model_kind_name: 1411 item += f" **Kind:** {self.snapshot_table_info.model_kind_name}{how_applied_str}\n" 1412 1413 if self.is_incremental: 1414 # in-depth interval info is only relevant for incremental models 1415 item += f" **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n" 1416 if self.missing_intervals: 1417 item += f" **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n" 1418 1419 return item 1420 1421 def _format_intervals(self, intervals: SnapshotIntervals) -> str: 1422 preview_modifier = " (**preview**)" if self.is_dev_preview else "" 1423 return f"{intervals.format_intervals(self.snapshot.node.interval_unit)}{preview_modifier}"
1304 @property 1305 def change_category(self) -> str: 1306 if self.is_removed: 1307 return SNAPSHOT_CHANGE_CATEGORY_STR[SnapshotChangeCategory.BREAKING] 1308 1309 if change_category := self.snapshot.change_category: 1310 return SNAPSHOT_CHANGE_CATEGORY_STR[change_category] 1311 1312 return "Uncategorized"
1357 @property 1358 def loaded_intervals(self) -> SnapshotIntervals: 1359 if self.is_removed: 1360 raise ValueError("Removed snapshots dont have loaded intervals available") 1361 1362 return SnapshotIntervals( 1363 snapshot_id=self.snapshot_id, 1364 intervals=( 1365 self.snapshot.dev_intervals 1366 if self.snapshot.is_forward_only 1367 else self.snapshot.intervals 1368 ), 1369 )
1392 @property 1393 def as_markdown_list_item(self) -> str: 1394 if self.is_removed: 1395 return f"- `{self.display_name}` ({self.change_category})" 1396 1397 how_applied = "" 1398 1399 if not self.is_incremental: 1400 from sqlmesh.core.console import _format_missing_intervals 1401 1402 # note: this is to re-use the '[recreate view]' and '[full refresh]' text and keep it in sync with updates to the CLI 1403 # it doesnt actually use the passed intervals, those are handled differently 1404 how_applied = _format_missing_intervals(self.snapshot, self.loaded_intervals) 1405 1406 how_applied_str = f" [{how_applied}]" if how_applied else "" 1407 1408 item = f"- `{self.display_name}` ({self.change_category})\n" 1409 1410 if self.snapshot_table_info.model_kind_name: 1411 item += f" **Kind:** {self.snapshot_table_info.model_kind_name}{how_applied_str}\n" 1412 1413 if self.is_incremental: 1414 # in-depth interval info is only relevant for incremental models 1415 item += f" **Dates loaded in PR:** [{self.loaded_intervals_rendered}]\n" 1416 if self.missing_intervals: 1417 item += f" **Dates *not* loaded in PR:** [{self.missing_intervals_formatted}]\n" 1418 1419 return item