Edit on GitHub

sqlmesh.magics

  1from __future__ import annotations
  2
  3import functools
  4import logging
  5import typing as t
  6from collections import defaultdict
  7
  8from hyperscript import h
  9from IPython.core.display import display
 10from IPython.core.magic import (
 11    Magics,
 12    cell_magic,
 13    line_cell_magic,
 14    line_magic,
 15    magics_class,
 16)
 17from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring
 18from rich.jupyter import JupyterRenderable
 19
 20from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
 21from sqlmesh.core import constants as c
 22from sqlmesh.core.config import load_configs
 23from sqlmesh.core.console import get_console
 24from sqlmesh.core.context import Context
 25from sqlmesh.core.dialect import format_model_expressions, parse
 26from sqlmesh.core.model import load_sql_based_model
 27from sqlmesh.core.test import ModelTestMetadata, get_all_model_tests
 28from sqlmesh.utils import sqlglot_dialects, yaml
 29from sqlmesh.utils.errors import MagicError, MissingContextException, SQLMeshError
 30
 31logger = logging.getLogger(__name__)
 32
 33CONTEXT_VARIABLE_NAMES = [
 34    "context",
 35    "ctx",
 36    "sqlmesh",
 37]
 38
 39
 40def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
 41    @functools.wraps(func)
 42    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
 43        for variable_name in CONTEXT_VARIABLE_NAMES:
 44            context = self._shell.user_ns.get(variable_name)
 45            if isinstance(context, Context):
 46                break
 47        else:
 48            raise MissingContextException(
 49                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
 50            )
 51        old_console = context.console
 52        context.console = get_console(display=self.display)
 53        context.refresh()
 54        func(self, context, *args, **kwargs)
 55        context.console = old_console
 56
 57    return wrapper
 58
 59
 60@magics_class
 61class SQLMeshMagics(Magics):
 62    @property
 63    def display(self) -> t.Callable:
 64        from sqlmesh import RuntimeEnv
 65
 66        if RuntimeEnv.get().is_databricks:
 67            # Use Databricks' special display instead of the normal IPython display
 68            return self._shell.user_ns["display"]
 69        return display
 70
 71    @property
 72    def _shell(self) -> t.Any:
 73        # Make mypy happy.
 74        if not self.shell:
 75            raise RuntimeError("IPython Magics are in invalid state")
 76        return self.shell
 77
 78    @magic_arguments()
 79    @argument(
 80        "paths",
 81        type=str,
 82        nargs="+",
 83        default="",
 84        help="The path(s) to the SQLMesh project(s).",
 85    )
 86    @argument(
 87        "--config",
 88        type=str,
 89        help="Name of the config object. Only applicable to configuration defined using Python script.",
 90    )
 91    @argument("--gateway", type=str, help="The name of the gateway.")
 92    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
 93    @argument("--debug", action="store_true", help="Enable debug mode.")
 94    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
 95    @line_magic
 96    def context(self, line: str) -> None:
 97        """Sets the context in the user namespace."""
 98        from sqlmesh import configure_logging
 99
