Edit on GitHub

sqlmesh.magics

   1from __future__ import annotations
   2
   3from io import StringIO
   4
   5import functools
   6import logging
   7import typing as t
   8from argparse import Namespace, SUPPRESS
   9from collections import defaultdict
  10from copy import deepcopy
  11from pathlib import Path
  12
  13from hyperscript import h
  14
  15try:
  16    from IPython.core.display import display  # type: ignore
  17except ImportError:
  18    from IPython.display import display
  19
  20from IPython.core.magic import (
  21    Magics,
  22    cell_magic,
  23    line_cell_magic,
  24    line_magic,
  25    magics_class,
  26)
  27from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring
  28from IPython.utils.process import arg_split
  29from rich.jupyter import JupyterRenderable
  30from sqlmesh.cli.project_init import ProjectTemplate, init_example_project
  31from sqlmesh.core import analytics
  32from sqlmesh.core.config import load_configs
  33from sqlmesh.core.config.connection import INIT_DISPLAY_INFO_TO_TYPE
  34from sqlmesh.core.console import create_console, set_console, configure_console
  35from sqlmesh.core.context import Context
  36from sqlmesh.core.dialect import format_model_expressions, parse
  37from sqlmesh.core.model import load_sql_based_model
  38from sqlmesh.core.test import ModelTestMetadata
  39from sqlmesh.utils import yaml, Verbosity, optional_import
  40from sqlmesh.utils.errors import MagicError, MissingContextException, SQLMeshError
  41
  42logger = logging.getLogger(__name__)
  43
  44CONTEXT_VARIABLE_NAMES = [
  45    "context",
  46    "ctx",
  47    "sqlmesh",
  48]
  49
  50
  51def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
  52    @functools.wraps(func)
  53    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
  54        for variable_name in CONTEXT_VARIABLE_NAMES:
  55            context = self._shell.user_ns.get(variable_name)
  56            if isinstance(context, Context):
  57                break
  58        else:
  59            raise MissingContextException(
  60                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
  61            )
  62        old_console = context.console
  63        new_console = create_console(display=self.display)
  64        context.console = new_console
  65        set_console(new_console)
  66        context.refresh()
  67
  68        magic_name = func.__name__
  69        bound_method = getattr(self, magic_name, None)
  70        if bound_method:
  71            args_split = arg_split(args[0])
  72            parser = bound_method.parser
  73
  74            original_parser_actions = deepcopy(parser._actions)
  75            original_parser_defaults = parser._defaults
  76
  77            # Temporarily supress default values, otherwise any missing arg would be set and affect analytics
  78            parser._defaults = {}
  79            for action in parser._actions:
  80                action.default = SUPPRESS
  81
  82            parsed_args, _ = parser.parse_known_args(args_split, Namespace())
  83
  84            parser._actions = original_parser_actions
  85            parser._defaults = original_parser_defaults
  86
  87            command_args = {k for k, v in parsed_args.__dict__.items() if v is not None}
  88            analytics.collector.on_magic_command(command_name=magic_name, command_args=command_args)
  89
  90        func(self, context, *args, **kwargs)
  91
  92        context.console = old_console
  93        set_console(old_console)
  94
  95    return wrapper
  96
  97
  98def format_arguments(func: t.Callable) -> t.Callable:
  99    """Decorator to add common format arguments to magic commands."""
 100    func = argument(
 101        "--normalize",
 102        action="store_true",
 103        help="Whether or not to normalize identifiers to lowercase.",
 104        default=None,
 105    )(func)
 106    func = argument(
 107        "--pad",
 108        type=int,
 109        help="Determines the pad size in a formatted string.",
 110    )(func)
 111    func = argument(
 112        "--indent",
 113        type=int,
 114        help="Determines the indentation size in a formatted string.",
 115    )(func)
 116    func = argument(
 117        "--normalize-functions",
 118        type=str,
 119        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
 120    )(func)
 121    func = argument(
 122        "--leading-comma",
 123        action="store_true",
 124        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
 125        default=None,
 126    )(func)
 127    func = argument(
 128        "--max-text-width",
 129        type=int,
 130        help="The max number of characters in a segment before creating new lines in pretty mode.",
 131    )(func)
 132    return func
 133
 134
 135@magics_class
 136class SQLMeshMagics(Magics):
 137    @property
 138    def display(self) -> t.Callable:
 139        from sqlmesh import RuntimeEnv
 140
 141        if RuntimeEnv.get().is_databricks:
 142            # Use Databricks' special display instead of the normal IPython display
 143            return self._shell.user_ns["display"]
 144        return display
 145
 146    @property
 147    def _shell(self) -> t.Any:
 148        # Make mypy happy.
 149        if not self.shell:
 150            raise RuntimeError("IPython Magics are in invalid state")
 151        return self.shell
 152
 153    @magic_arguments()
 154    @argument(
 155        "paths",
 156        type=str,
 157        nargs="+",
 158        default="",
 159        help="The path(s) to the SQLMesh project(s).",
 160    )
 161    @argument(
 162        "--config",
 163        type=str,
 164        help="Name of the config object. Only applicable to configuration defined using Python script.",
 165    )
 166    @argument("--gateway", type=str, help="The name of the gateway.")
 167    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
 168    @argument("--debug", action="store_true", help="Enable debug mode.")
 169    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
 170    @argument(
 171        "--dotenv", type=str, help="Path to a custom .env file to load environment variables from."
 172    )
 173    @line_magic
 174    def context(self, line: str) -> None:
 175        """Sets the context in the user namespace."""
 176        from sqlmesh import configure_logging, remove_excess_logs
 177
 178        args = parse_argstring(self.context, line)
 179        log_file_dir = args.log_file_dir
 180
 181        configure_logging(
 182            args.debug,
 183            log_file_dir=log_file_dir,
 184            ignore_warnings=args.ignore_warnings,
 185        )
 186        configure_console(ignore_warnings=args.ignore_warnings)
 187
 188        dotenv_path = Path(args.dotenv) if args.dotenv else None
 189        configs = load_configs(
 190            args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path
 191        )
 192        log_limit = list(configs.values())[0].log_limit
 193
 194        remove_excess_logs(log_file_dir, log_limit)
 195
 196        try:
 197            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
 198            self._shell.user_ns["context"] = context
 199        except Exception:
 200            if args.debug:
 201                logger.exception("Failed to initialize SQLMesh context")
 202            raise
 203
 204        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")
 205
 206    @magic_arguments()
 207    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
 208    @argument(
 209        "engine",
 210        type=str,
 211        help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.",  # type: ignore
 212    )
 213    @argument(
 214        "--template",
 215        "-t",
 216        type=str,
 217        help="Project template. Supported values: dbt, default, empty.",
 218    )
 219    @argument(
 220        "--dlt-pipeline",
 221        type=str,
 222        help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
 223    )
 224    @argument(
 225        "--dlt-path",
 226        type=str,
 227        help="The directory where the DLT pipeline resides. Use alongside template: dlt",
 228    )
 229    @line_magic
 230    def init(self, line: str) -> None:
 231        """Creates a SQLMesh project scaffold with a default SQL dialect."""
 232        args = parse_argstring(self.init, line)
 233        try:
 234            project_template = ProjectTemplate(
 235                args.template.lower() if args.template else "default"
 236            )
 237        except ValueError:
 238            raise MagicError(f"Invalid project template '{args.template}'")
 239        init_example_project(
 240            path=args.path,
 241            engine_type=args.engine,
 242            dialect=None,
 243            template=project_template,
 244            pipeline=args.dlt_pipeline,
 245            dlt_path=args.dlt_path,
 246        )
 247        html = str(
 248            h(
 249                "div",
 250                h(
 251                    "span",
 252                    {"style": {"color": "green", "font-weight": "bold"}},
 253                    "SQLMesh project scaffold created",
 254                ),
 255            )
 256        )
 257        self.display(JupyterRenderable(html=html, text=""))
 258
 259    @magic_arguments()
 260    @argument("model", type=str, help="The model.")
 261    @argument("--start", "-s", type=str, help="Start date to render.")
 262    @argument("--end", "-e", type=str, help="End date to render.")
 263    @argument("--execution-time", type=str, help="Execution time.")
 264    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
 265    @line_cell_magic
 266    @pass_sqlmesh_context
 267    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
 268        """Renders the model and automatically fills in an editable cell with the model definition."""
 269        args = parse_argstring(self.model, line)
 270
 271        model = context.get_model(args.model, raise_if_missing=True)
 272        config = context.config_for_node(model)
 273
 274        if sql:
 275            expressions = parse(sql, default_dialect=config.dialect)
 276            loaded = load_sql_based_model(
 277                expressions,
 278                macros=context._macros,
 279                jinja_macros=context._jinja_macros,
 280                path=model._path,
 281                dialect=config.dialect,
 282                time_column_format=config.time_column_format,
 283                physical_schema_mapping=context.config.physical_schema_mapping,
 284                default_catalog=context.default_catalog,
 285            )
 286
 287            if loaded.name == args.model:
 288                model = loaded
 289        else:
 290            if model._path:
 291                with open(model._path, "r", encoding="utf-8") as file:
 292                    expressions = parse(file.read(), default_dialect=config.dialect)
 293
 294        formatted = format_model_expressions(
 295            expressions,
 296            model.dialect,
 297            rewrite_casts=not config.format.no_rewrite_casts,
 298            **config.format.generator_options,
 299        )
 300
 301        self._shell.set_next_input(
 302            "\n".join(
 303                [
 304                    " ".join(["%%model", line]),
 305                    formatted,
 306                ]
 307            ),
 308            replace=True,
 309        )
 310
 311        if model._path:
 312            with open(model._path, "w", encoding="utf-8") as file:
 313                file.write(formatted)
 314
 315        if sql:
 316            context.console.log_success(f"Model `{args.model}` updated")
 317
 318        context.upsert_model(model)
 319        context.console.show_sql(
 320            context.render(
 321                model.name,
 322                start=args.start,
 323                end=args.end,
 324                execution_time=args.execution_time,
 325            ).sql(pretty=True, dialect=args.dialect or model.dialect)
 326        )
 327
 328    @magic_arguments()
 329    @argument("model", type=str, help="The model.")
 330    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
 331    @argument("--ls", action="store_true", help="List tests associated with a model")
 332    @line_cell_magic
 333    @pass_sqlmesh_context
 334    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
 335        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
 336        args = parse_argstring(self.test, line)
 337        if not args.test_name and not args.ls:
 338            raise MagicError("Must provide either test name or `--ls` to list tests")
 339
 340        test_meta = context.select_tests()
 341
 342        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
 343        for model_test_metadata in test_meta:
 344            model = model_test_metadata.body.get("model")
 345            if not model:
 346                context.console.log_error(
 347                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
 348                )
 349            else:
 350                tests[model][model_test_metadata.test_name] = model_test_metadata
 351
 352        model = context.get_model(args.model, raise_if_missing=True)
 353
 354        if args.ls:
 355            # TODO: Provide better UI for displaying tests
 356            for test_name in tests[model.name]:
 357                context.console.log_status_update(test_name)
 358            return
 359
 360        test = tests[model.name][args.test_name]
 361        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
 362        test_def_output = yaml.dump(test_def)
 363
 364        self._shell.set_next_input(
 365            "\n".join(
 366                [
 367                    " ".join(["%%test", line]),
 368                    test_def_output,
 369                ]
 370            ),
 371            replace=True,
 372        )
 373
 374        with open(test.path, "r+", encoding="utf-8") as file:
 375            content = yaml.load(file.read())
 376            content[args.test_name] = test_def
 377            file.seek(0)
 378            yaml.dump(content, file)
 379            file.truncate()
 380
 381    @magic_arguments()
 382    @argument(
 383        "environment",
 384        nargs="?",
 385        type=str,
 386        help="The environment to run the plan against",
 387    )
 388    @argument("--start", "-s", type=str, help="Start date to backfill.")
 389    @argument("--end", "-e", type=str, help="End date to backfill.")
 390    @argument("--execution-time", type=str, help="Execution time.")
 391    @argument(
 392        "--create-from",
 393        type=str,
 394        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
 395    )
 396    @argument(
 397        "--skip-tests",
 398        "-t",
 399        action="store_true",
 400        help="Skip the unit tests defined for the model.",
 401    )
 402    @argument(
 403        "--skip-linter",
 404        action="store_true",
 405        help="Skip the linter for the model.",
 406    )
 407    @argument(
 408        "--restate-model",
 409        "-r",
 410        type=str,
 411        nargs="*",
 412        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
 413    )
 414    @argument(
 415        "--no-gaps",
 416        "-g",
 417        action="store_true",
 418        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
 419    )
 420    @argument(
 421        "--skip-backfill",
 422        "--dry-run",
 423        action="store_true",
 424        help="Skip the backfill step and only create a virtual update for the plan.",
 425    )
 426    @argument(
 427        "--empty-backfill",
 428        action="store_true",
 429        help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.",
 430    )
 431    @argument(
 432        "--forward-only",
 433        action="store_true",
 434        help="Create a plan for forward-only changes.",
 435        default=None,
 436    )
 437    @argument(
 438        "--effective-from",
 439        type=str,
 440        help="The effective date from which to apply forward-only changes on production.",
 441    )
 442    @argument(
 443        "--no-prompts",
 444        action="store_true",
 445        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
 446        default=None,
 447    )
 448    @argument(
 449        "--auto-apply",
 450        action="store_true",
 451        help="Automatically applies the new plan after creation.",
 452        default=None,
 453    )
 454    @argument(
 455        "--no-auto-categorization",
 456        action="store_true",
 457        help="Disable automatic change categorization.",
 458        default=None,
 459    )
 460    @argument(
 461        "--include-unmodified",
 462        action="store_true",
 463        help="Include unmodified models in the target environment.",
 464        default=None,
 465    )
 466    @argument(
 467        "--select-model",
 468        type=str,
 469        nargs="*",
 470        help="Select specific model changes that should be included in the plan.",
 471    )
 472    @argument(
 473        "--backfill-model",
 474        type=str,
 475        nargs="*",
 476        help="Backfill only the models whose names match the expression.",
 477    )
 478    @argument(
 479        "--no-diff",
 480        action="store_true",
 481        help="Hide text differences for changed models.",
 482        default=None,
 483    )
 484    @argument(
 485        "--run",
 486        action="store_true",
 487        help="Run latest intervals as part of the plan application (prod environment only).",
 488    )
 489    @argument(
 490        "--ignore-cron",
 491        action="store_true",
 492        help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
 493        default=None,
 494    )
 495    @argument(
 496        "--enable-preview",
 497        action="store_true",
 498        help="Enable preview for forward-only models when targeting a development environment.",
 499        default=None,
 500    )
 501    @argument(
 502        "--diff-rendered",
 503        action="store_true",
 504        help="Output text differences for the rendered versions of the models and standalone audits",
 505    )
 506    @argument(
 507        "--verbose",
 508        "-v",
 509        action="count",
 510        default=0,
 511        help="Verbose output. Use -vv for very verbose.",
 512    )
 513    @line_magic
 514    @pass_sqlmesh_context
 515    def plan(self, context: Context, line: str) -> None:
 516        """Goes through a set of prompts to both establish a plan and apply it"""
 517        args = parse_argstring(self.plan, line)
 518
 519        setattr(context.console, "verbosity", Verbosity(args.verbose))
 520
 521        context.plan(
 522            args.environment,
 523            start=args.start,
 524            end=args.end,
 525            execution_time=args.execution_time,
 526            create_from=args.create_from,
 527            skip_tests=args.skip_tests,
 528            restate_models=args.restate_model,
 529            backfill_models=args.backfill_model,
 530            no_gaps=args.no_gaps,
 531            skip_backfill=args.skip_backfill,
 532            empty_backfill=args.empty_backfill,
 533            forward_only=args.forward_only,
 534            no_prompts=args.no_prompts,
 535            auto_apply=args.auto_apply,
 536            no_auto_categorization=args.no_auto_categorization,
 537            effective_from=args.effective_from,
 538            include_unmodified=args.include_unmodified,
 539            select_models=args.select_model,
 540            no_diff=args.no_diff,
 541            run=args.run,
 542            ignore_cron=args.run,
 543            enable_preview=args.enable_preview,
 544            diff_rendered=args.diff_rendered,
 545        )
 546
 547    @magic_arguments()
 548    @argument(
 549        "environment",
 550        nargs="?",
 551        type=str,
 552        help="The environment to run against",
 553    )
 554    @argument("--start", "-s", type=str, help="Start date to evaluate.")
 555    @argument("--end", "-e", type=str, help="End date to evaluate.")
 556    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
 557    @argument(
 558        "--ignore-cron",
 559        action="store_true",
 560        help="Run for all missing intervals, ignoring individual cron schedules.",
 561    )
 562    @argument(
 563        "--select-model",
 564        type=str,
 565        nargs="*",
 566        help="Select specific models to run. Note: this always includes upstream dependencies.",
 567    )
 568    @argument(
 569        "--exit-on-env-update",
 570        type=int,
 571        help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.",
 572    )
 573    @argument(
 574        "--no-auto-upstream",
 575        action="store_true",
 576        help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
 577    )
 578    @line_magic
 579    @pass_sqlmesh_context
 580    def run_dag(self, context: Context, line: str) -> None:
 581        """Evaluate the DAG of models using the built-in scheduler."""
 582        args = parse_argstring(self.run_dag, line)
 583
 584        completion_status = context.run(
 585            args.environment,
 586            start=args.start,
 587            end=args.end,
 588            skip_janitor=args.skip_janitor,
 589            ignore_cron=args.ignore_cron,
 590            select_models=args.select_model,
 591            exit_on_env_update=args.exit_on_env_update,
 592            no_auto_upstream=args.no_auto_upstream,
 593        )
 594        if completion_status.is_failure:
 595            raise SQLMeshError("Error Running DAG. Check logs for details.")
 596
 597    @magic_arguments()
 598    @argument("model", type=str, help="The model.")
 599    @argument("--start", "-s", type=str, help="Start date to render.")
 600    @argument("--end", "-e", type=str, help="End date to render.")
 601    @argument("--execution-time", type=str, help="Execution time.")
 602    @argument(
 603        "--limit",
 604        type=int,
 605        help="The number of rows which the query should be limited to.",
 606    )
 607    @line_magic
 608    @pass_sqlmesh_context
 609    def evaluate(self, context: Context, line: str) -> None:
 610        """Evaluate a model query and fetches a dataframe."""
 611        context.refresh()
 612
 613        snowpark = optional_import("snowflake.snowpark")
 614        args = parse_argstring(self.evaluate, line)
 615
 616        df = context.evaluate(
 617            args.model,
 618            start=args.start,
 619            end=args.end,
 620            execution_time=args.execution_time,
 621            limit=args.limit,
 622        )
 623
 624        if snowpark and isinstance(df, snowpark.DataFrame):
 625            df = df.limit(args.limit or 100).to_pandas()
 626
 627        self.display(df)
 628
 629    @magic_arguments()
 630    @argument("model", type=str, help="The model.")
 631    @argument("--start", "-s", type=str, help="Start date to render.")
 632    @argument("--end", "-e", type=str, help="End date to render.")
 633    @argument("--execution-time", type=str, help="Execution time.")
 634    @argument(
 635        "--expand",
 636        type=t.Union[bool, t.Iterable[str]],
 637        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
 638    )
 639    @argument("--dialect", type=str, help="SQL dialect to render.")
 640    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
 641    @format_arguments
 642    @line_magic
 643    @pass_sqlmesh_context
 644    def render(self, context: Context, line: str) -> None:
 645        """Renders a model's query, optionally expanding referenced models."""
 646        context.refresh()
 647        render_opts = vars(parse_argstring(self.render, line))
 648        model = render_opts.pop("model")
 649        dialect = render_opts.pop("dialect", None)
 650
 651        model = context.get_model(model, raise_if_missing=True)
 652
 653        query = context.render(
 654            model,
 655            start=render_opts.pop("start", None),
 656            end=render_opts.pop("end", None),
 657            execution_time=render_opts.pop("execution_time", None),
 658            expand=render_opts.pop("expand", False),
 659        )
 660
 661        no_format = render_opts.pop("no_format", False)
 662
 663        format_config = context.config_for_node(model).format
 664        format_options = {
 665            **format_config.generator_options,
 666            **{k: v for k, v in render_opts.items() if v is not None},
 667        }
 668
 669        sql = query.sql(
 670            pretty=True,
 671            dialect=context.config.dialect if dialect is None else dialect,
 672            **format_options,
 673        )
 674
 675        if no_format:
 676            context.console.log_status_update(sql)
 677        else:
 678            context.console.show_sql(sql)
 679
 680    @magic_arguments()
 681    @argument(
 682        "df_var",
 683        default=None,
 684        nargs="?",
 685        type=str,
 686        help="An optional variable name to store the resulting dataframe.",
 687    )
 688    @cell_magic
 689    @pass_sqlmesh_context
 690    def fetchdf(self, context: Context, line: str, sql: str) -> None:
 691        """Fetches a dataframe from sql, optionally storing it in a variable."""
 692        args = parse_argstring(self.fetchdf, line)
 693        df = context.fetchdf(sql)
 694        if args.df_var:
 695            self._shell.user_ns[args.df_var] = df
 696        self.display(df)
 697
 698    @magic_arguments()
 699    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
 700    @argument(
 701        "--select-model",
 702        type=str,
 703        nargs="*",
 704        help="Select specific models to include in the dag.",
 705    )
 706    @line_magic
 707    @pass_sqlmesh_context
 708    def dag(self, context: Context, line: str) -> None:
 709        """Displays the HTML DAG."""
 710        args = parse_argstring(self.dag, line)
 711        dag = context.get_dag(args.select_model)
 712        if args.file:
 713            with open(args.file, "w", encoding="utf-8") as file:
 714                file.write(str(dag))
 715        # TODO: Have this go through console instead of calling display directly
 716        self.display(dag)
 717
 718    @magic_arguments()
 719    @line_magic
 720    @pass_sqlmesh_context
 721    def migrate(self, context: Context, line: str) -> None:
 722        """Migrate SQLMesh to the current running version."""
 723        context.migrate()
 724        context.console.log_success("Migration complete")
 725
 726    @magic_arguments()
 727    @argument(
 728        "--strict",
 729        action="store_true",
 730        help="Raise an error if the external model is missing in the database",
 731    )
 732    @line_magic
 733    @pass_sqlmesh_context
 734    def create_external_models(self, context: Context, line: str) -> None:
 735        """Create a schema file containing external model schemas."""
 736        args = parse_argstring(self.create_external_models, line)
 737        context.create_external_models(strict=args.strict)
 738
 739    @magic_arguments()
 740    @argument(
 741        "source_to_target",
 742        type=str,
 743        metavar="SOURCE:TARGET",
 744        help="Source and target in `SOURCE:TARGET` format",
 745    )
 746    @argument(
 747        "--on",
 748        type=str,
 749        nargs="*",
 750        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
 751    )
 752    @argument(
 753        "--skip-columns",
 754        type=str,
 755        nargs="*",
 756        help="The column(s) to skip when comparing the source and target table.",
 757    )
 758    @argument(
 759        "--model",
 760        type=str,
 761        help="The model to diff against when source and target are environments and not tables.",
 762    )
 763    @argument(
 764        "--where",
 765        type=str,
 766        help="An optional where statement to filter results.",
 767    )
 768    @argument(
 769        "--limit",
 770        type=int,
 771        default=20,
 772        help="The limit of the sample dataframe.",
 773    )
 774    @argument(
 775        "--show-sample",
 776        action="store_true",
 777        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
 778    )
 779    @argument(
 780        "--decimals",
 781        type=int,
 782        default=3,
 783        help="The number of decimal places to keep when comparing floating point columns. Default: 3",
 784    )
 785    @argument(
 786        "--select-model",
 787        type=str,
 788        nargs="*",
 789        help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)",
 790    )
 791    @argument(
 792        "--skip-grain-check",
 793        action="store_true",
 794        help="Disable the check for a primary key (grain) that is missing or is not unique.",
 795    )
 796    @argument(
 797        "--warn-grain-check",
 798        action="store_true",
 799        help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.",
 800    )
 801    @argument(
 802        "--schema-diff-ignore-case",
 803        action="store_true",
 804        help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.",
 805    )
 806    @line_magic
 807    @pass_sqlmesh_context
 808    def table_diff(self, context: Context, line: str) -> None:
 809        """Show the diff between two tables.
 810
 811        Can either be two tables or two environments and a model.
 812        """
 813        args = parse_argstring(self.table_diff, line)
 814        source, target = args.source_to_target.split(":")
 815        select_models = {args.model} if args.model else args.select_model or None
 816        context.table_diff(
 817            source=source,
 818            target=target,
 819            on=args.on,
 820            skip_columns=args.skip_columns,
 821            select_models=select_models,
 822            where=args.where,
 823            limit=args.limit,
 824            show_sample=args.show_sample,
 825            decimals=args.decimals,
 826            skip_grain_check=args.skip_grain_check,
 827            warn_grain_check=args.warn_grain_check,
 828            schema_diff_ignore_case=args.schema_diff_ignore_case,
 829        )
 830
 831    @magic_arguments()
 832    @argument(
 833        "model_name",
 834        nargs="?",
 835        type=str,
 836        help="The name of the model to get the table name for.",
 837    )
 838    @argument(
 839        "--environment",
 840        type=str,
 841        help="The environment to source the model version from.",
 842    )
 843    @argument(
 844        "--prod",
 845        action="store_true",
 846        help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.",
 847    )
 848    @line_magic
 849    @pass_sqlmesh_context
 850    def table_name(self, context: Context, line: str) -> None:
 851        """Prints the name of the physical table for the given model."""
 852        args = parse_argstring(self.table_name, line)
 853        context.console.log_status_update(
 854            context.table_name(args.model_name, args.environment, args.prod)
 855        )
 856
 857    @magic_arguments()
 858    @argument(
 859        "pipeline",
 860        nargs="?",
 861        type=str,
 862        help="The dlt pipeline to attach for this SQLMesh project.",
 863    )
 864    @argument(
 865        "--table",
 866        "-t",
 867        type=str,
 868        nargs="*",
 869        help="The specific dlt tables to refresh in the SQLMesh models.",
 870    )
 871    @argument(
 872        "--force",
 873        "-f",
 874        action="store_true",
 875        help="If set, existing models are overwritten with the new DLT tables.",
 876    )
 877    @argument(
 878        "--dlt-path",
 879        type=str,
 880        help="The directory where the DLT pipeline resides.",
 881    )
 882    @line_magic
 883    @pass_sqlmesh_context
 884    def dlt_refresh(self, context: Context, line: str) -> None:
 885        """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
 886        from sqlmesh.integrations.dlt import generate_dlt_models
 887
 888        args = parse_argstring(self.dlt_refresh, line)
 889        sqlmesh_models = generate_dlt_models(
 890            context, args.pipeline, list(args.table or []), args.force, args.dlt_path
 891        )
 892        if sqlmesh_models:
 893            model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
 894            context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}")
 895        else:
 896            context.console.log_success("All SQLMesh models are up to date.")
 897
 898    @magic_arguments()
 899    @argument(
 900        "--read",
 901        type=str,
 902        default="",
 903        help="The input dialect of the sql string.",
 904    )
 905    @argument(
 906        "--write",
 907        type=str,
 908        default="",
 909        help="The output dialect of the sql string.",
 910    )
 911    @line_cell_magic
 912    @pass_sqlmesh_context
 913    def rewrite(self, context: Context, line: str, sql: str) -> None:
 914        """Rewrite a sql expression with semantic references into an executable query.
 915
 916        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
 917        """
 918        args = parse_argstring(self.rewrite, line)
 919        context.console.show_sql(
 920            context.rewrite(sql, args.read).sql(
 921                dialect=args.write or context.config.dialect, pretty=True
 922            )
 923        )
 924
 925    @magic_arguments()
 926    @argument(
 927        "--transpile",
 928        "-t",
 929        type=str,
 930        help="Transpile project models to the specified dialect.",
 931    )
 932    @argument(
 933        "--check",
 934        action="store_true",
 935        help="Whether or not to check formatting (but not actually format anything).",
 936        default=None,
 937    )
 938    @argument(
 939        "--append-newline",
 940        action="store_true",
 941        help="Include a newline at the end of the output.",
 942        default=None,
 943    )
 944    @argument(
 945        "--no-rewrite-casts",
 946        action="store_true",
 947        help="Preserve the existing casts, without rewriting them to use the :: syntax.",
 948        default=None,
 949    )
 950    @format_arguments
 951    @line_magic
 952    @pass_sqlmesh_context
 953    def format(self, context: Context, line: str) -> bool:
 954        """Format all SQL models and audits."""
 955        format_opts = vars(parse_argstring(self.format, line))
 956        if format_opts.pop("no_rewrite_casts", None):
 957            format_opts["rewrite_casts"] = False
 958
 959        return context.format(**{k: v for k, v in format_opts.items() if v is not None})
 960
 961    @magic_arguments()
 962    @argument("environment", type=str, help="The environment to diff local state against.")
 963    @line_magic
 964    @pass_sqlmesh_context
 965    def diff(self, context: Context, line: str) -> None:
 966        """Show the diff between the local state and the target environment."""
 967        args = parse_argstring(self.diff, line)
 968        context.diff(args.environment)
 969
 970    @magic_arguments()
 971    @argument("environment", type=str, help="The environment to invalidate.")
 972    @line_magic
 973    @pass_sqlmesh_context
 974    def invalidate(self, context: Context, line: str) -> None:
 975        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
 976        args = parse_argstring(self.invalidate, line)
 977        context.invalidate_environment(args.environment)
 978
 979    @magic_arguments()
 980    @argument(
 981        "--ignore-ttl",
 982        action="store_true",
 983        help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
 984    )
 985    @line_magic
 986    @pass_sqlmesh_context
 987    def janitor(self, context: Context, line: str) -> None:
 988        """Run the janitor process to clean up old environments and expired snapshots."""
 989        args = parse_argstring(self.janitor, line)
 990        context.run_janitor(ignore_ttl=args.ignore_ttl)
 991
 992    @magic_arguments()
 993    @argument("model", type=str)
 994    @argument(
 995        "--query",
 996        "-q",
 997        type=str,
 998        nargs="+",
 999        default=[],
1000        help="Queries that will be used to generate data for the model's dependencies.",
1001    )
1002    @argument(
1003        "--overwrite",
1004        "-o",
1005        action="store_true",
1006        help="When true, the fixture file will be overwritten in case it already exists.",
1007    )
1008    @argument(
1009        "--var",
1010        "-v",
1011        type=str,
1012        nargs="+",
1013        help="Key-value pairs that will define variables needed by the model.",
1014    )
1015    @argument(
1016        "--path",
1017        "-p",
1018        type=str,
1019        help="The file path corresponding to the fixture, relative to the test directory. "
1020        "By default, the fixture will be created under the test directory and the file "
1021        "name will be inferred based on the test's name.",
1022    )
1023    @argument(
1024        "--name",
1025        "-n",
1026        type=str,
1027        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
1028    )
1029    @argument(
1030        "--include-ctes",
1031        action="store_true",
1032        help="When true, CTE fixtures will also be generated.",
1033    )
1034    @line_magic
1035    @pass_sqlmesh_context
1036    def create_test(self, context: Context, line: str) -> None:
1037        """Generate a unit test fixture for a given model."""
1038        args = parse_argstring(self.create_test, line)
1039        queries = iter(args.query)
1040        variables = iter(args.var) if args.var else None
1041        context.create_test(
1042            args.model,
1043            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
1044            overwrite=args.overwrite,
1045            variables=dict(zip(variables, variables)) if variables else None,
1046            path=args.path,
1047            name=args.name,
1048            include_ctes=args.include_ctes,
1049        )
1050
1051    @magic_arguments()
1052    @argument("tests", nargs="*", type=str)
1053    @argument(
1054        "--pattern",
1055        "-k",
1056        nargs="*",
1057        type=str,
1058        help="Only run tests that match the pattern of substring.",
1059    )
1060    @argument(
1061        "--verbose",
1062        "-v",
1063        action="count",
1064        default=0,
1065        help="Verbose output. Use -vv for very verbose.",
1066    )
1067    @argument(
1068        "--preserve-fixtures",
1069        action="store_true",
1070        help="Preserve the fixture tables in the testing database, useful for debugging.",
1071    )
1072    @line_magic
1073    @pass_sqlmesh_context
1074    def run_test(self, context: Context, line: str) -> None:
1075        """Run unit test(s)."""
1076        args = parse_argstring(self.run_test, line)
1077
1078        context.test(
1079            match_patterns=args.pattern,
1080            tests=args.tests,
1081            verbosity=Verbosity(args.verbose),
1082            preserve_fixtures=args.preserve_fixtures,
1083            stream=StringIO(),  # consume the output instead of redirecting to stdout
1084        )
1085
1086    @magic_arguments()
1087    @argument(
1088        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
1089    )
1090    @argument("--start", "-s", type=str, help="Start date to audit.")
1091    @argument("--end", "-e", type=str, help="End date to audit.")
1092    @argument("--execution-time", type=str, help="Execution time.")
1093    @line_magic
1094    @pass_sqlmesh_context
1095    def audit(self, context: Context, line: str) -> bool:
1096        """Run audit(s)"""
1097        args = parse_argstring(self.audit, line)
1098        return context.audit(
1099            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
1100        )
1101
1102    @magic_arguments()
1103    @argument("environment", nargs="?", type=str, help="The environment to check intervals for.")
1104    @argument(
1105        "--no-signals",
1106        action="store_true",
1107        help="Disable signal checks and only show missing intervals.",
1108        default=False,
1109    )
1110    @argument(
1111        "--select-model",
1112        type=str,
1113        nargs="*",
1114        help="Select specific model changes that should be included in the plan.",
1115    )
1116    @argument("--start", "-s", type=str, help="Start date of intervals to check for.")
1117    @argument("--end", "-e", type=str, help="End date of intervals to check for.")
1118    @line_magic
1119    @pass_sqlmesh_context
1120    def check_intervals(self, context: Context, line: str) -> None:
1121        """Show missing intervals in an environment, respecting signals."""
1122        args = parse_argstring(self.check_intervals, line)
1123
1124        context.console.show_intervals(
1125            context.check_intervals(
1126                environment=args.environment,
1127                no_signals=args.no_signals,
1128                select_models=args.select_model,
1129                start=args.start,
1130                end=args.end,
1131            )
1132        )
1133
1134    @magic_arguments()
1135    @argument(
1136        "--skip-connection",
1137        action="store_true",
1138        help="Skip the connection test.",
1139        default=False,
1140    )
1141    @argument(
1142        "--verbose",
1143        "-v",
1144        action="count",
1145        default=0,
1146        help="Verbose output. Use -vv for very verbose.",
1147    )
1148    @line_magic
1149    @pass_sqlmesh_context
1150    def info(self, context: Context, line: str) -> None:
1151        """Display SQLMesh project information."""
1152        args = parse_argstring(self.info, line)
1153        context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose))
1154
1155    @magic_arguments()
1156    @line_magic
1157    @pass_sqlmesh_context
1158    def rollback(self, context: Context, line: str) -> None:
1159        """Rollback SQLMesh to the previous migration."""
1160        context.rollback()
1161
1162    @magic_arguments()
1163    @line_magic
1164    @pass_sqlmesh_context
1165    def clean(self, context: Context, line: str) -> None:
1166        """Clears the SQLMesh cache and any build artifacts."""
1167        context.clear_caches()
1168        context.console.log_success("SQLMesh cache and build artifacts cleared")
1169
1170    @magic_arguments()
1171    @line_magic
1172    @pass_sqlmesh_context
1173    def environments(self, context: Context, line: str) -> None:
1174        """Prints the list of SQLMesh environments with its expiry datetime."""
1175        context.print_environment_names()
1176
1177    @magic_arguments()
1178    @argument(
1179        "--models",
1180        "--model",
1181        type=str,
1182        nargs="*",
1183        help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.",
1184    )
1185    @line_magic
1186    @pass_sqlmesh_context
1187    def lint(self, context: Context, line: str) -> None:
1188        """Run linter for target model(s)"""
1189        args = parse_argstring(self.lint, line)
1190        context.lint_models(args.models)
1191
1192    @magic_arguments()
1193    @line_magic
1194    @pass_sqlmesh_context
1195    def destroy(self, context: Context, line: str) -> None:
1196        """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache."""
1197        context.destroy()
1198
1199
1200def register_magics() -> None:
1201    try:
1202        shell = get_ipython()  # type: ignore
1203        shell.register_magics(SQLMeshMagics)
1204    except NameError:
1205        pass
logger = <Logger sqlmesh.magics (WARNING)>
CONTEXT_VARIABLE_NAMES = ['context', 'ctx', 'sqlmesh']
def pass_sqlmesh_context(func: Callable) -> Callable:
52def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
53    @functools.wraps(func)
54    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
55        for variable_name in CONTEXT_VARIABLE_NAMES:
56            context = self._shell.user_ns.get(variable_name)
57            if isinstance(context, Context):
58                break
59        else:
60            raise MissingContextException(
61                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
62            )
63        old_console = context.console
64        new_console = create_console(display=self.display)
65        context.console = new_console
66        set_console(new_console)
67        context.refresh()
68
69        magic_name = func.__name__
70        bound_method = getattr(self, magic_name, None)
71        if bound_method:
72            args_split = arg_split(args[0])
73            parser = bound_method.parser
74
75            original_parser_actions = deepcopy(parser._actions)
76            original_parser_defaults = parser._defaults
77
78            # Temporarily supress default values, otherwise any missing arg would be set and affect analytics
79            parser._defaults = {}
80            for action in parser._actions:
81                action.default = SUPPRESS
82
83            parsed_args, _ = parser.parse_known_args(args_split, Namespace())
84
85            parser._actions = original_parser_actions
86            parser._defaults = original_parser_defaults
87
88            command_args = {k for k, v in parsed_args.__dict__.items() if v is not None}
89            analytics.collector.on_magic_command(command_name=magic_name, command_args=command_args)
90
91        func(self, context, *args, **kwargs)
92
93        context.console = old_console
94        set_console(old_console)
95
96    return wrapper
def format_arguments(func: Callable) -> Callable:
 99def format_arguments(func: t.Callable) -> t.Callable:
100    """Decorator to add common format arguments to magic commands."""
101    func = argument(
102        "--normalize",
103        action="store_true",
104        help="Whether or not to normalize identifiers to lowercase.",
105        default=None,
106    )(func)
107    func = argument(
108        "--pad",
109        type=int,
110        help="Determines the pad size in a formatted string.",
111    )(func)
112    func = argument(
113        "--indent",
114        type=int,
115        help="Determines the indentation size in a formatted string.",
116    )(func)
117    func = argument(
118        "--normalize-functions",
119        type=str,
120        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
121    )(func)
122    func = argument(
123        "--leading-comma",
124        action="store_true",
125        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
126        default=None,
127    )(func)
128    func = argument(
129        "--max-text-width",
130        type=int,
131        help="The max number of characters in a segment before creating new lines in pretty mode.",
132    )(func)
133    return func

Decorator to add common format arguments to magic commands.

@magics_class
class SQLMeshMagics(IPython.core.magic.Magics):
 136@magics_class
 137class SQLMeshMagics(Magics):
 138    @property
 139    def display(self) -> t.Callable:
 140        from sqlmesh import RuntimeEnv
 141
 142        if RuntimeEnv.get().is_databricks:
 143            # Use Databricks' special display instead of the normal IPython display
 144            return self._shell.user_ns["display"]
 145        return display
 146
 147    @property
 148    def _shell(self) -> t.Any:
 149        # Make mypy happy.
 150        if not self.shell:
 151            raise RuntimeError("IPython Magics are in invalid state")
 152        return self.shell
 153
 154    @magic_arguments()
 155    @argument(
 156        "paths",
 157        type=str,
 158        nargs="+",
 159        default="",
 160        help="The path(s) to the SQLMesh project(s).",
 161    )
 162    @argument(
 163        "--config",
 164        type=str,
 165        help="Name of the config object. Only applicable to configuration defined using Python script.",
 166    )
 167    @argument("--gateway", type=str, help="The name of the gateway.")
 168    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
 169    @argument("--debug", action="store_true", help="Enable debug mode.")
 170    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
 171    @argument(
 172        "--dotenv", type=str, help="Path to a custom .env file to load environment variables from."
 173    )
 174    @line_magic
 175    def context(self, line: str) -> None:
 176        """Sets the context in the user namespace."""
 177        from sqlmesh import configure_logging, remove_excess_logs
 178
 179        args = parse_argstring(self.context, line)
 180        log_file_dir = args.log_file_dir
 181
 182        configure_logging(
 183            args.debug,
 184            log_file_dir=log_file_dir,
 185            ignore_warnings=args.ignore_warnings,
 186        )
 187        configure_console(ignore_warnings=args.ignore_warnings)
 188
 189        dotenv_path = Path(args.dotenv) if args.dotenv else None
 190        configs = load_configs(
 191            args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path
 192        )
 193        log_limit = list(configs.values())[0].log_limit
 194
 195        remove_excess_logs(log_file_dir, log_limit)
 196
 197        try:
 198            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
 199            self._shell.user_ns["context"] = context
 200        except Exception:
 201            if args.debug:
 202                logger.exception("Failed to initialize SQLMesh context")
 203            raise
 204
 205        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")
 206
 207    @magic_arguments()
 208    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
 209    @argument(
 210        "engine",
 211        type=str,
 212        help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.",  # type: ignore
 213    )
 214    @argument(
 215        "--template",
 216        "-t",
 217        type=str,
 218        help="Project template. Supported values: dbt, default, empty.",
 219    )
 220    @argument(
 221        "--dlt-pipeline",
 222        type=str,
 223        help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
 224    )
 225    @argument(
 226        "--dlt-path",
 227        type=str,
 228        help="The directory where the DLT pipeline resides. Use alongside template: dlt",
 229    )
 230    @line_magic
 231    def init(self, line: str) -> None:
 232        """Creates a SQLMesh project scaffold with a default SQL dialect."""
 233        args = parse_argstring(self.init, line)
 234        try:
 235            project_template = ProjectTemplate(
 236                args.template.lower() if args.template else "default"
 237            )
 238        except ValueError:
 239            raise MagicError(f"Invalid project template '{args.template}'")
 240        init_example_project(
 241            path=args.path,
 242            engine_type=args.engine,
 243            dialect=None,
 244            template=project_template,
 245            pipeline=args.dlt_pipeline,
 246            dlt_path=args.dlt_path,
 247        )
 248        html = str(
 249            h(
 250                "div",
 251                h(
 252                    "span",
 253                    {"style": {"color": "green", "font-weight": "bold"}},
 254                    "SQLMesh project scaffold created",
 255                ),
 256            )
 257        )
 258        self.display(JupyterRenderable(html=html, text=""))
 259
 260    @magic_arguments()
 261    @argument("model", type=str, help="The model.")
 262    @argument("--start", "-s", type=str, help="Start date to render.")
 263    @argument("--end", "-e", type=str, help="End date to render.")
 264    @argument("--execution-time", type=str, help="Execution time.")
 265    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
 266    @line_cell_magic
 267    @pass_sqlmesh_context
 268    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
 269        """Renders the model and automatically fills in an editable cell with the model definition."""
 270        args = parse_argstring(self.model, line)
 271
 272        model = context.get_model(args.model, raise_if_missing=True)
 273        config = context.config_for_node(model)
 274
 275        if sql:
 276            expressions = parse(sql, default_dialect=config.dialect)
 277            loaded = load_sql_based_model(
 278                expressions,
 279                macros=context._macros,
 280                jinja_macros=context._jinja_macros,
 281                path=model._path,
 282                dialect=config.dialect,
 283                time_column_format=config.time_column_format,
 284                physical_schema_mapping=context.config.physical_schema_mapping,
 285                default_catalog=context.default_catalog,
 286            )
 287
 288            if loaded.name == args.model:
 289                model = loaded
 290        else:
 291            if model._path:
 292                with open(model._path, "r", encoding="utf-8") as file:
 293                    expressions = parse(file.read(), default_dialect=config.dialect)
 294
 295        formatted = format_model_expressions(
 296            expressions,
 297            model.dialect,
 298            rewrite_casts=not config.format.no_rewrite_casts,
 299            **config.format.generator_options,
 300        )
 301
 302        self._shell.set_next_input(
 303            "\n".join(
 304                [
 305                    " ".join(["%%model", line]),
 306                    formatted,
 307                ]
 308            ),
 309            replace=True,
 310        )
 311
 312        if model._path:
 313            with open(model._path, "w", encoding="utf-8") as file:
 314                file.write(formatted)
 315
 316        if sql:
 317            context.console.log_success(f"Model `{args.model}` updated")
 318
 319        context.upsert_model(model)
 320        context.console.show_sql(
 321            context.render(
 322                model.name,
 323                start=args.start,
 324                end=args.end,
 325                execution_time=args.execution_time,
 326            ).sql(pretty=True, dialect=args.dialect or model.dialect)
 327        )
 328
 329    @magic_arguments()
 330    @argument("model", type=str, help="The model.")
 331    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
 332    @argument("--ls", action="store_true", help="List tests associated with a model")
 333    @line_cell_magic
 334    @pass_sqlmesh_context
 335    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
 336        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
 337        args = parse_argstring(self.test, line)
 338        if not args.test_name and not args.ls:
 339            raise MagicError("Must provide either test name or `--ls` to list tests")
 340
 341        test_meta = context.select_tests()
 342
 343        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
 344        for model_test_metadata in test_meta:
 345            model = model_test_metadata.body.get("model")
 346            if not model:
 347                context.console.log_error(
 348                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
 349                )
 350            else:
 351                tests[model][model_test_metadata.test_name] = model_test_metadata
 352
 353        model = context.get_model(args.model, raise_if_missing=True)
 354
 355        if args.ls:
 356            # TODO: Provide better UI for displaying tests
 357            for test_name in tests[model.name]:
 358                context.console.log_status_update(test_name)
 359            return
 360
 361        test = tests[model.name][args.test_name]
 362        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
 363        test_def_output = yaml.dump(test_def)
 364
 365        self._shell.set_next_input(
 366            "\n".join(
 367                [
 368                    " ".join(["%%test", line]),
 369                    test_def_output,
 370                ]
 371            ),
 372            replace=True,
 373        )
 374
 375        with open(test.path, "r+", encoding="utf-8") as file:
 376            content = yaml.load(file.read())
 377            content[args.test_name] = test_def
 378            file.seek(0)
 379            yaml.dump(content, file)
 380            file.truncate()
 381
 382    @magic_arguments()
 383    @argument(
 384        "environment",
 385        nargs="?",
 386        type=str,
 387        help="The environment to run the plan against",
 388    )
 389    @argument("--start", "-s", type=str, help="Start date to backfill.")
 390    @argument("--end", "-e", type=str, help="End date to backfill.")
 391    @argument("--execution-time", type=str, help="Execution time.")
 392    @argument(
 393        "--create-from",
 394        type=str,
 395        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
 396    )
 397    @argument(
 398        "--skip-tests",
 399        "-t",
 400        action="store_true",
 401        help="Skip the unit tests defined for the model.",
 402    )
 403    @argument(
 404        "--skip-linter",
 405        action="store_true",
 406        help="Skip the linter for the model.",
 407    )
 408    @argument(
 409        "--restate-model",
 410        "-r",
 411        type=str,
 412        nargs="*",
 413        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
 414    )
 415    @argument(
 416        "--no-gaps",
 417        "-g",
 418        action="store_true",
 419        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
 420    )
 421    @argument(
 422        "--skip-backfill",
 423        "--dry-run",
 424        action="store_true",
 425        help="Skip the backfill step and only create a virtual update for the plan.",
 426    )
 427    @argument(
 428        "--empty-backfill",
 429        action="store_true",
 430        help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.",
 431    )
 432    @argument(
 433        "--forward-only",
 434        action="store_true",
 435        help="Create a plan for forward-only changes.",
 436        default=None,
 437    )
 438    @argument(
 439        "--effective-from",
 440        type=str,
 441        help="The effective date from which to apply forward-only changes on production.",
 442    )
 443    @argument(
 444        "--no-prompts",
 445        action="store_true",
 446        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
 447        default=None,
 448    )
 449    @argument(
 450        "--auto-apply",
 451        action="store_true",
 452        help="Automatically applies the new plan after creation.",
 453        default=None,
 454    )
 455    @argument(
 456        "--no-auto-categorization",
 457        action="store_true",
 458        help="Disable automatic change categorization.",
 459        default=None,
 460    )
 461    @argument(
 462        "--include-unmodified",
 463        action="store_true",
 464        help="Include unmodified models in the target environment.",
 465        default=None,
 466    )
 467    @argument(
 468        "--select-model",
 469        type=str,
 470        nargs="*",
 471        help="Select specific model changes that should be included in the plan.",
 472    )
 473    @argument(
 474        "--backfill-model",
 475        type=str,
 476        nargs="*",
 477        help="Backfill only the models whose names match the expression.",
 478    )
 479    @argument(
 480        "--no-diff",
 481        action="store_true",
 482        help="Hide text differences for changed models.",
 483        default=None,
 484    )
 485    @argument(
 486        "--run",
 487        action="store_true",
 488        help="Run latest intervals as part of the plan application (prod environment only).",
 489    )
 490    @argument(
 491        "--ignore-cron",
 492        action="store_true",
 493        help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
 494        default=None,
 495    )
 496    @argument(
 497        "--enable-preview",
 498        action="store_true",
 499        help="Enable preview for forward-only models when targeting a development environment.",
 500        default=None,
 501    )
 502    @argument(
 503        "--diff-rendered",
 504        action="store_true",
 505        help="Output text differences for the rendered versions of the models and standalone audits",
 506    )
 507    @argument(
 508        "--verbose",
 509        "-v",
 510        action="count",
 511        default=0,
 512        help="Verbose output. Use -vv for very verbose.",
 513    )
 514    @line_magic
 515    @pass_sqlmesh_context
 516    def plan(self, context: Context, line: str) -> None:
 517        """Goes through a set of prompts to both establish a plan and apply it"""
 518        args = parse_argstring(self.plan, line)
 519
 520        setattr(context.console, "verbosity", Verbosity(args.verbose))
 521
 522        context.plan(
 523            args.environment,
 524            start=args.start,
 525            end=args.end,
 526            execution_time=args.execution_time,
 527            create_from=args.create_from,
 528            skip_tests=args.skip_tests,
 529            restate_models=args.restate_model,
 530            backfill_models=args.backfill_model,
 531            no_gaps=args.no_gaps,
 532            skip_backfill=args.skip_backfill,
 533            empty_backfill=args.empty_backfill,
 534            forward_only=args.forward_only,
 535            no_prompts=args.no_prompts,
 536            auto_apply=args.auto_apply,
 537            no_auto_categorization=args.no_auto_categorization,
 538            effective_from=args.effective_from,
 539            include_unmodified=args.include_unmodified,
 540            select_models=args.select_model,
 541            no_diff=args.no_diff,
 542            run=args.run,
 543            ignore_cron=args.run,
 544            enable_preview=args.enable_preview,
 545            diff_rendered=args.diff_rendered,
 546        )
 547
 548    @magic_arguments()
 549    @argument(
 550        "environment",
 551        nargs="?",
 552        type=str,
 553        help="The environment to run against",
 554    )
 555    @argument("--start", "-s", type=str, help="Start date to evaluate.")
 556    @argument("--end", "-e", type=str, help="End date to evaluate.")
 557    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
 558    @argument(
 559        "--ignore-cron",
 560        action="store_true",
 561        help="Run for all missing intervals, ignoring individual cron schedules.",
 562    )
 563    @argument(
 564        "--select-model",
 565        type=str,
 566        nargs="*",
 567        help="Select specific models to run. Note: this always includes upstream dependencies.",
 568    )
 569    @argument(
 570        "--exit-on-env-update",
 571        type=int,
 572        help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.",
 573    )
 574    @argument(
 575        "--no-auto-upstream",
 576        action="store_true",
 577        help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
 578    )
 579    @line_magic
 580    @pass_sqlmesh_context
 581    def run_dag(self, context: Context, line: str) -> None:
 582        """Evaluate the DAG of models using the built-in scheduler."""
 583        args = parse_argstring(self.run_dag, line)
 584
 585        completion_status = context.run(
 586            args.environment,
 587            start=args.start,
 588            end=args.end,
 589            skip_janitor=args.skip_janitor,
 590            ignore_cron=args.ignore_cron,
 591            select_models=args.select_model,
 592            exit_on_env_update=args.exit_on_env_update,
 593            no_auto_upstream=args.no_auto_upstream,
 594        )
 595        if completion_status.is_failure:
 596            raise SQLMeshError("Error Running DAG. Check logs for details.")
 597
 598    @magic_arguments()
 599    @argument("model", type=str, help="The model.")
 600    @argument("--start", "-s", type=str, help="Start date to render.")
 601    @argument("--end", "-e", type=str, help="End date to render.")
 602    @argument("--execution-time", type=str, help="Execution time.")
 603    @argument(
 604        "--limit",
 605        type=int,
 606        help="The number of rows which the query should be limited to.",
 607    )
 608    @line_magic
 609    @pass_sqlmesh_context
 610    def evaluate(self, context: Context, line: str) -> None:
 611        """Evaluate a model query and fetches a dataframe."""
 612        context.refresh()
 613
 614        snowpark = optional_import("snowflake.snowpark")
 615        args = parse_argstring(self.evaluate, line)
 616
 617        df = context.evaluate(
 618            args.model,
 619            start=args.start,
 620            end=args.end,
 621            execution_time=args.execution_time,
 622            limit=args.limit,
 623        )
 624
 625        if snowpark and isinstance(df, snowpark.DataFrame):
 626            df = df.limit(args.limit or 100).to_pandas()
 627
 628        self.display(df)
 629
 630    @magic_arguments()
 631    @argument("model", type=str, help="The model.")
 632    @argument("--start", "-s", type=str, help="Start date to render.")
 633    @argument("--end", "-e", type=str, help="End date to render.")
 634    @argument("--execution-time", type=str, help="Execution time.")
 635    @argument(
 636        "--expand",
 637        type=t.Union[bool, t.Iterable[str]],
 638        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
 639    )
 640    @argument("--dialect", type=str, help="SQL dialect to render.")
 641    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
 642    @format_arguments
 643    @line_magic
 644    @pass_sqlmesh_context
 645    def render(self, context: Context, line: str) -> None:
 646        """Renders a model's query, optionally expanding referenced models."""
 647        context.refresh()
 648        render_opts = vars(parse_argstring(self.render, line))
 649        model = render_opts.pop("model")
 650        dialect = render_opts.pop("dialect", None)
 651
 652        model = context.get_model(model, raise_if_missing=True)
 653
 654        query = context.render(
 655            model,
 656            start=render_opts.pop("start", None),
 657            end=render_opts.pop("end", None),
 658            execution_time=render_opts.pop("execution_time", None),
 659            expand=render_opts.pop("expand", False),
 660        )
 661
 662        no_format = render_opts.pop("no_format", False)
 663
 664        format_config = context.config_for_node(model).format
 665        format_options = {
 666            **format_config.generator_options,
 667            **{k: v for k, v in render_opts.items() if v is not None},
 668        }
 669
 670        sql = query.sql(
 671            pretty=True,
 672            dialect=context.config.dialect if dialect is None else dialect,
 673            **format_options,
 674        )
 675
 676        if no_format:
 677            context.console.log_status_update(sql)
 678        else:
 679            context.console.show_sql(sql)
 680
 681    @magic_arguments()
 682    @argument(
 683        "df_var",
 684        default=None,
 685        nargs="?",
 686        type=str,
 687        help="An optional variable name to store the resulting dataframe.",
 688    )
 689    @cell_magic
 690    @pass_sqlmesh_context
 691    def fetchdf(self, context: Context, line: str, sql: str) -> None:
 692        """Fetches a dataframe from sql, optionally storing it in a variable."""
 693        args = parse_argstring(self.fetchdf, line)
 694        df = context.fetchdf(sql)
 695        if args.df_var:
 696            self._shell.user_ns[args.df_var] = df
 697        self.display(df)
 698
 699    @magic_arguments()
 700    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
 701    @argument(
 702        "--select-model",
 703        type=str,
 704        nargs="*",
 705        help="Select specific models to include in the dag.",
 706    )
 707    @line_magic
 708    @pass_sqlmesh_context
 709    def dag(self, context: Context, line: str) -> None:
 710        """Displays the HTML DAG."""
 711        args = parse_argstring(self.dag, line)
 712        dag = context.get_dag(args.select_model)
 713        if args.file:
 714            with open(args.file, "w", encoding="utf-8") as file:
 715                file.write(str(dag))
 716        # TODO: Have this go through console instead of calling display directly
 717        self.display(dag)
 718
 719    @magic_arguments()
 720    @line_magic
 721    @pass_sqlmesh_context
 722    def migrate(self, context: Context, line: str) -> None:
 723        """Migrate SQLMesh to the current running version."""
 724        context.migrate()
 725        context.console.log_success("Migration complete")
 726
 727    @magic_arguments()
 728    @argument(
 729        "--strict",
 730        action="store_true",
 731        help="Raise an error if the external model is missing in the database",
 732    )
 733    @line_magic
 734    @pass_sqlmesh_context
 735    def create_external_models(self, context: Context, line: str) -> None:
 736        """Create a schema file containing external model schemas."""
 737        args = parse_argstring(self.create_external_models, line)
 738        context.create_external_models(strict=args.strict)
 739
 740    @magic_arguments()
 741    @argument(
 742        "source_to_target",
 743        type=str,
 744        metavar="SOURCE:TARGET",
 745        help="Source and target in `SOURCE:TARGET` format",
 746    )
 747    @argument(
 748        "--on",
 749        type=str,
 750        nargs="*",
 751        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
 752    )
 753    @argument(
 754        "--skip-columns",
 755        type=str,
 756        nargs="*",
 757        help="The column(s) to skip when comparing the source and target table.",
 758    )
 759    @argument(
 760        "--model",
 761        type=str,
 762        help="The model to diff against when source and target are environments and not tables.",
 763    )
 764    @argument(
 765        "--where",
 766        type=str,
 767        help="An optional where statement to filter results.",
 768    )
 769    @argument(
 770        "--limit",
 771        type=int,
 772        default=20,
 773        help="The limit of the sample dataframe.",
 774    )
 775    @argument(
 776        "--show-sample",
 777        action="store_true",
 778        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
 779    )
 780    @argument(
 781        "--decimals",
 782        type=int,
 783        default=3,
 784        help="The number of decimal places to keep when comparing floating point columns. Default: 3",
 785    )
 786    @argument(
 787        "--select-model",
 788        type=str,
 789        nargs="*",
 790        help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)",
 791    )
 792    @argument(
 793        "--skip-grain-check",
 794        action="store_true",
 795        help="Disable the check for a primary key (grain) that is missing or is not unique.",
 796    )
 797    @argument(
 798        "--warn-grain-check",
 799        action="store_true",
 800        help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.",
 801    )
 802    @argument(
 803        "--schema-diff-ignore-case",
 804        action="store_true",
 805        help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.",
 806    )
 807    @line_magic
 808    @pass_sqlmesh_context
 809    def table_diff(self, context: Context, line: str) -> None:
 810        """Show the diff between two tables.
 811
 812        Can either be two tables or two environments and a model.
 813        """
 814        args = parse_argstring(self.table_diff, line)
 815        source, target = args.source_to_target.split(":")
 816        select_models = {args.model} if args.model else args.select_model or None
 817        context.table_diff(
 818            source=source,
 819            target=target,
 820            on=args.on,
 821            skip_columns=args.skip_columns,
 822            select_models=select_models,
 823            where=args.where,
 824            limit=args.limit,
 825            show_sample=args.show_sample,
 826            decimals=args.decimals,
 827            skip_grain_check=args.skip_grain_check,
 828            warn_grain_check=args.warn_grain_check,
 829            schema_diff_ignore_case=args.schema_diff_ignore_case,
 830        )
 831
 832    @magic_arguments()
 833    @argument(
 834        "model_name",
 835        nargs="?",
 836        type=str,
 837        help="The name of the model to get the table name for.",
 838    )
 839    @argument(
 840        "--environment",
 841        type=str,
 842        help="The environment to source the model version from.",
 843    )
 844    @argument(
 845        "--prod",
 846        action="store_true",
 847        help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.",
 848    )
 849    @line_magic
 850    @pass_sqlmesh_context
 851    def table_name(self, context: Context, line: str) -> None:
 852        """Prints the name of the physical table for the given model."""
 853        args = parse_argstring(self.table_name, line)
 854        context.console.log_status_update(
 855            context.table_name(args.model_name, args.environment, args.prod)
 856        )
 857
 858    @magic_arguments()
 859    @argument(
 860        "pipeline",
 861        nargs="?",
 862        type=str,
 863        help="The dlt pipeline to attach for this SQLMesh project.",
 864    )
 865    @argument(
 866        "--table",
 867        "-t",
 868        type=str,
 869        nargs="*",
 870        help="The specific dlt tables to refresh in the SQLMesh models.",
 871    )
 872    @argument(
 873        "--force",
 874        "-f",
 875        action="store_true",
 876        help="If set, existing models are overwritten with the new DLT tables.",
 877    )
 878    @argument(
 879        "--dlt-path",
 880        type=str,
 881        help="The directory where the DLT pipeline resides.",
 882    )
 883    @line_magic
 884    @pass_sqlmesh_context
 885    def dlt_refresh(self, context: Context, line: str) -> None:
 886        """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
 887        from sqlmesh.integrations.dlt import generate_dlt_models
 888
 889        args = parse_argstring(self.dlt_refresh, line)
 890        sqlmesh_models = generate_dlt_models(
 891            context, args.pipeline, list(args.table or []), args.force, args.dlt_path
 892        )
 893        if sqlmesh_models:
 894            model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
 895            context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}")
 896        else:
 897            context.console.log_success("All SQLMesh models are up to date.")
 898
 899    @magic_arguments()
 900    @argument(
 901        "--read",
 902        type=str,
 903        default="",
 904        help="The input dialect of the sql string.",
 905    )
 906    @argument(
 907        "--write",
 908        type=str,
 909        default="",
 910        help="The output dialect of the sql string.",
 911    )
 912    @line_cell_magic
 913    @pass_sqlmesh_context
 914    def rewrite(self, context: Context, line: str, sql: str) -> None:
 915        """Rewrite a sql expression with semantic references into an executable query.
 916
 917        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
 918        """
 919        args = parse_argstring(self.rewrite, line)
 920        context.console.show_sql(
 921            context.rewrite(sql, args.read).sql(
 922                dialect=args.write or context.config.dialect, pretty=True
 923            )
 924        )
 925
 926    @magic_arguments()
 927    @argument(
 928        "--transpile",
 929        "-t",
 930        type=str,
 931        help="Transpile project models to the specified dialect.",
 932    )
 933    @argument(
 934        "--check",
 935        action="store_true",
 936        help="Whether or not to check formatting (but not actually format anything).",
 937        default=None,
 938    )
 939    @argument(
 940        "--append-newline",
 941        action="store_true",
 942        help="Include a newline at the end of the output.",
 943        default=None,
 944    )
 945    @argument(
 946        "--no-rewrite-casts",
 947        action="store_true",
 948        help="Preserve the existing casts, without rewriting them to use the :: syntax.",
 949        default=None,
 950    )
 951    @format_arguments
 952    @line_magic
 953    @pass_sqlmesh_context
 954    def format(self, context: Context, line: str) -> bool:
 955        """Format all SQL models and audits."""
 956        format_opts = vars(parse_argstring(self.format, line))
 957        if format_opts.pop("no_rewrite_casts", None):
 958            format_opts["rewrite_casts"] = False
 959
 960        return context.format(**{k: v for k, v in format_opts.items() if v is not None})
 961
 962    @magic_arguments()
 963    @argument("environment", type=str, help="The environment to diff local state against.")
 964    @line_magic
 965    @pass_sqlmesh_context
 966    def diff(self, context: Context, line: str) -> None:
 967        """Show the diff between the local state and the target environment."""
 968        args = parse_argstring(self.diff, line)
 969        context.diff(args.environment)
 970
 971    @magic_arguments()
 972    @argument("environment", type=str, help="The environment to invalidate.")
 973    @line_magic
 974    @pass_sqlmesh_context
 975    def invalidate(self, context: Context, line: str) -> None:
 976        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
 977        args = parse_argstring(self.invalidate, line)
 978        context.invalidate_environment(args.environment)
 979
 980    @magic_arguments()
 981    @argument(
 982        "--ignore-ttl",
 983        action="store_true",
 984        help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
 985    )
 986    @line_magic
 987    @pass_sqlmesh_context
 988    def janitor(self, context: Context, line: str) -> None:
 989        """Run the janitor process to clean up old environments and expired snapshots."""
 990        args = parse_argstring(self.janitor, line)
 991        context.run_janitor(ignore_ttl=args.ignore_ttl)
 992
 993    @magic_arguments()
 994    @argument("model", type=str)
 995    @argument(
 996        "--query",
 997        "-q",
 998        type=str,
 999        nargs="+",
1000        default=[],
1001        help="Queries that will be used to generate data for the model's dependencies.",
1002    )
1003    @argument(
1004        "--overwrite",
1005        "-o",
1006        action="store_true",
1007        help="When true, the fixture file will be overwritten in case it already exists.",
1008    )
1009    @argument(
1010        "--var",
1011        "-v",
1012        type=str,
1013        nargs="+",
1014        help="Key-value pairs that will define variables needed by the model.",
1015    )
1016    @argument(
1017        "--path",
1018        "-p",
1019        type=str,
1020        help="The file path corresponding to the fixture, relative to the test directory. "
1021        "By default, the fixture will be created under the test directory and the file "
1022        "name will be inferred based on the test's name.",
1023    )
1024    @argument(
1025        "--name",
1026        "-n",
1027        type=str,
1028        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
1029    )
1030    @argument(
1031        "--include-ctes",
1032        action="store_true",
1033        help="When true, CTE fixtures will also be generated.",
1034    )
1035    @line_magic
1036    @pass_sqlmesh_context
1037    def create_test(self, context: Context, line: str) -> None:
1038        """Generate a unit test fixture for a given model."""
1039        args = parse_argstring(self.create_test, line)
1040        queries = iter(args.query)
1041        variables = iter(args.var) if args.var else None
1042        context.create_test(
1043            args.model,
1044            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
1045            overwrite=args.overwrite,
1046            variables=dict(zip(variables, variables)) if variables else None,
1047            path=args.path,
1048            name=args.name,
1049            include_ctes=args.include_ctes,
1050        )
1051
1052    @magic_arguments()
1053    @argument("tests", nargs="*", type=str)
1054    @argument(
1055        "--pattern",
1056        "-k",
1057        nargs="*",
1058        type=str,
1059        help="Only run tests that match the pattern of substring.",
1060    )
1061    @argument(
1062        "--verbose",
1063        "-v",
1064        action="count",
1065        default=0,
1066        help="Verbose output. Use -vv for very verbose.",
1067    )
1068    @argument(
1069        "--preserve-fixtures",
1070        action="store_true",
1071        help="Preserve the fixture tables in the testing database, useful for debugging.",
1072    )
1073    @line_magic
1074    @pass_sqlmesh_context
1075    def run_test(self, context: Context, line: str) -> None:
1076        """Run unit test(s)."""
1077        args = parse_argstring(self.run_test, line)
1078
1079        context.test(
1080            match_patterns=args.pattern,
1081            tests=args.tests,
1082            verbosity=Verbosity(args.verbose),
1083            preserve_fixtures=args.preserve_fixtures,
1084            stream=StringIO(),  # consume the output instead of redirecting to stdout
1085        )
1086
1087    @magic_arguments()
1088    @argument(
1089        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
1090    )
1091    @argument("--start", "-s", type=str, help="Start date to audit.")
1092    @argument("--end", "-e", type=str, help="End date to audit.")
1093    @argument("--execution-time", type=str, help="Execution time.")
1094    @line_magic
1095    @pass_sqlmesh_context
1096    def audit(self, context: Context, line: str) -> bool:
1097        """Run audit(s)"""
1098        args = parse_argstring(self.audit, line)
1099        return context.audit(
1100            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
1101        )
1102
1103    @magic_arguments()
1104    @argument("environment", nargs="?", type=str, help="The environment to check intervals for.")
1105    @argument(
1106        "--no-signals",
1107        action="store_true",
1108        help="Disable signal checks and only show missing intervals.",
1109        default=False,
1110    )
1111    @argument(
1112        "--select-model",
1113        type=str,
1114        nargs="*",
1115        help="Select specific model changes that should be included in the plan.",
1116    )
1117    @argument("--start", "-s", type=str, help="Start date of intervals to check for.")
1118    @argument("--end", "-e", type=str, help="End date of intervals to check for.")
1119    @line_magic
1120    @pass_sqlmesh_context
1121    def check_intervals(self, context: Context, line: str) -> None:
1122        """Show missing intervals in an environment, respecting signals."""
1123        args = parse_argstring(self.check_intervals, line)
1124
1125        context.console.show_intervals(
1126            context.check_intervals(
1127                environment=args.environment,
1128                no_signals=args.no_signals,
1129                select_models=args.select_model,
1130                start=args.start,
1131                end=args.end,
1132            )
1133        )
1134
1135    @magic_arguments()
1136    @argument(
1137        "--skip-connection",
1138        action="store_true",
1139        help="Skip the connection test.",
1140        default=False,
1141    )
1142    @argument(
1143        "--verbose",
1144        "-v",
1145        action="count",
1146        default=0,
1147        help="Verbose output. Use -vv for very verbose.",
1148    )
1149    @line_magic
1150    @pass_sqlmesh_context
1151    def info(self, context: Context, line: str) -> None:
1152        """Display SQLMesh project information."""
1153        args = parse_argstring(self.info, line)
1154        context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose))
1155
1156    @magic_arguments()
1157    @line_magic
1158    @pass_sqlmesh_context
1159    def rollback(self, context: Context, line: str) -> None:
1160        """Rollback SQLMesh to the previous migration."""
1161        context.rollback()
1162
1163    @magic_arguments()
1164    @line_magic
1165    @pass_sqlmesh_context
1166    def clean(self, context: Context, line: str) -> None:
1167        """Clears the SQLMesh cache and any build artifacts."""
1168        context.clear_caches()
1169        context.console.log_success("SQLMesh cache and build artifacts cleared")
1170
1171    @magic_arguments()
1172    @line_magic
1173    @pass_sqlmesh_context
1174    def environments(self, context: Context, line: str) -> None:
1175        """Prints the list of SQLMesh environments with its expiry datetime."""
1176        context.print_environment_names()
1177
1178    @magic_arguments()
1179    @argument(
1180        "--models",
1181        "--model",
1182        type=str,
1183        nargs="*",
1184        help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.",
1185    )
1186    @line_magic
1187    @pass_sqlmesh_context
1188    def lint(self, context: Context, line: str) -> None:
1189        """Run linter for target model(s)"""
1190        args = parse_argstring(self.lint, line)
1191        context.lint_models(args.models)
1192
1193    @magic_arguments()
1194    @line_magic
1195    @pass_sqlmesh_context
1196    def destroy(self, context: Context, line: str) -> None:
1197        """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache."""
1198        context.destroy()

Base class for implementing magic functions.

Shell functions which can be reached as %function_name. All magic functions should accept a string, which they can parse for their own needs. This can make some functions easier to type, eg %cd ../ vs. %cd("../")

Classes providing magic functions need to subclass this class, and they MUST:

  • Use the method decorators @line_magic and @cell_magic to decorate individual methods as magic functions, AND

  • Use the class decorator @magics_class to ensure that the magic methods are properly registered at the instance level upon instance initialization.

See magic_functions for examples of actual implementation classes.

display: Callable
138    @property
139    def display(self) -> t.Callable:
140        from sqlmesh import RuntimeEnv
141
142        if RuntimeEnv.get().is_databricks:
143            # Use Databricks' special display instead of the normal IPython display
144            return self._shell.user_ns["display"]
145        return display
@magic_arguments()
@argument('paths', type=str, nargs='+', default='', help='The path(s) to the SQLMesh project(s).')
@argument('--config', type=str, help='Name of the config object. Only applicable to configuration defined using Python script.')
@argument('--gateway', type=str, help='The name of the gateway.')
@argument('--ignore-warnings', action='store_true', help='Ignore warnings.')
@argument('--debug', action='store_true', help='Enable debug mode.')
@argument('--log-file-dir', type=str, help='The directory to write the log file to.')
@argument('--dotenv', type=str, help='Path to a custom .env file to load environment variables from.')
@line_magic
def context(self, line: str) -> None:
154    @magic_arguments()
155    @argument(
156        "paths",
157        type=str,
158        nargs="+",
159        default="",
160        help="The path(s) to the SQLMesh project(s).",
161    )
162    @argument(
163        "--config",
164        type=str,
165        help="Name of the config object. Only applicable to configuration defined using Python script.",
166    )
167    @argument("--gateway", type=str, help="The name of the gateway.")
168    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
169    @argument("--debug", action="store_true", help="Enable debug mode.")
170    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
171    @argument(
172        "--dotenv", type=str, help="Path to a custom .env file to load environment variables from."
173    )
174    @line_magic
175    def context(self, line: str) -> None:
176        """Sets the context in the user namespace."""
177        from sqlmesh import configure_logging, remove_excess_logs
178
179        args = parse_argstring(self.context, line)
180        log_file_dir = args.log_file_dir
181
182        configure_logging(
183            args.debug,
184            log_file_dir=log_file_dir,
185            ignore_warnings=args.ignore_warnings,
186        )
187        configure_console(ignore_warnings=args.ignore_warnings)
188
189        dotenv_path = Path(args.dotenv) if args.dotenv else None
190        configs = load_configs(
191            args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path
192        )
193        log_limit = list(configs.values())[0].log_limit
194
195        remove_excess_logs(log_file_dir, log_limit)
196
197        try:
198            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
199            self._shell.user_ns["context"] = context
200        except Exception:
201            if args.debug:
202                logger.exception("Failed to initialize SQLMesh context")
203            raise
204
205        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")

::

%context [--config CONFIG] [--gateway GATEWAY] [--ignore-warnings] [--debug] [--log-file-dir LOG_FILE_DIR] [--dotenv DOTENV] paths [paths ...]

Sets the context in the user namespace.

positional arguments: paths The path(s) to the SQLMesh project(s).

options: --config CONFIG Name of the config object. Only applicable to configuration defined using Python script. --gateway GATEWAY The name of the gateway. --ignore-warnings Ignore warnings. --debug Enable debug mode. --log-file-dir LOG_FILE_DIR The directory to write the log file to. --dotenv DOTENV Path to a custom .env file to load environment variables from.

@magic_arguments()
@argument('path', type=str, help='The path where the new SQLMesh project should be created.')
@argument('engine', type=str, help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.")
@argument('--template', '-t', type=str, help='Project template. Supported values: dbt, default, empty.')
@argument('--dlt-pipeline', type=str, help='DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt')
@argument('--dlt-path', type=str, help='The directory where the DLT pipeline resides. Use alongside template: dlt')
@line_magic
def init(self, line: str) -> None:
207    @magic_arguments()
208    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
209    @argument(
210        "engine",
211        type=str,
212        help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.",  # type: ignore
213    )
214    @argument(
215        "--template",
216        "-t",
217        type=str,
218        help="Project template. Supported values: dbt, default, empty.",
219    )
220    @argument(
221        "--dlt-pipeline",
222        type=str,
223        help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt",
224    )
225    @argument(
226        "--dlt-path",
227        type=str,
228        help="The directory where the DLT pipeline resides. Use alongside template: dlt",
229    )
230    @line_magic
231    def init(self, line: str) -> None:
232        """Creates a SQLMesh project scaffold with a default SQL dialect."""
233        args = parse_argstring(self.init, line)
234        try:
235            project_template = ProjectTemplate(
236                args.template.lower() if args.template else "default"
237            )
238        except ValueError:
239            raise MagicError(f"Invalid project template '{args.template}'")
240        init_example_project(
241            path=args.path,
242            engine_type=args.engine,
243            dialect=None,
244            template=project_template,
245            pipeline=args.dlt_pipeline,
246            dlt_path=args.dlt_path,
247        )
248        html = str(
249            h(
250                "div",
251                h(
252                    "span",
253                    {"style": {"color": "green", "font-weight": "bold"}},
254                    "SQLMesh project scaffold created",
255                ),
256            )
257        )
258        self.display(JupyterRenderable(html=html, text=""))

::

%init [--template TEMPLATE] [--dlt-pipeline DLT_PIPELINE] [--dlt-path DLT_PATH] path engine

Creates a SQLMesh project scaffold with a default SQL dialect.

positional arguments: path The path where the new SQLMesh project should be created. engine Project SQL engine. Supported values: 'DuckDB, Snowflake, Databricks, BigQuery, MotherDuck, ClickHouse, Redshift, Spark, Trino, Azure SQL, MSSQL, Postgres, GCP Postgres, MySQL, Athena, RisingWave, Fabric'.

options: --template TEMPLATE, -t TEMPLATE Project template. Supported values: dbt, default, empty. --dlt-pipeline DLT_PIPELINE DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt --dlt-path DLT_PATH The directory where the DLT pipeline resides. Use alongside template: dlt

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--dialect', '-d', type=str, help='The rendered dialect.')
@line_cell_magic
@pass_sqlmesh_context
def model( self, context: sqlmesh.core.context.Context, line: str, sql: Optional[str] = None) -> None:
260    @magic_arguments()
261    @argument("model", type=str, help="The model.")
262    @argument("--start", "-s", type=str, help="Start date to render.")
263    @argument("--end", "-e", type=str, help="End date to render.")
264    @argument("--execution-time", type=str, help="Execution time.")
265    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
266    @line_cell_magic
267    @pass_sqlmesh_context
268    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
269        """Renders the model and automatically fills in an editable cell with the model definition."""
270        args = parse_argstring(self.model, line)
271
272        model = context.get_model(args.model, raise_if_missing=True)
273        config = context.config_for_node(model)
274
275        if sql:
276            expressions = parse(sql, default_dialect=config.dialect)
277            loaded = load_sql_based_model(
278                expressions,
279                macros=context._macros,
280                jinja_macros=context._jinja_macros,
281                path=model._path,
282                dialect=config.dialect,
283                time_column_format=config.time_column_format,
284                physical_schema_mapping=context.config.physical_schema_mapping,
285                default_catalog=context.default_catalog,
286            )
287
288            if loaded.name == args.model:
289                model = loaded
290        else:
291            if model._path:
292                with open(model._path, "r", encoding="utf-8") as file:
293                    expressions = parse(file.read(), default_dialect=config.dialect)
294
295        formatted = format_model_expressions(
296            expressions,
297            model.dialect,
298            rewrite_casts=not config.format.no_rewrite_casts,
299            **config.format.generator_options,
300        )
301
302        self._shell.set_next_input(
303            "\n".join(
304                [
305                    " ".join(["%%model", line]),
306                    formatted,
307                ]
308            ),
309            replace=True,
310        )
311
312        if model._path:
313            with open(model._path, "w", encoding="utf-8") as file:
314                file.write(formatted)
315
316        if sql:
317            context.console.log_success(f"Model `{args.model}` updated")
318
319        context.upsert_model(model)
320        context.console.show_sql(
321            context.render(
322                model.name,
323                start=args.start,
324                end=args.end,
325                execution_time=args.execution_time,
326            ).sql(pretty=True, dialect=args.dialect or model.dialect)
327        )

Renders the model and automatically fills in an editable cell with the model definition.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('test_name', type=str, nargs='?', default=None, help='The test name to display')
@argument('--ls', action='store_true', help='List tests associated with a model')
@line_cell_magic
@pass_sqlmesh_context
def test( self, context: sqlmesh.core.context.Context, line: str, test_def_raw: Optional[str] = None) -> None:
329    @magic_arguments()
330    @argument("model", type=str, help="The model.")
331    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
332    @argument("--ls", action="store_true", help="List tests associated with a model")
333    @line_cell_magic
334    @pass_sqlmesh_context
335    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
336        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
337        args = parse_argstring(self.test, line)
338        if not args.test_name and not args.ls:
339            raise MagicError("Must provide either test name or `--ls` to list tests")
340
341        test_meta = context.select_tests()
342
343        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
344        for model_test_metadata in test_meta:
345            model = model_test_metadata.body.get("model")
346            if not model:
347                context.console.log_error(
348                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
349                )
350            else:
351                tests[model][model_test_metadata.test_name] = model_test_metadata
352
353        model = context.get_model(args.model, raise_if_missing=True)
354
355        if args.ls:
356            # TODO: Provide better UI for displaying tests
357            for test_name in tests[model.name]:
358                context.console.log_status_update(test_name)
359            return
360
361        test = tests[model.name][args.test_name]
362        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
363        test_def_output = yaml.dump(test_def)
364
365        self._shell.set_next_input(
366            "\n".join(
367                [
368                    " ".join(["%%test", line]),
369                    test_def_output,
370                ]
371            ),
372            replace=True,
373        )
374
375        with open(test.path, "r+", encoding="utf-8") as file:
376            content = yaml.load(file.read())
377            content[args.test_name] = test_def
378            file.seek(0)
379            yaml.dump(content, file)
380            file.truncate()

Allow the user to list tests for a model, output a specific test, and then write their changes back

@magic_arguments()
@argument('environment', nargs='?', type=str, help='The environment to run the plan against')
@argument('--start', '-s', type=str, help='Start date to backfill.')
@argument('--end', '-e', type=str, help='End date to backfill.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--create-from', type=str, help="The environment to create the target environment from if it doesn't exist. Default: prod.")
@argument('--skip-tests', '-t', action='store_true', help='Skip the unit tests defined for the model.')
@argument('--skip-linter', action='store_true', help='Skip the linter for the model.')
@argument('--restate-model', '-r', type=str, nargs='*', help='Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.')
@argument('--no-gaps', '-g', action='store_true', help='Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.')
@argument('--skip-backfill', '--dry-run', action='store_true', help='Skip the backfill step and only create a virtual update for the plan.')
@argument('--empty-backfill', action='store_true', help='Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.')
@argument('--forward-only', action='store_true', help='Create a plan for forward-only changes.', default=None)
@argument('--effective-from', type=str, help='The effective date from which to apply forward-only changes on production.')
@argument('--no-prompts', action='store_true', help='Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.', default=None)
@argument('--auto-apply', action='store_true', help='Automatically applies the new plan after creation.', default=None)
@argument('--no-auto-categorization', action='store_true', help='Disable automatic change categorization.', default=None)
@argument('--include-unmodified', action='store_true', help='Include unmodified models in the target environment.', default=None)
@argument('--select-model', type=str, nargs='*', help='Select specific model changes that should be included in the plan.')
@argument('--backfill-model', type=str, nargs='*', help='Backfill only the models whose names match the expression.')
@argument('--no-diff', action='store_true', help='Hide text differences for changed models.', default=None)
@argument('--run', action='store_true', help='Run latest intervals as part of the plan application (prod environment only).')
@argument('--ignore-cron', action='store_true', help='Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.', default=None)
@argument('--enable-preview', action='store_true', help='Enable preview for forward-only models when targeting a development environment.', default=None)
@argument('--diff-rendered', action='store_true', help='Output text differences for the rendered versions of the models and standalone audits')
@argument('--verbose', '-v', action='count', default=0, help='Verbose output. Use -vv for very verbose.')
@line_magic
@pass_sqlmesh_context
def plan(self, context: sqlmesh.core.context.Context, line: str) -> None:
382    @magic_arguments()
383    @argument(
384        "environment",
385        nargs="?",
386        type=str,
387        help="The environment to run the plan against",
388    )
389    @argument("--start", "-s", type=str, help="Start date to backfill.")
390    @argument("--end", "-e", type=str, help="End date to backfill.")
391    @argument("--execution-time", type=str, help="Execution time.")
392    @argument(
393        "--create-from",
394        type=str,
395        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
396    )
397    @argument(
398        "--skip-tests",
399        "-t",
400        action="store_true",
401        help="Skip the unit tests defined for the model.",
402    )
403    @argument(
404        "--skip-linter",
405        action="store_true",
406        help="Skip the linter for the model.",
407    )
408    @argument(
409        "--restate-model",
410        "-r",
411        type=str,
412        nargs="*",
413        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
414    )
415    @argument(
416        "--no-gaps",
417        "-g",
418        action="store_true",
419        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
420    )
421    @argument(
422        "--skip-backfill",
423        "--dry-run",
424        action="store_true",
425        help="Skip the backfill step and only create a virtual update for the plan.",
426    )
427    @argument(
428        "--empty-backfill",
429        action="store_true",
430        help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.",
431    )
432    @argument(
433        "--forward-only",
434        action="store_true",
435        help="Create a plan for forward-only changes.",
436        default=None,
437    )
438    @argument(
439        "--effective-from",
440        type=str,
441        help="The effective date from which to apply forward-only changes on production.",
442    )
443    @argument(
444        "--no-prompts",
445        action="store_true",
446        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
447        default=None,
448    )
449    @argument(
450        "--auto-apply",
451        action="store_true",
452        help="Automatically applies the new plan after creation.",
453        default=None,
454    )
455    @argument(
456        "--no-auto-categorization",
457        action="store_true",
458        help="Disable automatic change categorization.",
459        default=None,
460    )
461    @argument(
462        "--include-unmodified",
463        action="store_true",
464        help="Include unmodified models in the target environment.",
465        default=None,
466    )
467    @argument(
468        "--select-model",
469        type=str,
470        nargs="*",
471        help="Select specific model changes that should be included in the plan.",
472    )
473    @argument(
474        "--backfill-model",
475        type=str,
476        nargs="*",
477        help="Backfill only the models whose names match the expression.",
478    )
479    @argument(
480        "--no-diff",
481        action="store_true",
482        help="Hide text differences for changed models.",
483        default=None,
484    )
485    @argument(
486        "--run",
487        action="store_true",
488        help="Run latest intervals as part of the plan application (prod environment only).",
489    )
490    @argument(
491        "--ignore-cron",
492        action="store_true",
493        help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.",
494        default=None,
495    )
496    @argument(
497        "--enable-preview",
498        action="store_true",
499        help="Enable preview for forward-only models when targeting a development environment.",
500        default=None,
501    )
502    @argument(
503        "--diff-rendered",
504        action="store_true",
505        help="Output text differences for the rendered versions of the models and standalone audits",
506    )
507    @argument(
508        "--verbose",
509        "-v",
510        action="count",
511        default=0,
512        help="Verbose output. Use -vv for very verbose.",
513    )
514    @line_magic
515    @pass_sqlmesh_context
516    def plan(self, context: Context, line: str) -> None:
517        """Goes through a set of prompts to both establish a plan and apply it"""
518        args = parse_argstring(self.plan, line)
519
520        setattr(context.console, "verbosity", Verbosity(args.verbose))
521
522        context.plan(
523            args.environment,
524            start=args.start,
525            end=args.end,
526            execution_time=args.execution_time,
527            create_from=args.create_from,
528            skip_tests=args.skip_tests,
529            restate_models=args.restate_model,
530            backfill_models=args.backfill_model,
531            no_gaps=args.no_gaps,
532            skip_backfill=args.skip_backfill,
533            empty_backfill=args.empty_backfill,
534            forward_only=args.forward_only,
535            no_prompts=args.no_prompts,
536            auto_apply=args.auto_apply,
537            no_auto_categorization=args.no_auto_categorization,
538            effective_from=args.effective_from,
539            include_unmodified=args.include_unmodified,
540            select_models=args.select_model,
541            no_diff=args.no_diff,
542            run=args.run,
543            ignore_cron=args.run,
544            enable_preview=args.enable_preview,
545            diff_rendered=args.diff_rendered,
546        )

Goes through a set of prompts to both establish a plan and apply it

@magic_arguments()
@argument('environment', nargs='?', type=str, help='The environment to run against')
@argument('--start', '-s', type=str, help='Start date to evaluate.')
@argument('--end', '-e', type=str, help='End date to evaluate.')
@argument('--skip-janitor', action='store_true', help='Skip the janitor task.')
@argument('--ignore-cron', action='store_true', help='Run for all missing intervals, ignoring individual cron schedules.')
@argument('--select-model', type=str, nargs='*', help='Select specific models to run. Note: this always includes upstream dependencies.')
@argument('--exit-on-env-update', type=int, help='If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.')
@argument('--no-auto-upstream', action='store_true', help='Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.')
@line_magic
@pass_sqlmesh_context
def run_dag(self, context: sqlmesh.core.context.Context, line: str) -> None:
548    @magic_arguments()
549    @argument(
550        "environment",
551        nargs="?",
552        type=str,
553        help="The environment to run against",
554    )
555    @argument("--start", "-s", type=str, help="Start date to evaluate.")
556    @argument("--end", "-e", type=str, help="End date to evaluate.")
557    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
558    @argument(
559        "--ignore-cron",
560        action="store_true",
561        help="Run for all missing intervals, ignoring individual cron schedules.",
562    )
563    @argument(
564        "--select-model",
565        type=str,
566        nargs="*",
567        help="Select specific models to run. Note: this always includes upstream dependencies.",
568    )
569    @argument(
570        "--exit-on-env-update",
571        type=int,
572        help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.",
573    )
574    @argument(
575        "--no-auto-upstream",
576        action="store_true",
577        help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.",
578    )
579    @line_magic
580    @pass_sqlmesh_context
581    def run_dag(self, context: Context, line: str) -> None:
582        """Evaluate the DAG of models using the built-in scheduler."""
583        args = parse_argstring(self.run_dag, line)
584
585        completion_status = context.run(
586            args.environment,
587            start=args.start,
588            end=args.end,
589            skip_janitor=args.skip_janitor,
590            ignore_cron=args.ignore_cron,
591            select_models=args.select_model,
592            exit_on_env_update=args.exit_on_env_update,
593            no_auto_upstream=args.no_auto_upstream,
594        )
595        if completion_status.is_failure:
596            raise SQLMeshError("Error Running DAG. Check logs for details.")

Evaluate the DAG of models using the built-in scheduler.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--limit', type=int, help='The number of rows which the query should be limited to.')
@line_magic
@pass_sqlmesh_context
def evaluate(self, context: sqlmesh.core.context.Context, line: str) -> None:
598    @magic_arguments()
599    @argument("model", type=str, help="The model.")
600    @argument("--start", "-s", type=str, help="Start date to render.")
601    @argument("--end", "-e", type=str, help="End date to render.")
602    @argument("--execution-time", type=str, help="Execution time.")
603    @argument(
604        "--limit",
605        type=int,
606        help="The number of rows which the query should be limited to.",
607    )
608    @line_magic
609    @pass_sqlmesh_context
610    def evaluate(self, context: Context, line: str) -> None:
611        """Evaluate a model query and fetches a dataframe."""
612        context.refresh()
613
614        snowpark = optional_import("snowflake.snowpark")
615        args = parse_argstring(self.evaluate, line)
616
617        df = context.evaluate(
618            args.model,
619            start=args.start,
620            end=args.end,
621            execution_time=args.execution_time,
622            limit=args.limit,
623        )
624
625        if snowpark and isinstance(df, snowpark.DataFrame):
626            df = df.limit(args.limit or 100).to_pandas()
627
628        self.display(df)

Evaluate a model query and fetches a dataframe.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--expand', type=t.Union[bool, t.Iterable[str]], help='Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.')
@argument('--dialect', type=str, help='SQL dialect to render.')
@argument('--no-format', action='store_true', help='Disable fancy formatting of the query.')
@format_arguments
@line_magic
@pass_sqlmesh_context
def render(self, context: sqlmesh.core.context.Context, line: str) -> None:
630    @magic_arguments()
631    @argument("model", type=str, help="The model.")
632    @argument("--start", "-s", type=str, help="Start date to render.")
633    @argument("--end", "-e", type=str, help="End date to render.")
634    @argument("--execution-time", type=str, help="Execution time.")
635    @argument(
636        "--expand",
637        type=t.Union[bool, t.Iterable[str]],
638        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
639    )
640    @argument("--dialect", type=str, help="SQL dialect to render.")
641    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
642    @format_arguments
643    @line_magic
644    @pass_sqlmesh_context
645    def render(self, context: Context, line: str) -> None:
646        """Renders a model's query, optionally expanding referenced models."""
647        context.refresh()
648        render_opts = vars(parse_argstring(self.render, line))
649        model = render_opts.pop("model")
650        dialect = render_opts.pop("dialect", None)
651
652        model = context.get_model(model, raise_if_missing=True)
653
654        query = context.render(
655            model,
656            start=render_opts.pop("start", None),
657            end=render_opts.pop("end", None),
658            execution_time=render_opts.pop("execution_time", None),
659            expand=render_opts.pop("expand", False),
660        )
661
662        no_format = render_opts.pop("no_format", False)
663
664        format_config = context.config_for_node(model).format
665        format_options = {
666            **format_config.generator_options,
667            **{k: v for k, v in render_opts.items() if v is not None},
668        }
669
670        sql = query.sql(
671            pretty=True,
672            dialect=context.config.dialect if dialect is None else dialect,
673            **format_options,
674        )
675
676        if no_format:
677            context.console.log_status_update(sql)
678        else:
679            context.console.show_sql(sql)

Renders a model's query, optionally expanding referenced models.

@magic_arguments()
@argument('df_var', default=None, nargs='?', type=str, help='An optional variable name to store the resulting dataframe.')
@cell_magic
@pass_sqlmesh_context
def fetchdf(self, context: sqlmesh.core.context.Context, line: str, sql: str) -> None:
681    @magic_arguments()
682    @argument(
683        "df_var",
684        default=None,
685        nargs="?",
686        type=str,
687        help="An optional variable name to store the resulting dataframe.",
688    )
689    @cell_magic
690    @pass_sqlmesh_context
691    def fetchdf(self, context: Context, line: str, sql: str) -> None:
692        """Fetches a dataframe from sql, optionally storing it in a variable."""
693        args = parse_argstring(self.fetchdf, line)
694        df = context.fetchdf(sql)
695        if args.df_var:
696            self._shell.user_ns[args.df_var] = df
697        self.display(df)

Fetches a dataframe from sql, optionally storing it in a variable.

@magic_arguments()
@argument('--file', '-f', type=str, help='An optional file path to write the HTML output to.')
@argument('--select-model', type=str, nargs='*', help='Select specific models to include in the dag.')
@line_magic
@pass_sqlmesh_context
def dag(self, context: sqlmesh.core.context.Context, line: str) -> None:
699    @magic_arguments()
700    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
701    @argument(
702        "--select-model",
703        type=str,
704        nargs="*",
705        help="Select specific models to include in the dag.",
706    )
707    @line_magic
708    @pass_sqlmesh_context
709    def dag(self, context: Context, line: str) -> None:
710        """Displays the HTML DAG."""
711        args = parse_argstring(self.dag, line)
712        dag = context.get_dag(args.select_model)
713        if args.file:
714            with open(args.file, "w", encoding="utf-8") as file:
715                file.write(str(dag))
716        # TODO: Have this go through console instead of calling display directly
717        self.display(dag)