100        args = parse_argstring(self.context, line)
101        configs = load_configs(args.config, Context.CONFIG_TYPE, args.paths)
102        log_limit = list(configs.values())[0].log_limit
103        configure_logging(
104            args.debug, args.ignore_warnings, log_limit=log_limit, log_file_dir=args.log_file_dir
105        )
106        try:
107            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
108            self._shell.user_ns["context"] = context
109        except Exception:
110            if args.debug:
111                logger.exception("Failed to initialize SQLMesh context")
112            raise
113        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")
114
115    @magic_arguments()
116    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
117    @argument(
118        "sql_dialect",
119        type=str,
120        help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.",
121    )
122    @argument(
123        "--template",
124        "-t",
125        type=str,
126        help="Project template. Supported values: airflow, dbt, default, empty.",
127    )
128    @line_magic
129    def init(self, line: str) -> None:
130        """Creates a SQLMesh project scaffold with a default SQL dialect."""
131        args = parse_argstring(self.init, line)
132        try:
133            project_template = ProjectTemplate(
134                args.template.lower() if args.template else "default"
135            )
136        except ValueError:
137            raise MagicError(f"Invalid project template '{args.template}'")
138        init_example_project(args.path, args.sql_dialect, project_template)
139        html = str(
140            h(
141                "div",
142                h(
143                    "span",
144                    {"style": {"color": "green", "font-weight": "bold"}},
145                    "SQLMesh project scaffold created",
146                ),
147            )
148        )
149        self.display(JupyterRenderable(html=html, text=""))
150
151    @magic_arguments()
152    @argument("model", type=str, help="The model.")
153    @argument("--start", "-s", type=str, help="Start date to render.")
154    @argument("--end", "-e", type=str, help="End date to render.")
155    @argument("--execution-time", type=str, help="Execution time.")
156    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
157    @line_cell_magic
158    @pass_sqlmesh_context
159    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
160        """Renders the model and automatically fills in an editable cell with the model definition."""
161        args = parse_argstring(self.model, line)
162
163        model = context.get_model(args.model, raise_if_missing=True)
164        config = context.config_for_node(model)
165
166        if sql:
167            expressions = parse(sql, default_dialect=config.dialect)
168            loaded = load_sql_based_model(
169                expressions,
170                macros=context._macros,
171                jinja_macros=context._jinja_macros,
172                path=model._path,
173                dialect=config.dialect,
174                time_column_format=config.time_column_format,
175                physical_schema_override=context.config.physical_schema_override,
176                default_catalog=context.default_catalog,
177            )
178
179            if loaded.name == args.model:
180                model = loaded
181        else:
182            with open(model._path, "r", encoding="utf-8") as file:
183                expressions = parse(file.read(), default_dialect=config.dialect)
184
185        formatted = format_model_expressions(
186            expressions, model.dialect, **config.format.generator_options
187        )
188
189        self._shell.set_next_input(
190            "\n".join(
191                [
192                    " ".join(["%%model", line]),
193                    formatted,
194                ]
195            ),
196            replace=True,
197        )
198
199        with open(model._path, "w", encoding="utf-8") as file:
200            file.write(formatted)
201
202        if sql:
203            context.console.log_success(f"Model `{args.model}` updated")
204
205        context.upsert_model(model)
206        context.console.show_sql(
207            context.render(
208                model.name,
209                start=args.start,
210                end=args.end,
211                execution_time=args.execution_time,
212            ).sql(pretty=True, dialect=args.dialect or model.dialect)
213        )
214
215    @magic_arguments()
216    @argument("model", type=str, help="The model.")
217    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
218    @argument("--ls", action="store_true", help="List tests associated with a model")
219    @line_cell_magic
220    @pass_sqlmesh_context
221    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
222        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
223        args = parse_argstring(self.test, line)
224        if not args.test_name and not args.ls:
225            raise MagicError("Must provide either test name or `--ls` to list tests")
226
227        test_meta = []
228
229        for path, config in context.configs.items():
230            test_meta.extend(
231                get_all_model_tests(
232                    path / c.TESTS,
233                    ignore_patterns=config.ignore_patterns,
234                )
235            )
236
237        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
238        for model_test_metadata in test_meta:
239            model = model_test_metadata.body.get("model")
240            if not model:
241                context.console.log_error(
242                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
243                )
244            else:
245                tests[model][model_test_metadata.test_name] = model_test_metadata
246
247        model = context.get_model(args.model, raise_if_missing=True)
248
249        if args.ls:
250            # TODO: Provide better UI for displaying tests
251            for test_name in tests[model.name]:
252                context.console.log_status_update(test_name)
253            return
254
255        test = tests[model.name][args.test_name]
256        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
257        test_def_output = yaml.dump(test_def)
258
259        self._shell.set_next_input(
260            "\n".join(
261                [
262                    " ".join(["%%test", line]),
263                    test_def_output,
264                ]
265            ),
266            replace=True,
267        )
268
269        with open(test.path, "r+", encoding="utf-8") as file:
270            content = yaml.load(file.read())
271            content[args.test_name] = test_def
272            file.seek(0)
273            yaml.dump(content, file)
274            file.truncate()
275
276    @magic_arguments()
277    @argument(
278        "environment",
279        nargs="?",
280        type=str,
281        help="The environment to run the plan against",
282    )
283    @argument("--start", "-s", type=str, help="Start date to backfill.")
284    @argument("--end", "-e", type=str, help="End date to backfill.")
285    @argument("--execution-time", type=str, help="Execution time.")
286    @argument(
287        "--create-from",
288        type=str,
289        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
290    )
291    @argument(
292        "--skip-tests",
293        "-t",
294        action="store_true",
295        help="Skip the unit tests defined for the model.",
296    )
297    @argument(
298        "--restate-model",
299        "-r",
300        type=str,
301        nargs="*",
302        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
303    )
304    @argument(
305        "--no-gaps",
306        "-g",
307        action="store_true",
308        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
309    )
310    @argument(
311        "--skip-backfill",
312        action="store_true",
313        help="Skip the backfill step.",
314    )
315    @argument(
316        "--forward-only",
317        action="store_true",
318        help="Create a plan for forward-only changes.",
319        default=None,
320    )
321    @argument(
322        "--effective-from",
323        type=str,
324        help="The effective date from which to apply forward-only changes on production.",
325    )
326    @argument(
327        "--no-prompts",
328        action="store_true",
329        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
330        default=None,
331    )
332    @argument(
333        "--auto-apply",
334        action="store_true",
335        help="Automatically applies the new plan after creation.",
336        default=None,
337    )
338    @argument(
339        "--no-auto-categorization",
340        action="store_true",
341        help="Disable automatic change categorization.",
342        default=None,
343    )
344    @argument(
345        "--include-unmodified",
346        action="store_true",
347        help="Include unmodified models in the target environment.",
348        default=None,
349    )
350    @argument(
351        "--select-model",
352        type=str,
353        nargs="*",
354        help="Select specific model changes that should be included in the plan.",
355    )
356    @argument(
357        "--backfill-model",
358        type=str,
359        nargs="*",
360        help="Backfill only the models whose names match the expression. This is supported only when targeting a development environment.",
361    )
362    @argument(
363        "--no-diff",
364        action="store_true",
365        help="Hide text differences for changed models.",
366        default=None,
367    )
368    @argument(
369        "--run",
370        action="store_true",
371        help="Run latest intervals as part of the plan application (prod environment only).",
372    )
373    @argument(
374        "--enable-preview",
375        action="store_true",
376        help="Enable preview for forward-only models when targeting a development environment.",
377        default=None,
378    )
379    @line_magic
380    @pass_sqlmesh_context
381    def plan(self, context: Context, line: str) -> None:
382        """Goes through a set of prompts to both establish a plan and apply it"""
383        args = parse_argstring(self.plan, line)
384
385        context.plan(
386            args.environment,
387            start=args.start,
388            end=args.end,
389            execution_time=args.execution_time,
390            create_from=args.create_from,
391            skip_tests=args.skip_tests,
392            restate_models=args.restate_model,
393            backfill_models=args.backfill_model,
394            no_gaps=args.no_gaps,
395            skip_backfill=args.skip_backfill,
396            forward_only=args.forward_only,
397            no_prompts=args.no_prompts,
398            auto_apply=args.auto_apply,
399            no_auto_categorization=args.no_auto_categorization,
400            effective_from=args.effective_from,
401            include_unmodified=args.include_unmodified,
402            select_models=args.select_model,
403            no_diff=args.no_diff,
404            run=args.run,
405            enable_preview=args.enable_preview,
406        )
407
408    @magic_arguments()
409    @argument(
410        "environment",
411        nargs="?",
412        type=str,
413        help="The environment to run against",
414    )
415    @argument("--start", "-s", type=str, help="Start date to evaluate.")
416    @argument("--end", "-e", type=str, help="End date to evaluate.")
417    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
418    @argument(
419        "--ignore-cron",
420        action="store_true",
421        help="Run for all missing intervals, ignoring individual cron schedules.",
422    )
423    @line_magic
424    @pass_sqlmesh_context
425    def run_dag(self, context: Context, line: str) -> None:
426        """Evaluate the DAG of models using the built-in scheduler."""
427        args = parse_argstring(self.run_dag, line)
428
429        success = context.run(
430            args.environment,
431            start=args.start,
432            end=args.end,
433            skip_janitor=args.skip_janitor,
434            ignore_cron=args.ignore_cron,
435        )
436        if not success:
437            raise SQLMeshError("Error Running DAG. Check logs for details.")
438
439    @magic_arguments()
440    @argument("model", type=str, help="The model.")
441    @argument("--start", "-s", type=str, help="Start date to render.")
442    @argument("--end", "-e", type=str, help="End date to render.")
443    @argument("--execution-time", type=str, help="Execution time.")
444    @argument(
445        "--limit",
446        type=int,
447        help="The number of rows which the query should be limited to.",
448    )
449    @line_magic
450    @pass_sqlmesh_context
451    def evaluate(self, context: Context, line: str) -> None:
452        """Evaluate a model query and fetches a dataframe."""
453        context.refresh()
454        args = parse_argstring(self.evaluate, line)
455
456        df = context.evaluate(
457            args.model,
458            start=args.start,
459            end=args.end,
460            execution_time=args.execution_time,
461            limit=args.limit,
462        )
463        self.display(df)
464
465    @magic_arguments()
466    @argument("model", type=str, help="The model.")
467    @argument("--start", "-s", type=str, help="Start date to render.")
468    @argument("--end", "-e", type=str, help="End date to render.")
469    @argument("--execution-time", type=str, help="Execution time.")
470    @argument(
471        "--expand",
472        type=t.Union[bool, t.Iterable[str]],
473        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
474    )
475    @argument("--dialect", type=str, help="SQL dialect to render.")
476    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
477    @line_magic
478    @pass_sqlmesh_context
479    def render(self, context: Context, line: str) -> None:
480        """Renders a model's query, optionally expanding referenced models."""
481        context.refresh()
482        args = parse_argstring(self.render, line)
483
484        query = context.render(
485            args.model,
486            start=args.start,
487            end=args.end,
488            execution_time=args.execution_time,
489            expand=args.expand,
490        )
491
492        sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect)
493        if args.no_format:
494            context.console.log_status_update(sql)
495        else:
496            context.console.show_sql(sql)
497
498    @magic_arguments()
499    @argument(
500        "df_var",
501        default=None,
502        nargs="?",
503        type=str,
504        help="An optional variable name to store the resulting dataframe.",
505    )
506    @cell_magic
507    @pass_sqlmesh_context
508    def fetchdf(self, context: Context, line: str, sql: str) -> None:
509        """Fetches a dataframe from sql, optionally storing it in a variable."""
510        args = parse_argstring(self.fetchdf, line)
511        df = context.fetchdf(sql)
512        if args.df_var:
513            self._shell.user_ns[args.df_var] = df
514        self.display(df)
515
516    @magic_arguments()
517    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
518    @argument(
519        "--select-model",
520        type=str,
521        nargs="*",
522        help="Select specific models to include in the dag.",
523    )
524    @line_magic
525    @pass_sqlmesh_context
526    def dag(self, context: Context, line: str) -> None:
527        """Displays the HTML DAG."""
528        args = parse_argstring(self.dag, line)
529        dag = context.get_dag(args.select_model)
530        if args.file:
531            with open(args.file, "w", encoding="utf-8") as file:
532                file.write(str(dag))
533        # TODO: Have this go through console instead of calling display directly
534        self.display(dag)
535
536    @magic_arguments()
537    @line_magic
538    @pass_sqlmesh_context
539    def migrate(self, context: Context, line: str) -> None:
540        """Migrate SQLMesh to the current running version."""
541        context.migrate()
542        context.console.log_success("Migration complete")
543
544    @magic_arguments()
545    @line_magic
546    @pass_sqlmesh_context
547    def create_external_models(self, context: Context, line: str) -> None:
548        """Create a schema file containing external model schemas."""
549        context.create_external_models()
550
551    @magic_arguments()
552    @argument(
553        "source_to_target",
554        type=str,
555        metavar="SOURCE:TARGET",
556        help="Source and target in `SOURCE:TARGET` format",
557    )
558    @argument(
559        "--on",
560        type=str,
561        nargs="*",
562        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
563    )
564    @argument(
565        "--model",
566        type=str,
567        help="The model to diff against when source and target are environments and not tables.",
568    )
569    @argument(
570        "--where",
571        type=str,
572        help="An optional where statement to filter results.",
573    )
574    @argument(
575        "--limit",
576        type=int,
577        default=20,
578        help="The limit of the sample dataframe.",
579    )
580    @argument(
581        "--show-sample",
582        action="store_true",
583        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
584    )
585    @line_magic
586    @pass_sqlmesh_context
587    def table_diff(self, context: Context, line: str) -> None:
588        """Show the diff between two tables.
589
590        Can either be two tables or two environments and a model.
591        """
592        args = parse_argstring(self.table_diff, line)
593        source, target = args.source_to_target.split(":")
594        context.table_diff(
595            source=source,
596            target=target,
597            on=args.on,
598            model_or_snapshot=args.model,
599            where=args.where,
600            limit=args.limit,
601            show_sample=args.show_sample,
602        )
603
604    @magic_arguments()
605    @argument(
606        "model_name",
607        nargs="?",
608        type=str,
609        help="The name of the model to get the table name for.",
610    )
611    @argument(
612        "--dev",
613        action="store_true",
614        help="Print the name of the snapshot table used for previews in development environments.",
615    )
616    @line_magic
617    @pass_sqlmesh_context
618    def table_name(self, context: Context, line: str) -> None:
619        """Prints the name of the physical table for the given model."""
620        args = parse_argstring(self.table_name, line)
621        context.console.log_status_update(context.table_name(args.model_name, args.dev))
622
623    @magic_arguments()
624    @argument(
625        "--read",
626        type=str,
627        default="",
628        help="The input dialect of the sql string.",
629    )
630    @argument(
631        "--write",
632        type=str,
633        default="",
634        help="The output dialect of the sql string.",
635    )
636    @line_cell_magic
637    @pass_sqlmesh_context
638    def rewrite(self, context: Context, line: str, sql: str) -> None:
639        """Rewrite a sql expression with semantic references into an executable query.
640
641        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
642        """
643        args = parse_argstring(self.rewrite, line)
644        context.console.show_sql(
645            context.rewrite(sql, args.read).sql(
646                dialect=args.write or context.config.dialect, pretty=True
647            )
648        )
649
650    @magic_arguments()
651    @argument(
652        "--transpile",
653        "-t",
654        type=str,
655        help="Transpile project models to the specified dialect.",
656    )
657    @argument(
658        "--append-newline",
659        action="store_true",
660        help="Whether or not to append a newline to the end of the file.",
661        default=None,
662    )
663    @argument(
664        "--normalize",
665        action="store_true",
666        help="Whether or not to normalize identifiers to lowercase.",
667        default=None,
668    )
669    @argument(
670        "--pad",
671        type=int,
672        help="Determines the pad size in a formatted string.",
673    )
674    @argument(
675        "--indent",
676        type=int,
677        help="Determines the indentation size in a formatted string.",
678    )
679    @argument(
680        "--normalize-functions",
681        type=str,
682        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
683    )
684    @argument(
685        "--leading-comma",
686        action="store_true",
687        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
688        default=None,
689    )
690    @argument(
691        "--max-text-width",
692        type=int,
693        help="The max number of characters in a segment before creating new lines in pretty mode.",
694    )
695    @line_magic
696    @pass_sqlmesh_context
697    def format(self, context: Context, line: str) -> None:
698        """Format all SQL models."""
699        args = parse_argstring(self.format, line)
700        context.format(**{k: v for k, v in vars(args).items() if v is not None})
701
702    @magic_arguments()
703    @argument("environment", type=str, help="The environment to diff local state against.")
704    @line_magic
705    @pass_sqlmesh_context
706    def diff(self, context: Context, line: str) -> None:
707        """Show the diff between the local state and the target environment."""
708        args = parse_argstring(self.diff, line)
709        context.diff(args.environment)
710
711    @magic_arguments()
712    @argument("environment", type=str, help="The environment to invalidate.")
713    @line_magic
714    @pass_sqlmesh_context
715    def invalidate(self, context: Context, line: str) -> None:
716        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
717        args = parse_argstring(self.invalidate, line)
718        context.invalidate_environment(args.environment)
719
720    @magic_arguments()
721    @argument("model", type=str)
722    @argument(
723        "--query",
724        "-q",
725        type=str,
726        nargs="+",
727        required=True,
728        help="Queries that will be used to generate data for the model's dependencies.",
729    )
730    @argument(
731        "--overwrite",
732        "-o",
733        action="store_true",
734        help="When true, the fixture file will be overwritten in case it already exists.",
735    )
736    @argument(
737        "--var",
738        "-v",
739        type=str,
740        nargs="+",
741        help="Key-value pairs that will define variables needed by the model.",
742    )
743    @argument(
744        "--path",
745        "-p",
746        type=str,
747        help="The file path corresponding to the fixture, relative to the test directory. "
748        "By default, the fixture will be created under the test directory and the file "
749        "name will be inferred based on the test's name.",
750    )
751    @argument(
752        "--name",
753        "-n",
754        type=str,
755        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
756    )
757    @argument(
758        "--include-ctes",
759        action="store_true",
760        help="When true, CTE fixtures will also be generated.",
761    )
762    @line_magic
763    @pass_sqlmesh_context
764    def create_test(self, context: Context, line: str) -> None:
765        """Generate a unit test fixture for a given model."""
766        args = parse_argstring(self.create_test, line)
767        queries = iter(args.query)
768        variables = iter(args.var) if args.var else None
769        context.create_test(
770            args.model,
771            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
772            overwrite=args.overwrite,
773            variables=dict(zip(variables, variables)) if variables else None,
774            path=args.path,
775            name=args.name,
776            include_ctes=args.include_ctes,
777        )
778
779    @magic_arguments()
780    @argument("tests", nargs="*", type=str)
781    @argument(
782        "--pattern",
783        "-k",
784        nargs="*",
785        type=str,
786        help="Only run tests that match the pattern of substring.",
787    )
788    @argument("--verbose", "-v", action="store_true", help="Verbose output.")
789    @argument(
790        "--preserve-fixtures",
791        action="store_true",
792        help="Preserve the fixture tables in the testing database, useful for debugging.",
793    )
794    @line_magic
795    @pass_sqlmesh_context
796    def run_test(self, context: Context, line: str) -> None:
797        """Run unit test(s)."""
798        args = parse_argstring(self.run_test, line)
799        context.test(
800            match_patterns=args.pattern,
801            tests=args.tests,
802            verbose=args.verbose,
803            preserve_fixtures=args.preserve_fixtures,
804        )
805
806    @magic_arguments()
807    @argument(
808        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
809    )
810    @argument("--start", "-s", type=str, help="Start date to audit.")
811    @argument("--end", "-e", type=str, help="End date to audit.")
812    @argument("--execution-time", type=str, help="Execution time.")
813    @line_magic
814    @pass_sqlmesh_context
815    def audit(self, context: Context, line: str) -> None:
816        """Run audit(s)"""
817        args = parse_argstring(self.audit, line)
818        context.audit(
819            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
820        )
821
822    @magic_arguments()
823    @line_magic
824    @pass_sqlmesh_context
825    def info(self, context: Context, line: str) -> None:
826        """Display SQLMesh project information."""
827        context.print_info()
828
829    @magic_arguments()
830    @line_magic
831    @pass_sqlmesh_context
832    def rollback(self, context: Context, line: str) -> None:
833        """Rollback SQLMesh to the previous migration."""
834        context.rollback()
835
836    @magic_arguments()
837    @line_magic
838    @pass_sqlmesh_context
839    def clean(self, context: Context, line: str) -> None:
840        """Clears the SQLMesh cache and any build artifacts."""
841        context.clear_caches()
842        context.console.log_success("SQLMesh cache and build artifacts cleared")
843
844
def register_magics() -> None:
    """Register ``SQLMeshMagics`` with the running IPython shell, if there is one.

    A ``NameError`` (for example because ``get_ipython`` is not defined) is
    swallowed, making the call a no-op.
    """
    try:
        get_ipython().register_magics(SQLMeshMagics)  # type: ignore
    except NameError:
        return