Displays the HTML DAG.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def migrate(self, context: sqlmesh.core.context.Context, line: str) -> None:
719    @magic_arguments()
720    @line_magic
721    @pass_sqlmesh_context
722    def migrate(self, context: Context, line: str) -> None:
723        """Migrate SQLMesh to the current running version."""
724        context.migrate()
725        context.console.log_success("Migration complete")

Migrate SQLMesh to the current running version.

@magic_arguments()
@argument('--strict', action='store_true', help='Raise an error if the external model is missing in the database')
@line_magic
@pass_sqlmesh_context
def create_external_models(self, context: sqlmesh.core.context.Context, line: str) -> None:
727    @magic_arguments()
728    @argument(
729        "--strict",
730        action="store_true",
731        help="Raise an error if the external model is missing in the database",
732    )
733    @line_magic
734    @pass_sqlmesh_context
735    def create_external_models(self, context: Context, line: str) -> None:
736        """Create a schema file containing external model schemas."""
737        args = parse_argstring(self.create_external_models, line)
738        context.create_external_models(strict=args.strict)

Create a schema file containing external model schemas.

@magic_arguments()
@argument('source_to_target', type=str, metavar='SOURCE:TARGET', help='Source and target in `SOURCE:TARGET` format')
@argument('--on', type=str, nargs='*', help='The column to join on. Can be specified multiple times. The model grain will be used if not specified.')
@argument('--skip-columns', type=str, nargs='*', help='The column(s) to skip when comparing the source and target table.')
@argument('--model', type=str, help='The model to diff against when source and target are environments and not tables.')
@argument('--where', type=str, help='An optional where statement to filter results.')
@argument('--limit', type=int, default=20, help='The limit of the sample dataframe.')
@argument('--show-sample', action='store_true', help='Show a sample of the rows that differ. With many columns, the output can be very wide.')
@argument('--decimals', type=int, default=3, help='The number of decimal places to keep when comparing floating point columns. Default: 3')
@argument('--select-model', type=str, nargs='*', help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)")
@argument('--skip-grain-check', action='store_true', help='Disable the check for a primary key (grain) that is missing or is not unique.')
@argument('--warn-grain-check', action='store_true', help='Warn if any selected model is missing a grain, and compute diffs for the remaining models.')
@argument('--schema-diff-ignore-case', action='store_true', help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.")
@line_magic
@pass_sqlmesh_context
def table_diff(self, context: sqlmesh.core.context.Context, line: str) -> None:
740    @magic_arguments()
741    @argument(
742        "source_to_target",
743        type=str,
744        metavar="SOURCE:TARGET",
745        help="Source and target in `SOURCE:TARGET` format",
746    )
747    @argument(
748        "--on",
749        type=str,
750        nargs="*",
751        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
752    )
753    @argument(
754        "--skip-columns",
755        type=str,
756        nargs="*",
757        help="The column(s) to skip when comparing the source and target table.",
758    )
759    @argument(
760        "--model",
761        type=str,
762        help="The model to diff against when source and target are environments and not tables.",
763    )
764    @argument(
765        "--where",
766        type=str,
767        help="An optional where statement to filter results.",
768    )
769    @argument(
770        "--limit",
771        type=int,
772        default=20,
773        help="The limit of the sample dataframe.",
774    )
775    @argument(
776        "--show-sample",
777        action="store_true",
778        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
779    )
780    @argument(
781        "--decimals",
782        type=int,
783        default=3,
784        help="The number of decimal places to keep when comparing floating point columns. Default: 3",
785    )
786    @argument(
787        "--select-model",
788        type=str,
789        nargs="*",
790        help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)",
791    )
792    @argument(
793        "--skip-grain-check",
794        action="store_true",
795        help="Disable the check for a primary key (grain) that is missing or is not unique.",
796    )
797    @argument(
798        "--warn-grain-check",
799        action="store_true",
800        help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.",
801    )
802    @argument(
803        "--schema-diff-ignore-case",
804        action="store_true",
805        help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.",
806    )
807    @line_magic
808    @pass_sqlmesh_context
809    def table_diff(self, context: Context, line: str) -> None:
810        """Show the diff between two tables.
811
812        Can either be two tables or two environments and a model.
813        """
814        args = parse_argstring(self.table_diff, line)
815        source, target = args.source_to_target.split(":")
816        select_models = {args.model} if args.model else args.select_model or None
817        context.table_diff(
818            source=source,
819            target=target,
820            on=args.on,
821            skip_columns=args.skip_columns,
822            select_models=select_models,
823            where=args.where,
824            limit=args.limit,
825            show_sample=args.show_sample,
826            decimals=args.decimals,
827            skip_grain_check=args.skip_grain_check,
828            warn_grain_check=args.warn_grain_check,
829            schema_diff_ignore_case=args.schema_diff_ignore_case,
830        )