def pass_sqlmesh_context(func: Callable) -> Callable:
def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
    """Decorate a magic so that it receives the notebook's SQLMesh ``Context``.

    The wrapper looks through the IPython user namespace for a ``Context``
    instance bound to one of ``CONTEXT_VARIABLE_NAMES``. It swaps the context's
    console for one that renders through the magics' ``display``, refreshes the
    context, and calls ``func(self, context, *args, **kwargs)``.

    Args:
        func: The magic method to wrap.

    Returns:
        The wrapping function. It returns ``None``; the wrapped function's
        return value is discarded.

    Raises:
        MissingContextException: If no ``Context`` instance is bound to any of
            the names in ``CONTEXT_VARIABLE_NAMES``.
    """

    @functools.wraps(func)
    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
        for variable_name in CONTEXT_VARIABLE_NAMES:
            context = self._shell.user_ns.get(variable_name)
            if isinstance(context, Context):
                break
        else:
            raise MissingContextException(
                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
            )
        old_console = context.console
        context.console = get_console(display=self.display)
        try:
            context.refresh()
            func(self, context, *args, **kwargs)
        finally:
            # Always restore the caller's console, even when the refresh or the
            # magic itself raises; otherwise the temporary console would leak
            # into later uses of the same context object.
            context.console = old_console

    return wrapper
class SQLMeshMagics(IPython.core.magic.Magics):
 62class SQLMeshMagics(Magics):
 63    @property
 64    def display(self) -> t.Callable:
 65        from sqlmesh import RuntimeEnv
 66
 67        if RuntimeEnv.get().is_databricks:
 68            # Use Databricks' special display instead of the normal IPython display
 69            return self._shell.user_ns["display"]
 70        return display
 71
 72    @property
 73    def _shell(self) -> t.Any:
 74        # Make mypy happy.
 75        if not self.shell:
 76            raise RuntimeError("IPython Magics are in invalid state")
 77        return self.shell
 78
 79    @magic_arguments()
 80    @argument(
 81        "paths",
 82        type=str,
 83        nargs="+",
 84        default="",
 85        help="The path(s) to the SQLMesh project(s).",
 86    )
 87    @argument(
 88        "--config",
 89        type=str,
 90        help="Name of the config object. Only applicable to configuration defined using Python script.",
 91    )
 92    @argument("--gateway", type=str, help="The name of the gateway.")
 93    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
 94    @argument("--debug", action="store_true", help="Enable debug mode.")
 95    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
 96    @line_magic
 97    def context(self, line: str) -> None:
 98        """Sets the context in the user namespace."""
 99        from sqlmesh import configure_logging