Show the diff between two tables.

Can either be two tables or two environments and a model.

@magic_arguments()
@argument('model_name', nargs='?', type=str, help='The name of the model to get the table name for.')
@argument('--environment', type=str, help='The environment to source the model version from.')
@argument('--prod', action='store_true', help='If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.')
@line_magic
@pass_sqlmesh_context
def table_name(self, context: sqlmesh.core.context.Context, line: str) -> None:
832    @magic_arguments()
833    @argument(
834        "model_name",
835        nargs="?",
836        type=str,
837        help="The name of the model to get the table name for.",
838    )
839    @argument(
840        "--environment",
841        type=str,
842        help="The environment to source the model version from.",
843    )
844    @argument(
845        "--prod",
846        action="store_true",
847        help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.",
848    )
849    @line_magic
850    @pass_sqlmesh_context
851    def table_name(self, context: Context, line: str) -> None:
852        """Prints the name of the physical table for the given model."""
853        args = parse_argstring(self.table_name, line)
854        context.console.log_status_update(
855            context.table_name(args.model_name, args.environment, args.prod)
856        )

Prints the name of the physical table for the given model.

@magic_arguments()
@argument('pipeline', nargs='?', type=str, help='The dlt pipeline to attach for this SQLMesh project.')
@argument('--table', '-t', type=str, nargs='*', help='The specific dlt tables to refresh in the SQLMesh models.')
@argument('--force', '-f', action='store_true', help='If set, existing models are overwritten with the new DLT tables.')
@argument('--dlt-path', type=str, help='The directory where the DLT pipeline resides.')
@line_magic
@pass_sqlmesh_context
def dlt_refresh(self, context: sqlmesh.core.context.Context, line: str) -> None:
858    @magic_arguments()
859    @argument(
860        "pipeline",
861        nargs="?",
862        type=str,
863        help="The dlt pipeline to attach for this SQLMesh project.",
864    )
865    @argument(
866        "--table",
867        "-t",
868        type=str,
869        nargs="*",
870        help="The specific dlt tables to refresh in the SQLMesh models.",
871    )
872    @argument(
873        "--force",
874        "-f",
875        action="store_true",
876        help="If set, existing models are overwritten with the new DLT tables.",
877    )
878    @argument(
879        "--dlt-path",
880        type=str,
881        help="The directory where the DLT pipeline resides.",
882    )
883    @line_magic
884    @pass_sqlmesh_context
885    def dlt_refresh(self, context: Context, line: str) -> None:
886        """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project."""
887        from sqlmesh.integrations.dlt import generate_dlt_models
888
889        args = parse_argstring(self.dlt_refresh, line)
890        sqlmesh_models = generate_dlt_models(
891            context, args.pipeline, list(args.table or []), args.force, args.dlt_path
892        )
893        if sqlmesh_models:
894            model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models])
895            context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}")
896        else:
897            context.console.log_success("All SQLMesh models are up to date.")

Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project.

@magic_arguments()
@argument('--read', type=str, default='', help='The input dialect of the sql string.')
@argument('--write', type=str, default='', help='The output dialect of the sql string.')
@line_cell_magic
@pass_sqlmesh_context
def rewrite(self, context: sqlmesh.core.context.Context, line: str, sql: str) -> None:
899    @magic_arguments()
900    @argument(
901        "--read",
902        type=str,
903        default="",
904        help="The input dialect of the sql string.",
905    )
906    @argument(
907        "--write",
908        type=str,
909        default="",
910        help="The output dialect of the sql string.",
911    )
912    @line_cell_magic
913    @pass_sqlmesh_context
914    def rewrite(self, context: Context, line: str, sql: str) -> None:
915        """Rewrite a sql expression with semantic references into an executable query.
916
917        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
918        """
919        args = parse_argstring(self.rewrite, line)
920        context.console.show_sql(
921            context.rewrite(sql, args.read).sql(
922                dialect=args.write or context.config.dialect, pretty=True
923            )
924        )

Rewrite a sql expression with semantic references into an executable query.

https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

@magic_arguments()
@argument('--transpile', '-t', type=str, help='Transpile project models to the specified dialect.')
@argument('--check', action='store_true', help='Whether or not to check formatting (but not actually format anything).', default=None)
@argument('--append-newline', action='store_true', help='Include a newline at the end of the output.', default=None)
@argument('--no-rewrite-casts', action='store_true', help='Preserve the existing casts, without rewriting them to use the :: syntax.', default=None)
@format_arguments
@line_magic
@pass_sqlmesh_context
def format(self, context: sqlmesh.core.context.Context, line: str) -> bool:
926    @magic_arguments()
927    @argument(
928        "--transpile",
929        "-t",
930        type=str,
931        help="Transpile project models to the specified dialect.",
932    )
933    @argument(
934        "--check",
935        action="store_true",
936        help="Whether or not to check formatting (but not actually format anything).",
937        default=None,
938    )
939    @argument(
940        "--append-newline",
941        action="store_true",
942        help="Include a newline at the end of the output.",
943        default=None,
944    )
945    @argument(
946        "--no-rewrite-casts",
947        action="store_true",
948        help="Preserve the existing casts, without rewriting them to use the :: syntax.",
949        default=None,
950    )
951    @format_arguments
952    @line_magic
953    @pass_sqlmesh_context
954    def format(self, context: Context, line: str) -> bool:
955        """Format all SQL models and audits."""
956        format_opts = vars(parse_argstring(self.format, line))
957        if format_opts.pop("no_rewrite_casts", None):
958            format_opts["rewrite_casts"] = False
959
960        return context.format(**{k: v for k, v in format_opts.items() if v is not None})