100
101        args = parse_argstring(self.context, line)
102        configs = load_configs(args.config, Context.CONFIG_TYPE, args.paths)
103        log_limit = list(configs.values())[0].log_limit
104        configure_logging(
105            args.debug, args.ignore_warnings, log_limit=log_limit, log_file_dir=args.log_file_dir
106        )
107        try:
108            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
109            self._shell.user_ns["context"] = context
110        except Exception:
111            if args.debug:
112                logger.exception("Failed to initialize SQLMesh context")
113            raise
114        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")
115
116    @magic_arguments()
117    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
118    @argument(
119        "sql_dialect",
120        type=str,
121        help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.",
122    )
123    @argument(
124        "--template",
125        "-t",
126        type=str,
127        help="Project template. Supported values: airflow, dbt, default, empty.",
128    )
129    @line_magic
130    def init(self, line: str) -> None:
131        """Creates a SQLMesh project scaffold with a default SQL dialect."""
132        args = parse_argstring(self.init, line)
133        try:
134            project_template = ProjectTemplate(
135                args.template.lower() if args.template else "default"
136            )
137        except ValueError:
138            raise MagicError(f"Invalid project template '{args.template}'")
139        init_example_project(args.path, args.sql_dialect, project_template)
140        html = str(
141            h(
142                "div",
143                h(
144                    "span",
145                    {"style": {"color": "green", "font-weight": "bold"}},
146                    "SQLMesh project scaffold created",
147                ),
148            )
149        )
150        self.display(JupyterRenderable(html=html, text=""))
151
152    @magic_arguments()
153    @argument("model", type=str, help="The model.")
154    @argument("--start", "-s", type=str, help="Start date to render.")
155    @argument("--end", "-e", type=str, help="End date to render.")
156    @argument("--execution-time", type=str, help="Execution time.")
157    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
158    @line_cell_magic
159    @pass_sqlmesh_context
160    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
161        """Renders the model and automatically fills in an editable cell with the model definition."""
162        args = parse_argstring(self.model, line)
163
164        model = context.get_model(args.model, raise_if_missing=True)
165        config = context.config_for_node(model)
166
167        if sql:
168            expressions = parse(sql, default_dialect=config.dialect)
169            loaded = load_sql_based_model(
170                expressions,
171                macros=context._macros,
172                jinja_macros=context._jinja_macros,
173                path=model._path,
174                dialect=config.dialect,
175                time_column_format=config.time_column_format,
176                physical_schema_override=context.config.physical_schema_override,
177                default_catalog=context.default_catalog,
178            )
179
180            if loaded.name == args.model:
181                model = loaded
182        else:
183            with open(model._path, "r", encoding="utf-8") as file:
184                expressions = parse(file.read(), default_dialect=config.dialect)
185
186        formatted = format_model_expressions(
187            expressions, model.dialect, **config.format.generator_options
188        )
189
190        self._shell.set_next_input(
191            "\n".join(
192                [
193                    " ".join(["%%model", line]),
194                    formatted,
195                ]
196            ),
197            replace=True,
198        )
199
200        with open(model._path, "w", encoding="utf-8") as file:
201            file.write(formatted)
202
203        if sql:
204            context.console.log_success(f"Model `{args.model}` updated")
205
206        context.upsert_model(model)
207        context.console.show_sql(
208            context.render(
209                model.name,
210                start=args.start,
211                end=args.end,
212                execution_time=args.execution_time,
213            ).sql(pretty=True, dialect=args.dialect or model.dialect)
214        )
215
216    @magic_arguments()
217    @argument("model", type=str, help="The model.")
218    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
219    @argument("--ls", action="store_true", help="List tests associated with a model")
220    @line_cell_magic
221    @pass_sqlmesh_context
222    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
223        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
224        args = parse_argstring(self.test, line)
225        if not args.test_name and not args.ls:
226            raise MagicError("Must provide either test name or `--ls` to list tests")
227
228        test_meta = []
229
230        for path, config in context.configs.items():
231            test_meta.extend(
232                get_all_model_tests(
233                    path / c.TESTS,
234                    ignore_patterns=config.ignore_patterns,
235                )
236            )
237
238        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
239        for model_test_metadata in test_meta:
240            model = model_test_metadata.body.get("model")
241            if not model:
242                context.console.log_error(
243                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
244                )
245            else:
246                tests[model][model_test_metadata.test_name] = model_test_metadata
247
248        model = context.get_model(args.model, raise_if_missing=True)
249
250        if args.ls:
251            # TODO: Provide better UI for displaying tests
252            for test_name in tests[model.name]:
253                context.console.log_status_update(test_name)
254            return
255
256        test = tests[model.name][args.test_name]
257        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
258        test_def_output = yaml.dump(test_def)
259
260        self._shell.set_next_input(
261            "\n".join(
262                [
263                    " ".join(["%%test", line]),
264                    test_def_output,
265                ]
266            ),
267            replace=True,
268        )
269
270        with open(test.path, "r+", encoding="utf-8") as file:
271            content = yaml.load(file.read())
272            content[args.test_name] = test_def
273            file.seek(0)
274            yaml.dump(content, file)
275            file.truncate()
276
277    @magic_arguments()
278    @argument(
279        "environment",
280        nargs="?",
281        type=str,
282        help="The environment to run the plan against",
283    )
284    @argument("--start", "-s", type=str, help="Start date to backfill.")
285    @argument("--end", "-e", type=str, help="End date to backfill.")
286    @argument("--execution-time", type=str, help="Execution time.")
287    @argument(
288        "--create-from",
289        type=str,
290        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
291    )
292    @argument(
293        "--skip-tests",
294        "-t",
295        action="store_true",
296        help="Skip the unit tests defined for the model.",
297    )
298    @argument(
299        "--restate-model",
300        "-r",
301        type=str,
302        nargs="*",
303        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
304    )
305    @argument(
306        "--no-gaps",
307        "-g",
308        action="store_true",
309        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
310    )
311    @argument(
312        "--skip-backfill",
313        action="store_true",
314        help="Skip the backfill step.",
315    )
316    @argument(
317        "--forward-only",
318        action="store_true",
319        help="Create a plan for forward-only changes.",
320        default=None,
321    )
322    @argument(
323        "--effective-from",
324        type=str,
325        help="The effective date from which to apply forward-only changes on production.",
326    )
327    @argument(
328        "--no-prompts",
329        action="store_true",
330        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
331        default=None,
332    )
333    @argument(
334        "--auto-apply",
335        action="store_true",
336        help="Automatically applies the new plan after creation.",
337        default=None,
338    )
339    @argument(
340        "--no-auto-categorization",
341        action="store_true",
342        help="Disable automatic change categorization.",
343        default=None,
344    )
345    @argument(
346        "--include-unmodified",
347        action="store_true",
348        help="Include unmodified models in the target environment.",
349        default=None,
350    )
351    @argument(
352        "--select-model",
353        type=str,
354        nargs="*",
355        help="Select specific model changes that should be included in the plan.",
356    )
357    @argument(
358        "--backfill-model",
359        type=str,
360        nargs="*",
361        help="Backfill only the models whose names match the expression. This is supported only when targeting a development environment.",
362    )
363    @argument(
364        "--no-diff",
365        action="store_true",
366        help="Hide text differences for changed models.",
367        default=None,
368    )
369    @argument(
370        "--run",
371        action="store_true",
372        help="Run latest intervals as part of the plan application (prod environment only).",
373    )
374    @argument(
375        "--enable-preview",
376        action="store_true",
377        help="Enable preview for forward-only models when targeting a development environment.",
378        default=None,
379    )
380    @line_magic
381    @pass_sqlmesh_context
382    def plan(self, context: Context, line: str) -> None:
383        """Goes through a set of prompts to both establish a plan and apply it"""
384        args = parse_argstring(self.plan, line)
385
386        context.plan(
387            args.environment,
388            start=args.start,
389            end=args.end,
390            execution_time=args.execution_time,
391            create_from=args.create_from,
392            skip_tests=args.skip_tests,
393            restate_models=args.restate_model,
394            backfill_models=args.backfill_model,
395            no_gaps=args.no_gaps,
396            skip_backfill=args.skip_backfill,
397            forward_only=args.forward_only,
398            no_prompts=args.no_prompts,
399            auto_apply=args.auto_apply,
400            no_auto_categorization=args.no_auto_categorization,
401            effective_from=args.effective_from,
402            include_unmodified=args.include_unmodified,
403            select_models=args.select_model,
404            no_diff=args.no_diff,
405            run=args.run,
406            enable_preview=args.enable_preview,
407        )
408
409    @magic_arguments()
410    @argument(
411        "environment",
412        nargs="?",
413        type=str,
414        help="The environment to run against",
415    )
416    @argument("--start", "-s", type=str, help="Start date to evaluate.")
417    @argument("--end", "-e", type=str, help="End date to evaluate.")
418    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
419    @argument(
420        "--ignore-cron",
421        action="store_true",
422        help="Run for all missing intervals, ignoring individual cron schedules.",
423    )
424    @line_magic
425    @pass_sqlmesh_context
426    def run_dag(self, context: Context, line: str) -> None:
427        """Evaluate the DAG of models using the built-in scheduler."""
428        args = parse_argstring(self.run_dag, line)
429
430        success = context.run(
431            args.environment,
432            start=args.start,
433            end=args.end,
434            skip_janitor=args.skip_janitor,
435            ignore_cron=args.ignore_cron,
436        )
437        if not success:
438            raise SQLMeshError("Error Running DAG. Check logs for details.")
439
440    @magic_arguments()
441    @argument("model", type=str, help="The model.")
442    @argument("--start", "-s", type=str, help="Start date to render.")
443    @argument("--end", "-e", type=str, help="End date to render.")
444    @argument("--execution-time", type=str, help="Execution time.")
445    @argument(
446        "--limit",
447        type=int,
448        help="The number of rows which the query should be limited to.",
449    )
450    @line_magic
451    @pass_sqlmesh_context
452    def evaluate(self, context: Context, line: str) -> None:
453        """Evaluate a model query and fetches a dataframe."""
454        context.refresh()
455        args = parse_argstring(self.evaluate, line)
456
457        df = context.evaluate(
458            args.model,
459            start=args.start,
460            end=args.end,
461            execution_time=args.execution_time,
462            limit=args.limit,
463        )
464        self.display(df)
465
466    @magic_arguments()
467    @argument("model", type=str, help="The model.")
468    @argument("--start", "-s", type=str, help="Start date to render.")
469    @argument("--end", "-e", type=str, help="End date to render.")
470    @argument("--execution-time", type=str, help="Execution time.")
471    @argument(
472        "--expand",
473        type=t.Union[bool, t.Iterable[str]],
474        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
475    )
476    @argument("--dialect", type=str, help="SQL dialect to render.")
477    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
478    @line_magic
479    @pass_sqlmesh_context
480    def render(self, context: Context, line: str) -> None:
481        """Renders a model's query, optionally expanding referenced models."""
482        context.refresh()
483        args = parse_argstring(self.render, line)
484
485        query = context.render(
486            args.model,
487            start=args.start,
488            end=args.end,
489            execution_time=args.execution_time,
490            expand=args.expand,
491        )
492
493        sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect)
494        if args.no_format:
495            context.console.log_status_update(sql)
496        else:
497            context.console.show_sql(sql)
498
499    @magic_arguments()
500    @argument(
501        "df_var",
502        default=None,
503        nargs="?",
504        type=str,
505        help="An optional variable name to store the resulting dataframe.",
506    )
507    @cell_magic
508    @pass_sqlmesh_context
509    def fetchdf(self, context: Context, line: str, sql: str) -> None:
510        """Fetches a dataframe from sql, optionally storing it in a variable."""
511        args = parse_argstring(self.fetchdf, line)
512        df = context.fetchdf(sql)
513        if args.df_var:
514            self._shell.user_ns[args.df_var] = df
515        self.display(df)
516
517    @magic_arguments()
518    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
519    @argument(
520        "--select-model",
521        type=str,
522        nargs="*",
523        help="Select specific models to include in the dag.",
524    )
525    @line_magic
526    @pass_sqlmesh_context
527    def dag(self, context: Context, line: str) -> None:
528        """Displays the HTML DAG."""
529        args = parse_argstring(self.dag, line)
530        dag = context.get_dag(args.select_model)
531        if args.file:
532            with open(args.file, "w", encoding="utf-8") as file:
533                file.write(str(dag))
534        # TODO: Have this go through console instead of calling display directly
535        self.display(dag)
536
537    @magic_arguments()
538    @line_magic
539    @pass_sqlmesh_context
540    def migrate(self, context: Context, line: str) -> None:
541        """Migrate SQLMesh to the current running version."""
542        context.migrate()
543        context.console.log_success("Migration complete")
544
545    @magic_arguments()
546    @line_magic
547    @pass_sqlmesh_context
548    def create_external_models(self, context: Context, line: str) -> None:
549        """Create a schema file containing external model schemas."""
550        context.create_external_models()
551
552    @magic_arguments()
553    @argument(
554        "source_to_target",
555        type=str,
556        metavar="SOURCE:TARGET",
557        help="Source and target in `SOURCE:TARGET` format",
558    )
559    @argument(
560        "--on",
561        type=str,
562        nargs="*",
563        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
564    )
565    @argument(
566        "--model",
567        type=str,
568        help="The model to diff against when source and target are environments and not tables.",
569    )
570    @argument(
571        "--where",
572        type=str,
573        help="An optional where statement to filter results.",
574    )
575    @argument(
576        "--limit",
577        type=int,
578        default=20,
579        help="The limit of the sample dataframe.",
580    )
581    @argument(
582        "--show-sample",
583        action="store_true",
584        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
585    )
586    @line_magic
587    @pass_sqlmesh_context
588    def table_diff(self, context: Context, line: str) -> None:
589        """Show the diff between two tables.
590
591        Can either be two tables or two environments and a model.
592        """
593        args = parse_argstring(self.table_diff, line)
594        source, target = args.source_to_target.split(":")
595        context.table_diff(
596            source=source,
597            target=target,
598            on=args.on,
599            model_or_snapshot=args.model,
600            where=args.where,
601            limit=args.limit,
602            show_sample=args.show_sample,
603        )
604
605    @magic_arguments()
606    @argument(
607        "model_name",
608        nargs="?",
609        type=str,
610        help="The name of the model to get the table name for.",
611    )
612    @argument(
613        "--dev",
614        action="store_true",
615        help="Print the name of the snapshot table used for previews in development environments.",
616    )
617    @line_magic
618    @pass_sqlmesh_context
619    def table_name(self, context: Context, line: str) -> None:
620        """Prints the name of the physical table for the given model."""
621        args = parse_argstring(self.table_name, line)
622        context.console.log_status_update(context.table_name(args.model_name, args.dev))
623
624    @magic_arguments()
625    @argument(
626        "--read",
627        type=str,
628        default="",
629        help="The input dialect of the sql string.",
630    )
631    @argument(
632        "--write",
633        type=str,
634        default="",
635        help="The output dialect of the sql string.",
636    )
637    @line_cell_magic
638    @pass_sqlmesh_context
639    def rewrite(self, context: Context, line: str, sql: str) -> None:
640        """Rewrite a sql expression with semantic references into an executable query.
641
642        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
643        """
644        args = parse_argstring(self.rewrite, line)
645        context.console.show_sql(
646            context.rewrite(sql, args.read).sql(
647                dialect=args.write or context.config.dialect, pretty=True
648            )
649        )
650
651    @magic_arguments()
652    @argument(
653        "--transpile",
654        "-t",
655        type=str,
656        help="Transpile project models to the specified dialect.",
657    )
658    @argument(
659        "--append-newline",
660        action="store_true",
661        help="Whether or not to append a newline to the end of the file.",
662        default=None,
663    )
664    @argument(
665        "--normalize",
666        action="store_true",
667        help="Whether or not to normalize identifiers to lowercase.",
668        default=None,
669    )
670    @argument(
671        "--pad",
672        type=int,
673        help="Determines the pad size in a formatted string.",
674    )
675    @argument(
676        "--indent",
677        type=int,
678        help="Determines the indentation size in a formatted string.",
679    )
680    @argument(
681        "--normalize-functions",
682        type=str,
683        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
684    )
685    @argument(
686        "--leading-comma",
687        action="store_true",
688        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
689        default=None,
690    )
691    @argument(
692        "--max-text-width",
693        type=int,
694        help="The max number of characters in a segment before creating new lines in pretty mode.",
695    )
696    @line_magic
697    @pass_sqlmesh_context
698    def format(self, context: Context, line: str) -> None:
699        """Format all SQL models."""
700        args = parse_argstring(self.format, line)
701        context.format(**{k: v for k, v in vars(args).items() if v is not None})
702
703    @magic_arguments()
704    @argument("environment", type=str, help="The environment to diff local state against.")
705    @line_magic
706    @pass_sqlmesh_context
707    def diff(self, context: Context, line: str) -> None:
708        """Show the diff between the local state and the target environment."""
709        args = parse_argstring(self.diff, line)
710        context.diff(args.environment)
711
712    @magic_arguments()
713    @argument("environment", type=str, help="The environment to invalidate.")
714    @line_magic
715    @pass_sqlmesh_context
716    def invalidate(self, context: Context, line: str) -> None:
717        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
718        args = parse_argstring(self.invalidate, line)
719        context.invalidate_environment(args.environment)
720
721    @magic_arguments()
722    @argument("model", type=str)
723    @argument(
724        "--query",
725        "-q",
726        type=str,
727        nargs="+",
728        required=True,
729        help="Queries that will be used to generate data for the model's dependencies.",
730    )
731    @argument(
732        "--overwrite",
733        "-o",
734        action="store_true",
735        help="When true, the fixture file will be overwritten in case it already exists.",
736    )
737    @argument(
738        "--var",
739        "-v",
740        type=str,
741        nargs="+",
742        help="Key-value pairs that will define variables needed by the model.",
743    )
744    @argument(
745        "--path",
746        "-p",
747        type=str,
748        help="The file path corresponding to the fixture, relative to the test directory. "
749        "By default, the fixture will be created under the test directory and the file "
750        "name will be inferred based on the test's name.",
751    )
752    @argument(
753        "--name",
754        "-n",
755        type=str,
756        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
757    )
758    @argument(
759        "--include-ctes",
760        action="store_true",
761        help="When true, CTE fixtures will also be generated.",
762    )
763    @line_magic
764    @pass_sqlmesh_context
765    def create_test(self, context: Context, line: str) -> None:
766        """Generate a unit test fixture for a given model."""
767        args = parse_argstring(self.create_test, line)
768        queries = iter(args.query)
769        variables = iter(args.var) if args.var else None
770        context.create_test(
771            args.model,
772            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
773            overwrite=args.overwrite,
774            variables=dict(zip(variables, variables)) if variables else None,
775            path=args.path,
776            name=args.name,
777            include_ctes=args.include_ctes,
778        )
779
780    @magic_arguments()
781    @argument("tests", nargs="*", type=str)
782    @argument(
783        "--pattern",
784        "-k",
785        nargs="*",
786        type=str,
787        help="Only run tests that match the pattern of substring.",
788    )
789    @argument("--verbose", "-v", action="store_true", help="Verbose output.")
790    @argument(
791        "--preserve-fixtures",
792        action="store_true",
793        help="Preserve the fixture tables in the testing database, useful for debugging.",
794    )
795    @line_magic
796    @pass_sqlmesh_context
797    def run_test(self, context: Context, line: str) -> None:
798        """Run unit test(s)."""
799        args = parse_argstring(self.run_test, line)
800        context.test(
801            match_patterns=args.pattern,
802            tests=args.tests,
803            verbose=args.verbose,
804            preserve_fixtures=args.preserve_fixtures,
805        )
806
807    @magic_arguments()
808    @argument(
809        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
810    )
811    @argument("--start", "-s", type=str, help="Start date to audit.")
812    @argument("--end", "-e", type=str, help="End date to audit.")
813    @argument("--execution-time", type=str, help="Execution time.")
814    @line_magic
815    @pass_sqlmesh_context
816    def audit(self, context: Context, line: str) -> None:
817        """Run audit(s)"""
818        args = parse_argstring(self.audit, line)
819        context.audit(
820            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
821        )
822
823    @magic_arguments()
824    @line_magic
825    @pass_sqlmesh_context
826    def info(self, context: Context, line: str) -> None:
827        """Display SQLMesh project information."""
828        context.print_info()
829
830    @magic_arguments()
831    @line_magic
832    @pass_sqlmesh_context
833    def rollback(self, context: Context, line: str) -> None:
834        """Rollback SQLMesh to the previous migration."""
835        context.rollback()
836
837    @magic_arguments()
838    @line_magic
839    @pass_sqlmesh_context
840    def clean(self, context: Context, line: str) -> None:
841        """Clears the SQLMesh cache and any build artifacts."""
842        context.clear_caches()
843        context.console.log_success("SQLMesh cache and build artifacts cleared")

Base class for implementing magic functions.

Shell functions which can be reached as `%function_name`. All magic functions should accept a string, which they can parse for their own needs. This can make some functions easier to type, e.g. `%cd ../` vs. `%cd("../")`.

Classes providing magic functions need to subclass this class, and they MUST:

  • Use the method decorators @line_magic and @cell_magic to decorate individual methods as magic functions, AND

  • Use the class decorator @magics_class to ensure that the magic methods are properly registered at the instance level upon instance initialization.

See magic_functions for examples of actual implementation classes.

@magic_arguments()
@argument('paths', type=str, nargs='+', default='', help='The path(s) to the SQLMesh project(s).')
@argument('--config', type=str, help='Name of the config object. Only applicable to configuration defined using Python script.')
@argument('--gateway', type=str, help='The name of the gateway.')
@argument('--ignore-warnings', action='store_true', help='Ignore warnings.')
@argument('--debug', action='store_true', help='Enable debug mode.')
@argument('--log-file-dir', type=str, help='The directory to write the log file to.')
@line_magic
def context(self, line: str) -> None:
 79    @magic_arguments()
 80    @argument(
 81        "paths",
 82        type=str,
 83        nargs="+",
 84        default="",
 85        help="The path(s) to the SQLMesh project(s).",
 86    )
 87    @argument(
 88        "--config",
 89        type=str,
 90        help="Name of the config object. Only applicable to configuration defined using Python script.",
 91    )
 92    @argument("--gateway", type=str, help="The name of the gateway.")
 93    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
 94    @argument("--debug", action="store_true", help="Enable debug mode.")
 95    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
 96    @line_magic
 97    def context(self, line: str) -> None:
 98        """Sets the context in the user namespace."""
 99        from sqlmesh import configure_logging
100
101        args = parse_argstring(self.context, line)
102        configs = load_configs(args.config, Context.CONFIG_TYPE, args.paths)
103        log_limit = list(configs.values())[0].log_limit
104        configure_logging(
105            args.debug, args.ignore_warnings, log_limit=log_limit, log_file_dir=args.log_file_dir
106        )
107        try:
108            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
109            self._shell.user_ns["context"] = context
110        except Exception:
111            if args.debug:
112                logger.exception("Failed to initialize SQLMesh context")
113            raise
114        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")

Sets the context in the user namespace.

@magic_arguments()
@argument('path', type=str, help='The path where the new SQLMesh project should be created.')
@argument('sql_dialect', type=str, help=f'Default model SQL dialect. Supported values: {sqlglot_dialects()}.')
@argument('--template', '-t', type=str, help='Project template. Supported values: airflow, dbt, default, empty.')
@line_magic
def init(self, line: str) -> None:
116    @magic_arguments()
117    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
118    @argument(
119        "sql_dialect",
120        type=str,
121        help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.",
122    )
123    @argument(
124        "--template",
125        "-t",
126        type=str,
127        help="Project template. Supported values: airflow, dbt, default, empty.",
128    )
129    @line_magic
130    def init(self, line: str) -> None:
131        """Creates a SQLMesh project scaffold with a default SQL dialect."""
132        args = parse_argstring(self.init, line)
133        try:
134            project_template = ProjectTemplate(
135                args.template.lower() if args.template else "default"
136            )
137        except ValueError:
138            raise MagicError(f"Invalid project template '{args.template}'")
139        init_example_project(args.path, args.sql_dialect, project_template)
140        html = str(
141            h(
142                "div",
143                h(
144                    "span",
145                    {"style": {"color": "green", "font-weight": "bold"}},
146                    "SQLMesh project scaffold created",
147                ),
148            )
149        )
150        self.display(JupyterRenderable(html=html, text=""))