Format all SQL models and audits.

@magic_arguments()
@argument('environment', type=str, help='The environment to diff local state against.')
@line_magic
@pass_sqlmesh_context
def diff(self, context: sqlmesh.core.context.Context, line: str) -> None:
962    @magic_arguments()
963    @argument("environment", type=str, help="The environment to diff local state against.")
964    @line_magic
965    @pass_sqlmesh_context
966    def diff(self, context: Context, line: str) -> None:
967        """Show the diff between the local state and the target environment."""
968        args = parse_argstring(self.diff, line)
969        context.diff(args.environment)

Show the diff between the local state and the target environment.

@magic_arguments()
@argument('environment', type=str, help='The environment to invalidate.')
@line_magic
@pass_sqlmesh_context
def invalidate(self, context: sqlmesh.core.context.Context, line: str) -> None:
971    @magic_arguments()
972    @argument("environment", type=str, help="The environment to invalidate.")
973    @line_magic
974    @pass_sqlmesh_context
975    def invalidate(self, context: Context, line: str) -> None:
976        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
977        args = parse_argstring(self.invalidate, line)
978        context.invalidate_environment(args.environment)

Invalidate the target environment, forcing its removal during the next run of the janitor process.

@magic_arguments()
@argument('--ignore-ttl', action='store_true', help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire")
@line_magic
@pass_sqlmesh_context
def janitor(self, context: sqlmesh.core.context.Context, line: str) -> None:
980    @magic_arguments()
981    @argument(
982        "--ignore-ttl",
983        action="store_true",
984        help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire",
985    )
986    @line_magic
987    @pass_sqlmesh_context
988    def janitor(self, context: Context, line: str) -> None:
989        """Run the janitor process to clean up old environments and expired snapshots."""
990        args = parse_argstring(self.janitor, line)
991        context.run_janitor(ignore_ttl=args.ignore_ttl)

Run the janitor process to clean up old environments and expired snapshots.

@magic_arguments()
@argument('model', type=str)
@argument('--query', '-q', type=str, nargs='+', default=[], help="Queries that will be used to generate data for the model's dependencies.")
@argument('--overwrite', '-o', action='store_true', help='When true, the fixture file will be overwritten in case it already exists.')
@argument('--var', '-v', type=str, nargs='+', help='Key-value pairs that will define variables needed by the model.')
@argument('--path', '-p', type=str, help="The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred based on the test's name.")
@argument('--name', '-n', type=str, help="The name of the test that will be created. By default, it's inferred based on the model's name.")
@argument('--include-ctes', action='store_true', help='When true, CTE fixtures will also be generated.')
@line_magic
@pass_sqlmesh_context
def create_test(self, context: sqlmesh.core.context.Context, line: str) -> None:
 993    @magic_arguments()
 994    @argument("model", type=str)
 995    @argument(
 996        "--query",
 997        "-q",
 998        type=str,
 999        nargs="+",
1000        default=[],
1001        help="Queries that will be used to generate data for the model's dependencies.",
1002    )
1003    @argument(
1004        "--overwrite",
1005        "-o",
1006        action="store_true",
1007        help="When true, the fixture file will be overwritten in case it already exists.",
1008    )
1009    @argument(
1010        "--var",
1011        "-v",
1012        type=str,
1013        nargs="+",
1014        help="Key-value pairs that will define variables needed by the model.",
1015    )
1016    @argument(
1017        "--path",
1018        "-p",
1019        type=str,
1020        help="The file path corresponding to the fixture, relative to the test directory. "
1021        "By default, the fixture will be created under the test directory and the file "
1022        "name will be inferred based on the test's name.",
1023    )
1024    @argument(
1025        "--name",
1026        "-n",
1027        type=str,
1028        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
1029    )
1030    @argument(
1031        "--include-ctes",
1032        action="store_true",
1033        help="When true, CTE fixtures will also be generated.",
1034    )
1035    @line_magic
1036    @pass_sqlmesh_context
1037    def create_test(self, context: Context, line: str) -> None:
1038        """Generate a unit test fixture for a given model."""
1039        args = parse_argstring(self.create_test, line)
1040        queries = iter(args.query)
1041        variables = iter(args.var) if args.var else None
1042        context.create_test(
1043            args.model,
1044            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
1045            overwrite=args.overwrite,
1046            variables=dict(zip(variables, variables)) if variables else None,
1047            path=args.path,
1048            name=args.name,
1049            include_ctes=args.include_ctes,
1050        )

Generate a unit test fixture for a given model.

@magic_arguments()
@argument('tests', nargs='*', type=str)
@argument('--pattern', '-k', nargs='*', type=str, help='Only run tests that match the pattern of substring.')
@argument('--verbose', '-v', action='count', default=0, help='Verbose output. Use -vv for very verbose.')
@argument('--preserve-fixtures', action='store_true', help='Preserve the fixture tables in the testing database, useful for debugging.')
@line_magic
@pass_sqlmesh_context
def run_test(self, context: sqlmesh.core.context.Context, line: str) -> None:
1052    @magic_arguments()
1053    @argument("tests", nargs="*", type=str)
1054    @argument(
1055        "--pattern",
1056        "-k",
1057        nargs="*",
1058        type=str,
1059        help="Only run tests that match the pattern of substring.",
1060    )
1061    @argument(
1062        "--verbose",
1063        "-v",
1064        action="count",
1065        default=0,
1066        help="Verbose output. Use -vv for very verbose.",
1067    )
1068    @argument(
1069        "--preserve-fixtures",
1070        action="store_true",
1071        help="Preserve the fixture tables in the testing database, useful for debugging.",
1072    )
1073    @line_magic
1074    @pass_sqlmesh_context
1075    def run_test(self, context: Context, line: str) -> None:
1076        """Run unit test(s)."""
1077        args = parse_argstring(self.run_test, line)
1078
1079        context.test(
1080            match_patterns=args.pattern,
1081            tests=args.tests,
1082            verbosity=Verbosity(args.verbose),
1083            preserve_fixtures=args.preserve_fixtures,
1084            stream=StringIO(),  # consume the output instead of redirecting to stdout
1085        )

Run unit test(s).

@magic_arguments()
@argument('models', type=str, nargs='*', help='A model to audit. Multiple models can be audited.')
@argument('--start', '-s', type=str, help='Start date to audit.')
@argument('--end', '-e', type=str, help='End date to audit.')
@argument('--execution-time', type=str, help='Execution time.')
@line_magic
@pass_sqlmesh_context
def audit(self, context: sqlmesh.core.context.Context, line: str) -> bool:
1087    @magic_arguments()
1088    @argument(
1089        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
1090    )
1091    @argument("--start", "-s", type=str, help="Start date to audit.")
1092    @argument("--end", "-e", type=str, help="End date to audit.")
1093    @argument("--execution-time", type=str, help="Execution time.")
1094    @line_magic
1095    @pass_sqlmesh_context
1096    def audit(self, context: Context, line: str) -> bool:
1097        """Run audit(s)"""
1098        args = parse_argstring(self.audit, line)
1099        return context.audit(
1100            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
1101        )

Run audit(s)

@magic_arguments()
@argument('environment', nargs='?', type=str, help='The environment to check intervals for.')
@argument('--no-signals', action='store_true', help='Disable signal checks and only show missing intervals.', default=False)
@argument('--select-model', type=str, nargs='*', help='Select specific model changes that should be included in the plan.')
@argument('--start', '-s', type=str, help='Start date of intervals to check for.')
@argument('--end', '-e', type=str, help='End date of intervals to check for.')
@line_magic
@pass_sqlmesh_context
def check_intervals(self, context: sqlmesh.core.context.Context, line: str) -> None:
1103    @magic_arguments()
1104    @argument("environment", nargs="?", type=str, help="The environment to check intervals for.")
1105    @argument(
1106        "--no-signals",
1107        action="store_true",
1108        help="Disable signal checks and only show missing intervals.",
1109        default=False,
1110    )
1111    @argument(
1112        "--select-model",
1113        type=str,
1114        nargs="*",
1115        help="Select specific model changes that should be included in the plan.",
1116    )
1117    @argument("--start", "-s", type=str, help="Start date of intervals to check for.")
1118    @argument("--end", "-e", type=str, help="End date of intervals to check for.")
1119    @line_magic
1120    @pass_sqlmesh_context
1121    def check_intervals(self, context: Context, line: str) -> None:
1122        """Show missing intervals in an environment, respecting signals."""
1123        args = parse_argstring(self.check_intervals, line)
1124
1125        context.console.show_intervals(
1126            context.check_intervals(
1127                environment=args.environment,
1128                no_signals=args.no_signals,
1129                select_models=args.select_model,
1130                start=args.start,
1131                end=args.end,
1132            )
1133        )

Show missing intervals in an environment, respecting signals.

@magic_arguments()
@argument('--skip-connection', action='store_true', help='Skip the connection test.', default=False)
@argument('--verbose', '-v', action='count', default=0, help='Verbose output. Use -vv for very verbose.')
@line_magic
@pass_sqlmesh_context
def info(self, context: sqlmesh.core.context.Context, line: str) -> None:
1135    @magic_arguments()
1136    @argument(
1137        "--skip-connection",
1138        action="store_true",
1139        help="Skip the connection test.",
1140        default=False,
1141    )
1142    @argument(
1143        "--verbose",
1144        "-v",
1145        action="count",
1146        default=0,
1147        help="Verbose output. Use -vv for very verbose.",
1148    )
1149    @line_magic
1150    @pass_sqlmesh_context
1151    def info(self, context: Context, line: str) -> None:
1152        """Display SQLMesh project information."""
1153        args = parse_argstring(self.info, line)
1154        context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose))