Creates a SQLMesh project scaffold with a default SQL dialect.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--dialect', '-d', type=str, help='The rendered dialect.')
@line_cell_magic
@pass_sqlmesh_context
def model( self, context: sqlmesh.core.context.Context, line: str, sql: Union[str, NoneType] = None) -> None:
152    @magic_arguments()
153    @argument("model", type=str, help="The model.")
154    @argument("--start", "-s", type=str, help="Start date to render.")
155    @argument("--end", "-e", type=str, help="End date to render.")
156    @argument("--execution-time", type=str, help="Execution time.")
157    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
158    @line_cell_magic
159    @pass_sqlmesh_context
160    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
161        """Renders the model and automatically fills in an editable cell with the model definition."""
162        args = parse_argstring(self.model, line)
163
164        model = context.get_model(args.model, raise_if_missing=True)
165        config = context.config_for_node(model)
166
167        if sql:
168            expressions = parse(sql, default_dialect=config.dialect)
169            loaded = load_sql_based_model(
170                expressions,
171                macros=context._macros,
172                jinja_macros=context._jinja_macros,
173                path=model._path,
174                dialect=config.dialect,
175                time_column_format=config.time_column_format,
176                physical_schema_override=context.config.physical_schema_override,
177                default_catalog=context.default_catalog,
178            )
179
180            if loaded.name == args.model:
181                model = loaded
182        else:
183            with open(model._path, "r", encoding="utf-8") as file:
184                expressions = parse(file.read(), default_dialect=config.dialect)
185
186        formatted = format_model_expressions(
187            expressions, model.dialect, **config.format.generator_options
188        )
189
190        self._shell.set_next_input(
191            "\n".join(
192                [
193                    " ".join(["%%model", line]),
194                    formatted,
195                ]
196            ),
197            replace=True,
198        )
199
200        with open(model._path, "w", encoding="utf-8") as file:
201            file.write(formatted)
202
203        if sql:
204            context.console.log_success(f"Model `{args.model}` updated")
205
206        context.upsert_model(model)
207        context.console.show_sql(
208            context.render(
209                model.name,
210                start=args.start,
211                end=args.end,
212                execution_time=args.execution_time,
213            ).sql(pretty=True, dialect=args.dialect or model.dialect)
214        )

Renders the model and automatically fills in an editable cell with the model definition.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('test_name', type=str, nargs='?', default=None, help='The test name to display')
@argument('--ls', action='store_true', help='List tests associated with a model')
@line_cell_magic
@pass_sqlmesh_context
def test( self, context: sqlmesh.core.context.Context, line: str, test_def_raw: Union[str, NoneType] = None) -> None:
216    @magic_arguments()
217    @argument("model", type=str, help="The model.")
218    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
219    @argument("--ls", action="store_true", help="List tests associated with a model")
220    @line_cell_magic
221    @pass_sqlmesh_context
222    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
223        """Allow the user to list tests for a model, output a specific test, and then write their changes back"""
224        args = parse_argstring(self.test, line)
225        if not args.test_name and not args.ls:
226            raise MagicError("Must provide either test name or `--ls` to list tests")
227
228        test_meta = []
229
230        for path, config in context.configs.items():
231            test_meta.extend(
232                get_all_model_tests(
233                    path / c.TESTS,
234                    ignore_patterns=config.ignore_patterns,
235                )
236            )
237
238        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
239        for model_test_metadata in test_meta:
240            model = model_test_metadata.body.get("model")
241            if not model:
242                context.console.log_error(
243                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
244                )
245            else:
246                tests[model][model_test_metadata.test_name] = model_test_metadata
247
248        model = context.get_model(args.model, raise_if_missing=True)
249
250        if args.ls:
251            # TODO: Provide better UI for displaying tests
252            for test_name in tests[model.name]:
253                context.console.log_status_update(test_name)
254            return
255
256        test = tests[model.name][args.test_name]
257        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
258        test_def_output = yaml.dump(test_def)
259
260        self._shell.set_next_input(
261            "\n".join(
262                [
263                    " ".join(["%%test", line]),
264                    test_def_output,
265                ]
266            ),
267            replace=True,
268        )
269
270        with open(test.path, "r+", encoding="utf-8") as file:
271            content = yaml.load(file.read())
272            content[args.test_name] = test_def
273            file.seek(0)
274            yaml.dump(content, file)
275            file.truncate()

Allow the user to list tests for a model, output a specific test, and then write their changes back

@magic_arguments()
@argument('environment', nargs='?', type=str, help='The environment to run the plan against')
@argument('--start', '-s', type=str, help='Start date to backfill.')
@argument('--end', '-e', type=str, help='End date to backfill.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--create-from', type=str, help="The environment to create the target environment from if it doesn't exist. Default: prod.")
@argument('--skip-tests', '-t', action='store_true', help='Skip the unit tests defined for the model.')
@argument('--restate-model', '-r', type=str, nargs='*', help='Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.')
@argument('--no-gaps', '-g', action='store_true', help='Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.')
@argument('--skip-backfill', action='store_true', help='Skip the backfill step.')
@argument('--forward-only', action='store_true', help='Create a plan for forward-only changes.', default=None)
@argument('--effective-from', type=str, help='The effective date from which to apply forward-only changes on production.')
@argument('--no-prompts', action='store_true', help='Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.', default=None)
@argument('--auto-apply', action='store_true', help='Automatically applies the new plan after creation.', default=None)
@argument('--no-auto-categorization', action='store_true', help='Disable automatic change categorization.', default=None)
@argument('--include-unmodified', action='store_true', help='Include unmodified models in the target environment.', default=None)
@argument('--select-model', type=str, nargs='*', help='Select specific model changes that should be included in the plan.')
@argument('--backfill-model', type=str, nargs='*', help='Backfill only the models whose names match the expression. This is supported only when targeting a development environment.')
@argument('--no-diff', action='store_true', help='Hide text differences for changed models.', default=None)
@argument('--run', action='store_true', help='Run latest intervals as part of the plan application (prod environment only).')
@argument('--enable-preview', action='store_true', help='Enable preview for forward-only models when targeting a development environment.', default=None)
@line_magic
@pass_sqlmesh_context
def plan(self, context: sqlmesh.core.context.Context, line: str) -> None:
277    @magic_arguments()
278    @argument(
279        "environment",
280        nargs="?",
281        type=str,
282        help="The environment to run the plan against",
283    )
284    @argument("--start", "-s", type=str, help="Start date to backfill.")
285    @argument("--end", "-e", type=str, help="End date to backfill.")
286    @argument("--execution-time", type=str, help="Execution time.")
287    @argument(
288        "--create-from",
289        type=str,
290        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
291    )
292    @argument(
293        "--skip-tests",
294        "-t",
295        action="store_true",
296        help="Skip the unit tests defined for the model.",
297    )
298    @argument(
299        "--restate-model",
300        "-r",
301        type=str,
302        nargs="*",
303        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
304    )
305    @argument(
306        "--no-gaps",
307        "-g",
308        action="store_true",
309        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
310    )
311    @argument(
312        "--skip-backfill",
313        action="store_true",
314        help="Skip the backfill step.",
315    )
316    @argument(
317        "--forward-only",
318        action="store_true",
319        help="Create a plan for forward-only changes.",
320        default=None,
321    )
322    @argument(
323        "--effective-from",
324        type=str,
325        help="The effective date from which to apply forward-only changes on production.",
326    )
327    @argument(
328        "--no-prompts",
329        action="store_true",
330        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
331        default=None,
332    )
333    @argument(
334        "--auto-apply",
335        action="store_true",
336        help="Automatically applies the new plan after creation.",
337        default=None,
338    )
339    @argument(
340        "--no-auto-categorization",
341        action="store_true",
342        help="Disable automatic change categorization.",
343        default=None,
344    )
345    @argument(
346        "--include-unmodified",
347        action="store_true",
348        help="Include unmodified models in the target environment.",
349        default=None,
350    )
351    @argument(
352        "--select-model",
353        type=str,
354        nargs="*",
355        help="Select specific model changes that should be included in the plan.",
356    )
357    @argument(
358        "--backfill-model",
359        type=str,
360        nargs="*",
361        help="Backfill only the models whose names match the expression. This is supported only when targeting a development environment.",
362    )
363    @argument(
364        "--no-diff",
365        action="store_true",
366        help="Hide text differences for changed models.",
367        default=None,
368    )
369    @argument(
370        "--run",
371        action="store_true",
372        help="Run latest intervals as part of the plan application (prod environment only).",
373    )
374    @argument(
375        "--enable-preview",
376        action="store_true",
377        help="Enable preview for forward-only models when targeting a development environment.",
378        default=None,
379    )
380    @line_magic
381    @pass_sqlmesh_context
382    def plan(self, context: Context, line: str) -> None:
383        """Goes through a set of prompts to both establish a plan and apply it"""
384        args = parse_argstring(self.plan, line)
385
386        context.plan(
387            args.environment,
388            start=args.start,
389            end=args.end,
390            execution_time=args.execution_time,
391            create_from=args.create_from,
392            skip_tests=args.skip_tests,
393            restate_models=args.restate_model,
394            backfill_models=args.backfill_model,
395            no_gaps=args.no_gaps,
396            skip_backfill=args.skip_backfill,
397            forward_only=args.forward_only,
398            no_prompts=args.no_prompts,
399            auto_apply=args.auto_apply,
400            no_auto_categorization=args.no_auto_categorization,
401            effective_from=args.effective_from,
402            include_unmodified=args.include_unmodified,
403            select_models=args.select_model,
404            no_diff=args.no_diff,
405            run=args.run,
406            enable_preview=args.enable_preview,
407        )

Goes through a set of prompts to both establish a plan and apply it

@magic_arguments()
@argument('environment', nargs='?', type=str, help='The environment to run against')
@argument('--start', '-s', type=str, help='Start date to evaluate.')
@argument('--end', '-e', type=str, help='End date to evaluate.')
@argument('--skip-janitor', action='store_true', help='Skip the janitor task.')
@argument('--ignore-cron', action='store_true', help='Run for all missing intervals, ignoring individual cron schedules.')
@line_magic
@pass_sqlmesh_context
def run_dag(self, context: sqlmesh.core.context.Context, line: str) -> None:
409    @magic_arguments()
410    @argument(
411        "environment",
412        nargs="?",
413        type=str,
414        help="The environment to run against",
415    )
416    @argument("--start", "-s", type=str, help="Start date to evaluate.")
417    @argument("--end", "-e", type=str, help="End date to evaluate.")
418    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
419    @argument(
420        "--ignore-cron",
421        action="store_true",
422        help="Run for all missing intervals, ignoring individual cron schedules.",
423    )
424    @line_magic
425    @pass_sqlmesh_context
426    def run_dag(self, context: Context, line: str) -> None:
427        """Evaluate the DAG of models using the built-in scheduler."""
428        args = parse_argstring(self.run_dag, line)
429
430        success = context.run(
431            args.environment,
432            start=args.start,
433            end=args.end,
434            skip_janitor=args.skip_janitor,
435            ignore_cron=args.ignore_cron,
436        )
437        if not success:
438            raise SQLMeshError("Error Running DAG. Check logs for details.")

Evaluate the DAG of models using the built-in scheduler.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--limit', type=int, help='The number of rows which the query should be limited to.')
@line_magic
@pass_sqlmesh_context
def evaluate(self, context: sqlmesh.core.context.Context, line: str) -> None:
440    @magic_arguments()
441    @argument("model", type=str, help="The model.")
442    @argument("--start", "-s", type=str, help="Start date to render.")
443    @argument("--end", "-e", type=str, help="End date to render.")
444    @argument("--execution-time", type=str, help="Execution time.")
445    @argument(
446        "--limit",
447        type=int,
448        help="The number of rows which the query should be limited to.",
449    )
450    @line_magic
451    @pass_sqlmesh_context
452    def evaluate(self, context: Context, line: str) -> None:
453        """Evaluate a model query and fetches a dataframe."""
454        context.refresh()
455        args = parse_argstring(self.evaluate, line)
456
457        df = context.evaluate(
458            args.model,
459            start=args.start,
460            end=args.end,
461            execution_time=args.execution_time,
462            limit=args.limit,
463        )
464        self.display(df)

Evaluates a model query and fetches a dataframe.

@magic_arguments()
@argument('model', type=str, help='The model.')
@argument('--start', '-s', type=str, help='Start date to render.')
@argument('--end', '-e', type=str, help='End date to render.')
@argument('--execution-time', type=str, help='Execution time.')
@argument('--expand', type=t.Union[(bool, t.Iterable[str])], help='Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.')
@argument('--dialect', type=str, help='SQL dialect to render.')
@argument('--no-format', action='store_true', help='Disable fancy formatting of the query.')
@line_magic
@pass_sqlmesh_context
def render(self, context: sqlmesh.core.context.Context, line: str) -> None:
466    @magic_arguments()
467    @argument("model", type=str, help="The model.")
468    @argument("--start", "-s", type=str, help="Start date to render.")
469    @argument("--end", "-e", type=str, help="End date to render.")
470    @argument("--execution-time", type=str, help="Execution time.")
471    @argument(
472        "--expand",
473        type=t.Union[bool, t.Iterable[str]],
474        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
475    )
476    @argument("--dialect", type=str, help="SQL dialect to render.")
477    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
478    @line_magic
479    @pass_sqlmesh_context
480    def render(self, context: Context, line: str) -> None:
481        """Renders a model's query, optionally expanding referenced models."""
482        context.refresh()
483        args = parse_argstring(self.render, line)
484
485        query = context.render(
486            args.model,
487            start=args.start,
488            end=args.end,
489            execution_time=args.execution_time,
490            expand=args.expand,
491        )
492
493        sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect)
494        if args.no_format:
495            context.console.log_status_update(sql)
496        else:
497            context.console.show_sql(sql)

Renders a model's query, optionally expanding referenced models.

@magic_arguments()
@argument('df_var', default=None, nargs='?', type=str, help='An optional variable name to store the resulting dataframe.')
@cell_magic
@pass_sqlmesh_context
def fetchdf(self, context: sqlmesh.core.context.Context, line: str, sql: str) -> None:
499    @magic_arguments()
500    @argument(
501        "df_var",
502        default=None,
503        nargs="?",
504        type=str,
505        help="An optional variable name to store the resulting dataframe.",
506    )
507    @cell_magic
508    @pass_sqlmesh_context
509    def fetchdf(self, context: Context, line: str, sql: str) -> None:
510        """Fetches a dataframe from sql, optionally storing it in a variable."""
511        args = parse_argstring(self.fetchdf, line)
512        df = context.fetchdf(sql)
513        if args.df_var:
514            self._shell.user_ns[args.df_var] = df
515        self.display(df)

Fetches a dataframe from sql, optionally storing it in a variable.

@magic_arguments()
@argument('--file', '-f', type=str, help='An optional file path to write the HTML output to.')
@argument('--select-model', type=str, nargs='*', help='Select specific models to include in the dag.')
@line_magic
@pass_sqlmesh_context
def dag(self, context: sqlmesh.core.context.Context, line: str) -> None:
517    @magic_arguments()
518    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
519    @argument(
520        "--select-model",
521        type=str,
522        nargs="*",
523        help="Select specific models to include in the dag.",
524    )
525    @line_magic
526    @pass_sqlmesh_context
527    def dag(self, context: Context, line: str) -> None:
528        """Displays the HTML DAG."""
529        args = parse_argstring(self.dag, line)
530        dag = context.get_dag(args.select_model)
531        if args.file:
532            with open(args.file, "w", encoding="utf-8") as file:
533                file.write(str(dag))
534        # TODO: Have this go through console instead of calling display directly
535        self.display(dag)

Displays the HTML DAG.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def migrate(self, context: sqlmesh.core.context.Context, line: str) -> None:
537    @magic_arguments()
538    @line_magic
539    @pass_sqlmesh_context
540    def migrate(self, context: Context, line: str) -> None:
541        """Migrate SQLMesh to the current running version."""
542        context.migrate()
543        context.console.log_success("Migration complete")

Migrate SQLMesh to the current running version.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def create_external_models(self, context: sqlmesh.core.context.Context, line: str) -> None:
545    @magic_arguments()
546    @line_magic
547    @pass_sqlmesh_context
548    def create_external_models(self, context: Context, line: str) -> None:
549        """Create a schema file containing external model schemas."""
550        context.create_external_models()

Create a schema file containing external model schemas.

@magic_arguments()
@argument('source_to_target', type=str, metavar='SOURCE:TARGET', help='Source and target in `SOURCE:TARGET` format')
@argument('--on', type=str, nargs='*', help='The column to join on. Can be specified multiple times. The model grain will be used if not specified.')
@argument('--model', type=str, help='The model to diff against when source and target are environments and not tables.')
@argument('--where', type=str, help='An optional where statement to filter results.')
@argument('--limit', type=int, default=20, help='The limit of the sample dataframe.')
@argument('--show-sample', action='store_true', help='Show a sample of the rows that differ. With many columns, the output can be very wide.')
@line_magic
@pass_sqlmesh_context
def table_diff(self, context: sqlmesh.core.context.Context, line: str) -> None:
552    @magic_arguments()
553    @argument(
554        "source_to_target",
555        type=str,
556        metavar="SOURCE:TARGET",
557        help="Source and target in `SOURCE:TARGET` format",
558    )
559    @argument(
560        "--on",
561        type=str,
562        nargs="*",
563        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
564    )
565    @argument(
566        "--model",
567        type=str,
568        help="The model to diff against when source and target are environments and not tables.",
569    )
570    @argument(
571        "--where",
572        type=str,
573        help="An optional where statement to filter results.",
574    )
575    @argument(
576        "--limit",
577        type=int,
578        default=20,
579        help="The limit of the sample dataframe.",
580    )
581    @argument(
582        "--show-sample",
583        action="store_true",
584        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
585    )
586    @line_magic
587    @pass_sqlmesh_context
588    def table_diff(self, context: Context, line: str) -> None:
589        """Show the diff between two tables.
590
591        Can either be two tables or two environments and a model.
592        """
593        args = parse_argstring(self.table_diff, line)
594        source, target = args.source_to_target.split(":")
595        context.table_diff(
596            source=source,
597            target=target,
598            on=args.on,
599            model_or_snapshot=args.model,
600            where=args.where,
601            limit=args.limit,
602            show_sample=args.show_sample,
603        )

Show the diff between two tables.

Can either be two tables or two environments and a model.

@magic_arguments()
@argument('model_name', nargs='?', type=str, help='The name of the model to get the table name for.')
@argument('--dev', action='store_true', help='Print the name of the snapshot table used for previews in development environments.')
@line_magic
@pass_sqlmesh_context
def table_name(self, context: sqlmesh.core.context.Context, line: str) -> None:
605    @magic_arguments()
606    @argument(
607        "model_name",
608        nargs="?",
609        type=str,
610        help="The name of the model to get the table name for.",
611    )
612    @argument(
613        "--dev",
614        action="store_true",
615        help="Print the name of the snapshot table used for previews in development environments.",
616    )
617    @line_magic
618    @pass_sqlmesh_context
619    def table_name(self, context: Context, line: str) -> None:
620        """Prints the name of the physical table for the given model."""
621        args = parse_argstring(self.table_name, line)
622        context.console.log_status_update(context.table_name(args.model_name, args.dev))

Prints the name of the physical table for the given model.

@magic_arguments()
@argument('--read', type=str, default='', help='The input dialect of the sql string.')
@argument('--write', type=str, default='', help='The output dialect of the sql string.')
@line_cell_magic
@pass_sqlmesh_context
def rewrite(self, context: sqlmesh.core.context.Context, line: str, sql: str) -> None:
624    @magic_arguments()
625    @argument(
626        "--read",
627        type=str,
628        default="",
629        help="The input dialect of the sql string.",
630    )
631    @argument(
632        "--write",
633        type=str,
634        default="",
635        help="The output dialect of the sql string.",
636    )
637    @line_cell_magic
638    @pass_sqlmesh_context
639    def rewrite(self, context: Context, line: str, sql: str) -> None:
640        """Rewrite a sql expression with semantic references into an executable query.
641
642        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
643        """
644        args = parse_argstring(self.rewrite, line)
645        context.console.show_sql(
646            context.rewrite(sql, args.read).sql(
647                dialect=args.write or context.config.dialect, pretty=True
648            )
649        )