Display SQLMesh project information.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def rollback(self, context: sqlmesh.core.context.Context, line: str) -> None:
1156    @magic_arguments()
1157    @line_magic
1158    @pass_sqlmesh_context
1159    def rollback(self, context: Context, line: str) -> None:
1160        """Rollback SQLMesh to the previous migration."""
1161        context.rollback()

Rollback SQLMesh to the previous migration.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def clean(self, context: sqlmesh.core.context.Context, line: str) -> None:
1163    @magic_arguments()
1164    @line_magic
1165    @pass_sqlmesh_context
1166    def clean(self, context: Context, line: str) -> None:
1167        """Clears the SQLMesh cache and any build artifacts."""
1168        context.clear_caches()
1169        context.console.log_success("SQLMesh cache and build artifacts cleared")

Clears the SQLMesh cache and any build artifacts.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def environments(self, context: sqlmesh.core.context.Context, line: str) -> None:
1171    @magic_arguments()
1172    @line_magic
1173    @pass_sqlmesh_context
1174    def environments(self, context: Context, line: str) -> None:
1175        """Prints the list of SQLMesh environments with its expiry datetime."""
1176        context.print_environment_names()

Prints the list of SQLMesh environments with its expiry datetime.

@magic_arguments()
@argument('--models', '--model', type=str, nargs='*', help='A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.')
@line_magic
@pass_sqlmesh_context
def lint(self, context: sqlmesh.core.context.Context, line: str) -> None:
1178    @magic_arguments()
1179    @argument(
1180        "--models",
1181        "--model",
1182        type=str,
1183        nargs="*",
1184        help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.",
1185    )
1186    @line_magic
1187    @pass_sqlmesh_context
1188    def lint(self, context: Context, line: str) -> None:
1189        """Run linter for target model(s)"""
1190        args = parse_argstring(self.lint, line)
1191        context.lint_models(args.models)