Rewrite a sql expression with semantic references into an executable query.

https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

@magic_arguments()
@argument('--transpile', '-t', type=str, help='Transpile project models to the specified dialect.')
@argument('--append-newline', action='store_true', help='Whether or not to append a newline to the end of the file.', default=None)
@argument('--normalize', action='store_true', help='Whether or not to normalize identifiers to lowercase.', default=None)
@argument('--pad', type=int, help='Determines the pad size in a formatted string.')
@argument('--indent', type=int, help='Determines the indentation size in a formatted string.')
@argument('--normalize-functions', type=str, help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'")
@argument('--leading-comma', action='store_true', help='Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.', default=None)
@argument('--max-text-width', type=int, help='The max number of characters in a segment before creating new lines in pretty mode.')
@line_magic
@pass_sqlmesh_context
def format(self, context: sqlmesh.core.context.Context, line: str) -> None:
651    @magic_arguments()
652    @argument(
653        "--transpile",
654        "-t",
655        type=str,
656        help="Transpile project models to the specified dialect.",
657    )
658    @argument(
659        "--append-newline",
660        action="store_true",
661        help="Whether or not to append a newline to the end of the file.",
662        default=None,
663    )
664    @argument(
665        "--normalize",
666        action="store_true",
667        help="Whether or not to normalize identifiers to lowercase.",
668        default=None,
669    )
670    @argument(
671        "--pad",
672        type=int,
673        help="Determines the pad size in a formatted string.",
674    )
675    @argument(
676        "--indent",
677        type=int,
678        help="Determines the indentation size in a formatted string.",
679    )
680    @argument(
681        "--normalize-functions",
682        type=str,
683        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
684    )
685    @argument(
686        "--leading-comma",
687        action="store_true",
688        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
689        default=None,
690    )
691    @argument(
692        "--max-text-width",
693        type=int,
694        help="The max number of characters in a segment before creating new lines in pretty mode.",
695    )
696    @line_magic
697    @pass_sqlmesh_context
698    def format(self, context: Context, line: str) -> None:
699        """Format all SQL models."""
700        args = parse_argstring(self.format, line)
701        context.format(**{k: v for k, v in vars(args).items() if v is not None})

Format all SQL models.

@magic_arguments()
@argument('environment', type=str, help='The environment to diff local state against.')
@line_magic
@pass_sqlmesh_context
def diff(self, context: sqlmesh.core.context.Context, line: str) -> None:
703    @magic_arguments()
704    @argument("environment", type=str, help="The environment to diff local state against.")
705    @line_magic
706    @pass_sqlmesh_context
707    def diff(self, context: Context, line: str) -> None:
708        """Show the diff between the local state and the target environment."""
709        args = parse_argstring(self.diff, line)
710        context.diff(args.environment)

Show the diff between the local state and the target environment.

@magic_arguments()
@argument('environment', type=str, help='The environment to invalidate.')
@line_magic
@pass_sqlmesh_context
def invalidate(self, context: sqlmesh.core.context.Context, line: str) -> None:
712    @magic_arguments()
713    @argument("environment", type=str, help="The environment to invalidate.")
714    @line_magic
715    @pass_sqlmesh_context
716    def invalidate(self, context: Context, line: str) -> None:
717        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
718        args = parse_argstring(self.invalidate, line)
719        context.invalidate_environment(args.environment)

Invalidate the target environment, forcing its removal during the next run of the janitor process.

@magic_arguments()
@argument('model', type=str)
@argument('--query', '-q', type=str, nargs='+', required=True, help="Queries that will be used to generate data for the model's dependencies.")
@argument('--overwrite', '-o', action='store_true', help='When true, the fixture file will be overwritten in case it already exists.')
@argument('--var', '-v', type=str, nargs='+', help='Key-value pairs that will define variables needed by the model.')
@argument('--path', '-p', type=str, help="The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred based on the test's name.")
@argument('--name', '-n', type=str, help="The name of the test that will be created. By default, it's inferred based on the model's name.")
@argument('--include-ctes', action='store_true', help='When true, CTE fixtures will also be generated.')
@line_magic
@pass_sqlmesh_context
def create_test(self, context: sqlmesh.core.context.Context, line: str) -> None:
721    @magic_arguments()
722    @argument("model", type=str)
723    @argument(
724        "--query",
725        "-q",
726        type=str,
727        nargs="+",
728        required=True,
729        help="Queries that will be used to generate data for the model's dependencies.",
730    )
731    @argument(
732        "--overwrite",
733        "-o",
734        action="store_true",
735        help="When true, the fixture file will be overwritten in case it already exists.",
736    )
737    @argument(
738        "--var",
739        "-v",
740        type=str,
741        nargs="+",
742        help="Key-value pairs that will define variables needed by the model.",
743    )
744    @argument(
745        "--path",
746        "-p",
747        type=str,
748        help="The file path corresponding to the fixture, relative to the test directory. "
749        "By default, the fixture will be created under the test directory and the file "
750        "name will be inferred based on the test's name.",
751    )
752    @argument(
753        "--name",
754        "-n",
755        type=str,
756        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
757    )
758    @argument(
759        "--include-ctes",
760        action="store_true",
761        help="When true, CTE fixtures will also be generated.",
762    )
763    @line_magic
764    @pass_sqlmesh_context
765    def create_test(self, context: Context, line: str) -> None:
766        """Generate a unit test fixture for a given model."""
767        args = parse_argstring(self.create_test, line)
768        queries = iter(args.query)
769        variables = iter(args.var) if args.var else None
770        context.create_test(
771            args.model,
772            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
773            overwrite=args.overwrite,
774            variables=dict(zip(variables, variables)) if variables else None,
775            path=args.path,
776            name=args.name,
777            include_ctes=args.include_ctes,
778        )

Generate a unit test fixture for a given model.

@magic_arguments()
@argument('tests', nargs='*', type=str)
@argument('--pattern', '-k', nargs='*', type=str, help='Only run tests that match the pattern of substring.')
@argument('--verbose', '-v', action='store_true', help='Verbose output.')
@argument('--preserve-fixtures', action='store_true', help='Preserve the fixture tables in the testing database, useful for debugging.')
@line_magic
@pass_sqlmesh_context
def run_test(self, context: sqlmesh.core.context.Context, line: str) -> None:
780    @magic_arguments()
781    @argument("tests", nargs="*", type=str)
782    @argument(
783        "--pattern",
784        "-k",
785        nargs="*",
786        type=str,
787        help="Only run tests that match the pattern of substring.",
788    )
789    @argument("--verbose", "-v", action="store_true", help="Verbose output.")
790    @argument(
791        "--preserve-fixtures",
792        action="store_true",
793        help="Preserve the fixture tables in the testing database, useful for debugging.",
794    )
795    @line_magic
796    @pass_sqlmesh_context
797    def run_test(self, context: Context, line: str) -> None:
798        """Run unit test(s)."""
799        args = parse_argstring(self.run_test, line)
800        context.test(
801            match_patterns=args.pattern,
802            tests=args.tests,
803            verbose=args.verbose,
804            preserve_fixtures=args.preserve_fixtures,
805        )

Run unit test(s).

@magic_arguments()
@argument('models', type=str, nargs='*', help='A model to audit. Multiple models can be audited.')
@argument('--start', '-s', type=str, help='Start date to audit.')
@argument('--end', '-e', type=str, help='End date to audit.')
@argument('--execution-time', type=str, help='Execution time.')
@line_magic
@pass_sqlmesh_context
def audit(self, context: sqlmesh.core.context.Context, line: str) -> None:
807    @magic_arguments()
808    @argument(
809        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
810    )
811    @argument("--start", "-s", type=str, help="Start date to audit.")
812    @argument("--end", "-e", type=str, help="End date to audit.")
813    @argument("--execution-time", type=str, help="Execution time.")
814    @line_magic
815    @pass_sqlmesh_context
816    def audit(self, context: Context, line: str) -> None:
817        """Run audit(s)"""
818        args = parse_argstring(self.audit, line)
819        context.audit(
820            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
821        )

Run audit(s)

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def info(self, context: sqlmesh.core.context.Context, line: str) -> None:
823    @magic_arguments()
824    @line_magic
825    @pass_sqlmesh_context
826    def info(self, context: Context, line: str) -> None:
827        """Display SQLMesh project information."""
828        context.print_info()

Display SQLMesh project information.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def rollback(self, context: sqlmesh.core.context.Context, line: str) -> None:
830    @magic_arguments()
831    @line_magic
832    @pass_sqlmesh_context
833    def rollback(self, context: Context, line: str) -> None:
834        """Rollback SQLMesh to the previous migration."""
835        context.rollback()

Rollback SQLMesh to the previous migration.

@magic_arguments()
@line_magic
@pass_sqlmesh_context
def clean(self, context: sqlmesh.core.context.Context, line: str) -> None:
837    @magic_arguments()
838    @line_magic
839    @pass_sqlmesh_context
840    def clean(self, context: Context, line: str) -> None:
841        """Clears the SQLMesh cache and any build artifacts."""
842        context.clear_caches()
843        context.console.log_success("SQLMesh cache and build artifacts cleared")

Clears the SQLMesh cache and any build artifacts.

Inherited Members
IPython.core.magic.Magics
Magics
arg_err
format_latex
parse_options
default_option
traitlets.config.configurable.Configurable
config
parent
section_names
update_config
class_get_help
class_get_trait_help
class_print_help
class_config_section
class_config_rst_doc
traitlets.traitlets.HasTraits
setup_instance
cross_validation_lock
hold_trait_notifications
notify_change
on_trait_change
observe
unobserve
unobserve_all
add_traits
set_trait
class_trait_names
class_traits
class_own_traits
has_trait
trait_has_value
trait_values
trait_defaults
trait_names
traits
trait_metadata
class_own_trait_events
trait_events
def register_magics() -> None:
def register_magics() -> None:
    """Register ``SQLMeshMagics`` with the active IPython shell, if there is one.

    ``get_ipython`` is only defined when running under IPython; outside of it
    the lookup raises ``NameError`` and registration is skipped.
    """
    try:
        get_ipython().register_magics(SQLMeshMagics)  # type: ignore
    except NameError:
        # Not running inside IPython: nothing to register.
        return