Run linter for target model(s)

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def destroy(self, context: sqlmesh.core.context.Context, line: str) -> None:
1193    @magic_arguments()
1194    @line_magic
1195    @pass_sqlmesh_context
1196    def destroy(self, context: Context, line: str) -> None:
1197        """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache."""
1198        context.destroy()

Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache.

magics = {'line': {'context': 'context', 'init': 'init', 'model': 'model', 'test': 'test', 'plan': 'plan', 'run_dag': 'run_dag', 'evaluate': 'evaluate', 'render': 'render', 'dag': 'dag', 'migrate': 'migrate', 'create_external_models': 'create_external_models', 'table_diff': 'table_diff', 'table_name': 'table_name', 'dlt_refresh': 'dlt_refresh', 'rewrite': 'rewrite', 'format': 'format', 'diff': 'diff', 'invalidate': 'invalidate', 'janitor': 'janitor', 'create_test': 'create_test', 'run_test': 'run_test', 'audit': 'audit', 'check_intervals': 'check_intervals', 'info': 'info', 'rollback': 'rollback', 'clean': 'clean', 'environments': 'environments', 'lint': 'lint', 'destroy': 'destroy'}, 'cell': {'model': 'model', 'test': 'test', 'fetchdf': 'fetchdf', 'rewrite': 'rewrite'}}
registered = True
Inherited Members
IPython.core.magic.Magics
Magics
options_table
shell
arg_err
format_latex
parse_options
default_option
traitlets.config.configurable.Configurable
config
parent
section_names
update_config
class_get_help
class_get_trait_help
class_print_help
class_config_section
class_config_rst_doc
traitlets.traitlets.HasTraits
setup_instance
cross_validation_lock
hold_trait_notifications
notify_change
on_trait_change
observe
unobserve
unobserve_all
add_traits
set_trait
class_trait_names
class_traits
class_own_traits
has_trait
trait_has_value
trait_values
trait_defaults
trait_names
traits
trait_metadata
class_own_trait_events
trait_events
def register_magics() -> None:
1201def register_magics() -> None:
1202    try:
1203        shell = get_ipython()  # type: ignore
1204        shell.register_magics(SQLMeshMagics)
1205    except NameError:
1206        pass