sqlmesh.magics
1from __future__ import annotations 2 3from io import StringIO 4 5import functools 6import logging 7import typing as t 8from argparse import Namespace, SUPPRESS 9from collections import defaultdict 10from copy import deepcopy 11from pathlib import Path 12 13from hyperscript import h 14 15try: 16 from IPython.core.display import display # type: ignore 17except ImportError: 18 from IPython.display import display 19 20from IPython.core.magic import ( 21 Magics, 22 cell_magic, 23 line_cell_magic, 24 line_magic, 25 magics_class, 26) 27from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring 28from IPython.utils.process import arg_split 29from rich.jupyter import JupyterRenderable 30from sqlmesh.cli.project_init import ProjectTemplate, init_example_project 31from sqlmesh.core import analytics 32from sqlmesh.core.config import load_configs 33from sqlmesh.core.config.connection import INIT_DISPLAY_INFO_TO_TYPE 34from sqlmesh.core.console import create_console, set_console, configure_console 35from sqlmesh.core.context import Context 36from sqlmesh.core.dialect import format_model_expressions, parse 37from sqlmesh.core.model import load_sql_based_model 38from sqlmesh.core.test import ModelTestMetadata 39from sqlmesh.utils import yaml, Verbosity, optional_import 40from sqlmesh.utils.errors import MagicError, MissingContextException, SQLMeshError 41 42logger = logging.getLogger(__name__) 43 44CONTEXT_VARIABLE_NAMES = [ 45 "context", 46 "ctx", 47 "sqlmesh", 48] 49 50 51def pass_sqlmesh_context(func: t.Callable) -> t.Callable: 52 @functools.wraps(func) 53 def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None: 54 for variable_name in CONTEXT_VARIABLE_NAMES: 55 context = self._shell.user_ns.get(variable_name) 56 if isinstance(context, Context): 57 break 58 else: 59 raise MissingContextException( 60 f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}" 61 ) 62 old_console = context.console 63 new_console = create_console(display=self.display) 64 context.console = new_console 65 set_console(new_console) 66 context.refresh() 67 68 magic_name = func.__name__ 69 bound_method = getattr(self, magic_name, None) 70 if bound_method: 71 args_split = arg_split(args[0]) 72 parser = bound_method.parser 73 74 original_parser_actions = deepcopy(parser._actions) 75 original_parser_defaults = parser._defaults 76 77 # Temporarily supress default values, otherwise any missing arg would be set and affect analytics 78 parser._defaults = {} 79 for action in parser._actions: 80 action.default = SUPPRESS 81 82 parsed_args, _ = parser.parse_known_args(args_split, Namespace()) 83 84 parser._actions = original_parser_actions 85 parser._defaults = original_parser_defaults 86 87 command_args = {k for k, v in parsed_args.__dict__.items() if v is not None} 88 analytics.collector.on_magic_command(command_name=magic_name, command_args=command_args) 89 90 func(self, context, *args, **kwargs) 91 92 context.console = old_console 93 set_console(old_console) 94 95 return wrapper 96 97 98def format_arguments(func: t.Callable) -> t.Callable: 99 """Decorator to add common format arguments to magic commands.""" 100 func = argument( 101 "--normalize", 102 action="store_true", 103 help="Whether or not to normalize identifiers to lowercase.", 104 default=None, 105 )(func) 106 func = argument( 107 "--pad", 108 type=int, 109 help="Determines the pad size in a formatted string.", 110 )(func) 111 func = argument( 112 "--indent", 113 type=int, 114 help="Determines the indentation size in a formatted string.", 115 )(func) 116 func = argument( 117 "--normalize-functions", 118 type=str, 119 help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'", 120 )(func) 121 func = argument( 122 "--leading-comma", 123 action="store_true", 124 help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.", 125 default=None, 126 )(func) 127 func = argument( 128 "--max-text-width", 129 type=int, 130 help="The max number of characters in a segment before creating new lines in pretty mode.", 131 )(func) 132 return func 133 134 135@magics_class 136class SQLMeshMagics(Magics): 137 @property 138 def display(self) -> t.Callable: 139 from sqlmesh import RuntimeEnv 140 141 if RuntimeEnv.get().is_databricks: 142 # Use Databricks' special display instead of the normal IPython display 143 return self._shell.user_ns["display"] 144 return display 145 146 @property 147 def _shell(self) -> t.Any: 148 # Make mypy happy. 149 if not self.shell: 150 raise RuntimeError("IPython Magics are in invalid state") 151 return self.shell 152 153 @magic_arguments() 154 @argument( 155 "paths", 156 type=str, 157 nargs="+", 158 default="", 159 help="The path(s) to the SQLMesh project(s).", 160 ) 161 @argument( 162 "--config", 163 type=str, 164 help="Name of the config object. Only applicable to configuration defined using Python script.", 165 ) 166 @argument("--gateway", type=str, help="The name of the gateway.") 167 @argument("--ignore-warnings", action="store_true", help="Ignore warnings.") 168 @argument("--debug", action="store_true", help="Enable debug mode.") 169 @argument("--log-file-dir", type=str, help="The directory to write the log file to.") 170 @argument( 171 "--dotenv", type=str, help="Path to a custom .env file to load environment variables from." 172 ) 173 @line_magic 174 def context(self, line: str) -> None: 175 """Sets the context in the user namespace.""" 176 from sqlmesh import configure_logging, remove_excess_logs 177 178 args = parse_argstring(self.context, line) 179 log_file_dir = args.log_file_dir 180 181 configure_logging( 182 args.debug, 183 log_file_dir=log_file_dir, 184 ignore_warnings=args.ignore_warnings, 185 ) 186 configure_console(ignore_warnings=args.ignore_warnings) 187 188 dotenv_path = Path(args.dotenv) if args.dotenv else None 189 configs = load_configs( 190 args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path 191 ) 192 log_limit = list(configs.values())[0].log_limit 193 194 remove_excess_logs(log_file_dir, log_limit) 195 196 try: 197 context = Context(paths=args.paths, config=configs, gateway=args.gateway) 198 self._shell.user_ns["context"] = context 199 except Exception: 200 if args.debug: 201 logger.exception("Failed to initialize SQLMesh context") 202 raise 203 204 context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}") 205 206 @magic_arguments() 207 @argument("path", type=str, help="The path where the new SQLMesh project should be created.") 208 @argument( 209 "engine", 210 type=str, 211 help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.", # type: ignore 212 ) 213 @argument( 214 "--template", 215 "-t", 216 type=str, 217 help="Project template. Supported values: dbt, default, empty.", 218 ) 219 @argument( 220 "--dlt-pipeline", 221 type=str, 222 help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt", 223 ) 224 @argument( 225 "--dlt-path", 226 type=str, 227 help="The directory where the DLT pipeline resides. Use alongside template: dlt", 228 ) 229 @line_magic 230 def init(self, line: str) -> None: 231 """Creates a SQLMesh project scaffold with a default SQL dialect.""" 232 args = parse_argstring(self.init, line) 233 try: 234 project_template = ProjectTemplate( 235 args.template.lower() if args.template else "default" 236 ) 237 except ValueError: 238 raise MagicError(f"Invalid project template '{args.template}'") 239 init_example_project( 240 path=args.path, 241 engine_type=args.engine, 242 dialect=None, 243 template=project_template, 244 pipeline=args.dlt_pipeline, 245 dlt_path=args.dlt_path, 246 ) 247 html = str( 248 h( 249 "div", 250 h( 251 "span", 252 {"style": {"color": "green", "font-weight": "bold"}}, 253 "SQLMesh project scaffold created", 254 ), 255 ) 256 ) 257 self.display(JupyterRenderable(html=html, text="")) 258 259 @magic_arguments() 260 @argument("model", type=str, help="The model.") 261 @argument("--start", "-s", type=str, help="Start date to render.") 262 @argument("--end", "-e", type=str, help="End date to render.") 263 @argument("--execution-time", type=str, help="Execution time.") 264 @argument("--dialect", "-d", type=str, help="The rendered dialect.") 265 @line_cell_magic 266 @pass_sqlmesh_context 267 def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None: 268 """Renders the model and automatically fills in an editable cell with the model definition.""" 269 args = parse_argstring(self.model, line) 270 271 model = context.get_model(args.model, raise_if_missing=True) 272 config = context.config_for_node(model) 273 274 if sql: 275 expressions = parse(sql, default_dialect=config.dialect) 276 loaded = load_sql_based_model( 277 expressions, 278 macros=context._macros, 279 jinja_macros=context._jinja_macros, 280 path=model._path, 281 dialect=config.dialect, 282 time_column_format=config.time_column_format, 283 physical_schema_mapping=context.config.physical_schema_mapping, 284 default_catalog=context.default_catalog, 285 ) 286 287 if loaded.name == args.model: 288 model = loaded 289 else: 290 if model._path: 291 with open(model._path, "r", encoding="utf-8") as file: 292 expressions = parse(file.read(), default_dialect=config.dialect) 293 294 formatted = format_model_expressions( 295 expressions, 296 model.dialect, 297 rewrite_casts=not config.format.no_rewrite_casts, 298 **config.format.generator_options, 299 ) 300 301 self._shell.set_next_input( 302 "\n".join( 303 [ 304 " ".join(["%%model", line]), 305 formatted, 306 ] 307 ), 308 replace=True, 309 ) 310 311 if model._path: 312 with open(model._path, "w", encoding="utf-8") as file: 313 file.write(formatted) 314 315 if sql: 316 context.console.log_success(f"Model `{args.model}` updated") 317 318 context.upsert_model(model) 319 context.console.show_sql( 320 context.render( 321 model.name, 322 start=args.start, 323 end=args.end, 324 execution_time=args.execution_time, 325 ).sql(pretty=True, dialect=args.dialect or model.dialect) 326 ) 327 328 @magic_arguments() 329 @argument("model", type=str, help="The model.") 330 @argument("test_name", type=str, nargs="?", default=None, help="The test name to display") 331 @argument("--ls", action="store_true", help="List tests associated with a model") 332 @line_cell_magic 333 @pass_sqlmesh_context 334 def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None: 335 """Allow the user to list tests for a model, output a specific test, and then write their changes back""" 336 args = parse_argstring(self.test, line) 337 if not args.test_name and not args.ls: 338 raise MagicError("Must provide either test name or `--ls` to list tests") 339 340 test_meta = context.select_tests() 341 342 tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict) 343 for model_test_metadata in test_meta: 344 model = model_test_metadata.body.get("model") 345 if not model: 346 context.console.log_error( 347 f"Test found that does not have `model` defined: {model_test_metadata.path}" 348 ) 349 else: 350 tests[model][model_test_metadata.test_name] = model_test_metadata 351 352 model = context.get_model(args.model, raise_if_missing=True) 353 354 if args.ls: 355 # TODO: Provide better UI for displaying tests 356 for test_name in tests[model.name]: 357 context.console.log_status_update(test_name) 358 return 359 360 test = tests[model.name][args.test_name] 361 test_def = yaml.load(test_def_raw) if test_def_raw else test.body 362 test_def_output = yaml.dump(test_def) 363 364 self._shell.set_next_input( 365 "\n".join( 366 [ 367 " ".join(["%%test", line]), 368 test_def_output, 369 ] 370 ), 371 replace=True, 372 ) 373 374 with open(test.path, "r+", encoding="utf-8") as file: 375 content = yaml.load(file.read()) 376 content[args.test_name] = test_def 377 file.seek(0) 378 yaml.dump(content, file) 379 file.truncate() 380 381 @magic_arguments() 382 @argument( 383 "environment", 384 nargs="?", 385 type=str, 386 help="The environment to run the plan against", 387 ) 388 @argument("--start", "-s", type=str, help="Start date to backfill.") 389 @argument("--end", "-e", type=str, help="End date to backfill.") 390 @argument("--execution-time", type=str, help="Execution time.") 391 @argument( 392 "--create-from", 393 type=str, 394 help="The environment to create the target environment from if it doesn't exist. Default: prod.", 395 ) 396 @argument( 397 "--skip-tests", 398 "-t", 399 action="store_true", 400 help="Skip the unit tests defined for the model.", 401 ) 402 @argument( 403 "--skip-linter", 404 action="store_true", 405 help="Skip the linter for the model.", 406 ) 407 @argument( 408 "--restate-model", 409 "-r", 410 type=str, 411 nargs="*", 412 help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.", 413 ) 414 @argument( 415 "--no-gaps", 416 "-g", 417 action="store_true", 418 help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.", 419 ) 420 @argument( 421 "--skip-backfill", 422 "--dry-run", 423 action="store_true", 424 help="Skip the backfill step and only create a virtual update for the plan.", 425 ) 426 @argument( 427 "--empty-backfill", 428 action="store_true", 429 help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.", 430 ) 431 @argument( 432 "--forward-only", 433 action="store_true", 434 help="Create a plan for forward-only changes.", 435 default=None, 436 ) 437 @argument( 438 "--effective-from", 439 type=str, 440 help="The effective date from which to apply forward-only changes on production.", 441 ) 442 @argument( 443 "--no-prompts", 444 action="store_true", 445 help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.", 446 default=None, 447 ) 448 @argument( 449 "--auto-apply", 450 action="store_true", 451 help="Automatically applies the new plan after creation.", 452 default=None, 453 ) 454 @argument( 455 "--no-auto-categorization", 456 action="store_true", 457 help="Disable automatic change categorization.", 458 default=None, 459 ) 460 @argument( 461 "--include-unmodified", 462 action="store_true", 463 help="Include unmodified models in the target environment.", 464 default=None, 465 ) 466 @argument( 467 "--select-model", 468 type=str, 469 nargs="*", 470 help="Select specific model changes that should be included in the plan.", 471 ) 472 @argument( 473 "--backfill-model", 474 type=str, 475 nargs="*", 476 help="Backfill only the models whose names match the expression.", 477 ) 478 @argument( 479 "--no-diff", 480 action="store_true", 481 help="Hide text differences for changed models.", 482 default=None, 483 ) 484 @argument( 485 "--run", 486 action="store_true", 487 help="Run latest intervals as part of the plan application (prod environment only).", 488 ) 489 @argument( 490 "--ignore-cron", 491 action="store_true", 492 help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.", 493 default=None, 494 ) 495 @argument( 496 "--enable-preview", 497 action="store_true", 498 help="Enable preview for forward-only models when targeting a development environment.", 499 default=None, 500 ) 501 @argument( 502 "--diff-rendered", 503 action="store_true", 504 help="Output text differences for the rendered versions of the models and standalone audits", 505 ) 506 @argument( 507 "--verbose", 508 "-v", 509 action="count", 510 default=0, 511 help="Verbose output. Use -vv for very verbose.", 512 ) 513 @line_magic 514 @pass_sqlmesh_context 515 def plan(self, context: Context, line: str) -> None: 516 """Goes through a set of prompts to both establish a plan and apply it""" 517 args = parse_argstring(self.plan, line) 518 519 setattr(context.console, "verbosity", Verbosity(args.verbose)) 520 521 context.plan( 522 args.environment, 523 start=args.start, 524 end=args.end, 525 execution_time=args.execution_time, 526 create_from=args.create_from, 527 skip_tests=args.skip_tests, 528 restate_models=args.restate_model, 529 backfill_models=args.backfill_model, 530 no_gaps=args.no_gaps, 531 skip_backfill=args.skip_backfill, 532 empty_backfill=args.empty_backfill, 533 forward_only=args.forward_only, 534 no_prompts=args.no_prompts, 535 auto_apply=args.auto_apply, 536 no_auto_categorization=args.no_auto_categorization, 537 effective_from=args.effective_from, 538 include_unmodified=args.include_unmodified, 539 select_models=args.select_model, 540 no_diff=args.no_diff, 541 run=args.run, 542 ignore_cron=args.run, 543 enable_preview=args.enable_preview, 544 diff_rendered=args.diff_rendered, 545 ) 546 547 @magic_arguments() 548 @argument( 549 "environment", 550 nargs="?", 551 type=str, 552 help="The environment to run against", 553 ) 554 @argument("--start", "-s", type=str, help="Start date to evaluate.") 555 @argument("--end", "-e", type=str, help="End date to evaluate.") 556 @argument("--skip-janitor", action="store_true", help="Skip the janitor task.") 557 @argument( 558 "--ignore-cron", 559 action="store_true", 560 help="Run for all missing intervals, ignoring individual cron schedules.", 561 ) 562 @argument( 563 "--select-model", 564 type=str, 565 nargs="*", 566 help="Select specific models to run. Note: this always includes upstream dependencies.", 567 ) 568 @argument( 569 "--exit-on-env-update", 570 type=int, 571 help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.", 572 ) 573 @argument( 574 "--no-auto-upstream", 575 action="store_true", 576 help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.", 577 ) 578 @line_magic 579 @pass_sqlmesh_context 580 def run_dag(self, context: Context, line: str) -> None: 581 """Evaluate the DAG of models using the built-in scheduler.""" 582 args = parse_argstring(self.run_dag, line) 583 584 completion_status = context.run( 585 args.environment, 586 start=args.start, 587 end=args.end, 588 skip_janitor=args.skip_janitor, 589 ignore_cron=args.ignore_cron, 590 select_models=args.select_model, 591 exit_on_env_update=args.exit_on_env_update, 592 no_auto_upstream=args.no_auto_upstream, 593 ) 594 if completion_status.is_failure: 595 raise SQLMeshError("Error Running DAG. Check logs for details.") 596 597 @magic_arguments() 598 @argument("model", type=str, help="The model.") 599 @argument("--start", "-s", type=str, help="Start date to render.") 600 @argument("--end", "-e", type=str, help="End date to render.") 601 @argument("--execution-time", type=str, help="Execution time.") 602 @argument( 603 "--limit", 604 type=int, 605 help="The number of rows which the query should be limited to.", 606 ) 607 @line_magic 608 @pass_sqlmesh_context 609 def evaluate(self, context: Context, line: str) -> None: 610 """Evaluate a model query and fetches a dataframe.""" 611 context.refresh() 612 613 snowpark = optional_import("snowflake.snowpark") 614 args = parse_argstring(self.evaluate, line) 615 616 df = context.evaluate( 617 args.model, 618 start=args.start, 619 end=args.end, 620 execution_time=args.execution_time, 621 limit=args.limit, 622 ) 623 624 if snowpark and isinstance(df, snowpark.DataFrame): 625 df = df.limit(args.limit or 100).to_pandas() 626 627 self.display(df) 628 629 @magic_arguments() 630 @argument("model", type=str, help="The model.") 631 @argument("--start", "-s", type=str, help="Start date to render.") 632 @argument("--end", "-e", type=str, help="End date to render.") 633 @argument("--execution-time", type=str, help="Execution time.") 634 @argument( 635 "--expand", 636 type=t.Union[bool, t.Iterable[str]], 637 help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.", 638 ) 639 @argument("--dialect", type=str, help="SQL dialect to render.") 640 @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.") 641 @format_arguments 642 @line_magic 643 @pass_sqlmesh_context 644 def render(self, context: Context, line: str) -> None: 645 """Renders a model's query, optionally expanding referenced models.""" 646 context.refresh() 647 render_opts = vars(parse_argstring(self.render, line)) 648 model = render_opts.pop("model") 649 dialect = render_opts.pop("dialect", None) 650 651 model = context.get_model(model, raise_if_missing=True) 652 653 query = context.render( 654 model, 655 start=render_opts.pop("start", None), 656 end=render_opts.pop("end", None), 657 execution_time=render_opts.pop("execution_time", None), 658 expand=render_opts.pop("expand", False), 659 ) 660 661 no_format = render_opts.pop("no_format", False) 662 663 format_config = context.config_for_node(model).format 664 format_options = { 665 **format_config.generator_options, 666 **{k: v for k, v in render_opts.items() if v is not None}, 667 } 668 669 sql = query.sql( 670 pretty=True, 671 dialect=context.config.dialect if dialect is None else dialect, 672 **format_options, 673 ) 674 675 if no_format: 676 context.console.log_status_update(sql) 677 else: 678 context.console.show_sql(sql) 679 680 @magic_arguments() 681 @argument( 682 "df_var", 683 default=None, 684 nargs="?", 685 type=str, 686 help="An optional variable name to store the resulting dataframe.", 687 ) 688 @cell_magic 689 @pass_sqlmesh_context 690 def fetchdf(self, context: Context, line: str, sql: str) -> None: 691 """Fetches a dataframe from sql, optionally storing it in a variable.""" 692 args = parse_argstring(self.fetchdf, line) 693 df = context.fetchdf(sql) 694 if args.df_var: 695 self._shell.user_ns[args.df_var] = df 696 self.display(df) 697 698 @magic_arguments() 699 @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.") 700 @argument( 701 "--select-model", 702 type=str, 703 nargs="*", 704 help="Select specific models to include in the dag.", 705 ) 706 @line_magic 707 @pass_sqlmesh_context 708 def dag(self, context: Context, line: str) -> None: 709 """Displays the HTML DAG.""" 710 args = parse_argstring(self.dag, line) 711 dag = context.get_dag(args.select_model) 712 if args.file: 713 with open(args.file, "w", encoding="utf-8") as file: 714 file.write(str(dag)) 715 # TODO: Have this go through console instead of calling display directly 716 self.display(dag) 717 718 @magic_arguments() 719 @line_magic 720 @pass_sqlmesh_context 721 def migrate(self, context: Context, line: str) -> None: 722 """Migrate SQLMesh to the current running version.""" 723 context.migrate() 724 context.console.log_success("Migration complete") 725 726 @magic_arguments() 727 @argument( 728 "--strict", 729 action="store_true", 730 help="Raise an error if the external model is missing in the database", 731 ) 732 @line_magic 733 @pass_sqlmesh_context 734 def create_external_models(self, context: Context, line: str) -> None: 735 """Create a schema file containing external model schemas.""" 736 args = parse_argstring(self.create_external_models, line) 737 context.create_external_models(strict=args.strict) 738 739 @magic_arguments() 740 @argument( 741 "source_to_target", 742 type=str, 743 metavar="SOURCE:TARGET", 744 help="Source and target in `SOURCE:TARGET` format", 745 ) 746 @argument( 747 "--on", 748 type=str, 749 nargs="*", 750 help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.", 751 ) 752 @argument( 753 "--skip-columns", 754 type=str, 755 nargs="*", 756 help="The column(s) to skip when comparing the source and target table.", 757 ) 758 @argument( 759 "--model", 760 type=str, 761 help="The model to diff against when source and target are environments and not tables.", 762 ) 763 @argument( 764 "--where", 765 type=str, 766 help="An optional where statement to filter results.", 767 ) 768 @argument( 769 "--limit", 770 type=int, 771 default=20, 772 help="The limit of the sample dataframe.", 773 ) 774 @argument( 775 "--show-sample", 776 action="store_true", 777 help="Show a sample of the rows that differ. With many columns, the output can be very wide.", 778 ) 779 @argument( 780 "--decimals", 781 type=int, 782 default=3, 783 help="The number of decimal places to keep when comparing floating point columns. Default: 3", 784 ) 785 @argument( 786 "--select-model", 787 type=str, 788 nargs="*", 789 help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)", 790 ) 791 @argument( 792 "--skip-grain-check", 793 action="store_true", 794 help="Disable the check for a primary key (grain) that is missing or is not unique.", 795 ) 796 @argument( 797 "--warn-grain-check", 798 action="store_true", 799 help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.", 800 ) 801 @argument( 802 "--schema-diff-ignore-case", 803 action="store_true", 804 help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.", 805 ) 806 @line_magic 807 @pass_sqlmesh_context 808 def table_diff(self, context: Context, line: str) -> None: 809 """Show the diff between two tables. 810 811 Can either be two tables or two environments and a model. 812 """ 813 args = parse_argstring(self.table_diff, line) 814 source, target = args.source_to_target.split(":") 815 select_models = {args.model} if args.model else args.select_model or None 816 context.table_diff( 817 source=source, 818 target=target, 819 on=args.on, 820 skip_columns=args.skip_columns, 821 select_models=select_models, 822 where=args.where, 823 limit=args.limit, 824 show_sample=args.show_sample, 825 decimals=args.decimals, 826 skip_grain_check=args.skip_grain_check, 827 warn_grain_check=args.warn_grain_check, 828 schema_diff_ignore_case=args.schema_diff_ignore_case, 829 ) 830 831 @magic_arguments() 832 @argument( 833 "model_name", 834 nargs="?", 835 type=str, 836 help="The name of the model to get the table name for.", 837 ) 838 @argument( 839 "--environment", 840 type=str, 841 help="The environment to source the model version from.", 842 ) 843 @argument( 844 "--prod", 845 action="store_true", 846 help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.", 847 ) 848 @line_magic 849 @pass_sqlmesh_context 850 def table_name(self, context: Context, line: str) -> None: 851 """Prints the name of the physical table for the given model.""" 852 args = parse_argstring(self.table_name, line) 853 context.console.log_status_update( 854 context.table_name(args.model_name, args.environment, args.prod) 855 ) 856 857 @magic_arguments() 858 @argument( 859 "pipeline", 860 nargs="?", 861 type=str, 862 help="The dlt pipeline to attach for this SQLMesh project.", 863 ) 864 @argument( 865 "--table", 866 "-t", 867 type=str, 868 nargs="*", 869 help="The specific dlt tables to refresh in the SQLMesh models.", 870 ) 871 @argument( 872 "--force", 873 "-f", 874 action="store_true", 875 help="If set, existing models are overwritten with the new DLT tables.", 876 ) 877 @argument( 878 "--dlt-path", 879 type=str, 880 help="The directory where the DLT pipeline resides.", 881 ) 882 @line_magic 883 @pass_sqlmesh_context 884 def dlt_refresh(self, context: Context, line: str) -> None: 885 """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project.""" 886 from sqlmesh.integrations.dlt import generate_dlt_models 887 888 args = parse_argstring(self.dlt_refresh, line) 889 sqlmesh_models = generate_dlt_models( 890 context, args.pipeline, list(args.table or []), args.force, args.dlt_path 891 ) 892 if sqlmesh_models: 893 model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models]) 894 context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}") 895 else: 896 context.console.log_success("All SQLMesh models are up to date.") 897 898 @magic_arguments() 899 @argument( 900 "--read", 901 type=str, 902 default="", 903 help="The input dialect of the sql string.", 904 ) 905 @argument( 906 "--write", 907 type=str, 908 default="", 909 help="The output dialect of the sql string.", 910 ) 911 @line_cell_magic 912 @pass_sqlmesh_context 913 def rewrite(self, context: Context, line: str, sql: str) -> None: 914 """Rewrite a sql expression with semantic references into an executable query. 915 916 https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/ 917 """ 918 args = parse_argstring(self.rewrite, line) 919 context.console.show_sql( 920 context.rewrite(sql, args.read).sql( 921 dialect=args.write or context.config.dialect, pretty=True 922 ) 923 ) 924 925 @magic_arguments() 926 @argument( 927 "--transpile", 928 "-t", 929 type=str, 930 help="Transpile project models to the specified dialect.", 931 ) 932 @argument( 933 "--check", 934 action="store_true", 935 help="Whether or not to check formatting (but not actually format anything).", 936 default=None, 937 ) 938 @argument( 939 "--append-newline", 940 action="store_true", 941 help="Include a newline at the end of the output.", 942 default=None, 943 ) 944 @argument( 945 "--no-rewrite-casts", 946 action="store_true", 947 help="Preserve the existing casts, without rewriting them to use the :: syntax.", 948 default=None, 949 ) 950 @format_arguments 951 @line_magic 952 @pass_sqlmesh_context 953 def format(self, context: Context, line: str) -> bool: 954 """Format all SQL models and audits.""" 955 format_opts = vars(parse_argstring(self.format, line)) 956 if format_opts.pop("no_rewrite_casts", None): 957 format_opts["rewrite_casts"] = False 958 959 return context.format(**{k: v for k, v in format_opts.items() if v is not None}) 960 961 @magic_arguments() 962 @argument("environment", type=str, help="The environment to diff local state against.") 963 @line_magic 964 @pass_sqlmesh_context 965 def diff(self, context: Context, line: str) -> None: 966 """Show the diff between the local state and the target environment.""" 967 args = parse_argstring(self.diff, line) 968 context.diff(args.environment) 969 970 @magic_arguments() 971 @argument("environment", type=str, help="The environment to invalidate.") 972 @line_magic 973 @pass_sqlmesh_context 974 def invalidate(self, context: Context, line: str) -> None: 975 """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" 976 args = parse_argstring(self.invalidate, line) 977 context.invalidate_environment(args.environment) 978 979 @magic_arguments() 980 @argument( 981 "--ignore-ttl", 982 action="store_true", 983 help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire", 984 ) 985 @line_magic 986 @pass_sqlmesh_context 987 def janitor(self, context: Context, line: str) -> None: 988 """Run the janitor process to clean up old environments and expired snapshots.""" 989 args = parse_argstring(self.janitor, line) 990 context.run_janitor(ignore_ttl=args.ignore_ttl) 991 992 @magic_arguments() 993 @argument("model", type=str) 994 @argument( 995 "--query", 996 "-q", 997 type=str, 998 nargs="+", 999 default=[], 1000 help="Queries that will be used to generate data for the model's dependencies.", 1001 ) 1002 @argument( 1003 "--overwrite", 1004 "-o", 1005 action="store_true", 1006 help="When true, the fixture file will be overwritten in case it already exists.", 1007 ) 1008 @argument( 1009 "--var", 1010 "-v", 1011 type=str, 1012 nargs="+", 1013 help="Key-value pairs that will define variables needed by the model.", 1014 ) 1015 @argument( 1016 "--path", 1017 "-p", 1018 type=str, 1019 help="The file path corresponding to the fixture, relative to the test directory. " 1020 "By default, the fixture will be created under the test directory and the file " 1021 "name will be inferred based on the test's name.", 1022 ) 1023 @argument( 1024 "--name", 1025 "-n", 1026 type=str, 1027 help="The name of the test that will be created. By default, it's inferred based on the model's name.", 1028 ) 1029 @argument( 1030 "--include-ctes", 1031 action="store_true", 1032 help="When true, CTE fixtures will also be generated.", 1033 ) 1034 @line_magic 1035 @pass_sqlmesh_context 1036 def create_test(self, context: Context, line: str) -> None: 1037 """Generate a unit test fixture for a given model.""" 1038 args = parse_argstring(self.create_test, line) 1039 queries = iter(args.query) 1040 variables = iter(args.var) if args.var else None 1041 context.create_test( 1042 args.model, 1043 input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()}, 1044 overwrite=args.overwrite, 1045 variables=dict(zip(variables, variables)) if variables else None, 1046 path=args.path, 1047 name=args.name, 1048 include_ctes=args.include_ctes, 1049 ) 1050 1051 @magic_arguments() 1052 @argument("tests", nargs="*", type=str) 1053 @argument( 1054 "--pattern", 1055 "-k", 1056 nargs="*", 1057 type=str, 1058 help="Only run tests that match the pattern of substring.", 1059 ) 1060 @argument( 1061 "--verbose", 1062 "-v", 1063 action="count", 1064 default=0, 1065 help="Verbose output. Use -vv for very verbose.", 1066 ) 1067 @argument( 1068 "--preserve-fixtures", 1069 action="store_true", 1070 help="Preserve the fixture tables in the testing database, useful for debugging.", 1071 ) 1072 @line_magic 1073 @pass_sqlmesh_context 1074 def run_test(self, context: Context, line: str) -> None: 1075 """Run unit test(s).""" 1076 args = parse_argstring(self.run_test, line) 1077 1078 context.test( 1079 match_patterns=args.pattern, 1080 tests=args.tests, 1081 verbosity=Verbosity(args.verbose), 1082 preserve_fixtures=args.preserve_fixtures, 1083 stream=StringIO(), # consume the output instead of redirecting to stdout 1084 ) 1085 1086 @magic_arguments() 1087 @argument( 1088 "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited." 1089 ) 1090 @argument("--start", "-s", type=str, help="Start date to audit.") 1091 @argument("--end", "-e", type=str, help="End date to audit.") 1092 @argument("--execution-time", type=str, help="Execution time.") 1093 @line_magic 1094 @pass_sqlmesh_context 1095 def audit(self, context: Context, line: str) -> bool: 1096 """Run audit(s)""" 1097 args = parse_argstring(self.audit, line) 1098 return context.audit( 1099 models=args.models, start=args.start, end=args.end, execution_time=args.execution_time 1100 ) 1101 1102 @magic_arguments() 1103 @argument("environment", nargs="?", type=str, help="The environment to check intervals for.") 1104 @argument( 1105 "--no-signals", 1106 action="store_true", 1107 help="Disable signal checks and only show missing intervals.", 1108 default=False, 1109 ) 1110 @argument( 1111 "--select-model", 1112 type=str, 1113 nargs="*", 1114 help="Select specific model changes that should be included in the plan.", 1115 ) 1116 @argument("--start", "-s", type=str, help="Start date of intervals to check for.") 1117 @argument("--end", "-e", type=str, help="End date of intervals to check for.") 1118 @line_magic 1119 @pass_sqlmesh_context 1120 def check_intervals(self, context: Context, line: str) -> None: 1121 """Show missing intervals in an environment, respecting signals.""" 1122 args = parse_argstring(self.check_intervals, line) 1123 1124 context.console.show_intervals( 1125 context.check_intervals( 1126 environment=args.environment, 1127 no_signals=args.no_signals, 1128 select_models=args.select_model, 1129 start=args.start, 1130 end=args.end, 1131 ) 1132 ) 1133 1134 @magic_arguments() 1135 @argument( 1136 "--skip-connection", 1137 action="store_true", 1138 help="Skip the connection test.", 1139 default=False, 1140 ) 1141 @argument( 1142 "--verbose", 1143 "-v", 1144 action="count", 1145 default=0, 1146 help="Verbose output. Use -vv for very verbose.", 1147 ) 1148 @line_magic 1149 @pass_sqlmesh_context 1150 def info(self, context: Context, line: str) -> None: 1151 """Display SQLMesh project information.""" 1152 args = parse_argstring(self.info, line) 1153 context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose)) 1154 1155 @magic_arguments() 1156 @line_magic 1157 @pass_sqlmesh_context 1158 def rollback(self, context: Context, line: str) -> None: 1159 """Rollback SQLMesh to the previous migration.""" 1160 context.rollback() 1161 1162 @magic_arguments() 1163 @line_magic 1164 @pass_sqlmesh_context 1165 def clean(self, context: Context, line: str) -> None: 1166 """Clears the SQLMesh cache and any build artifacts.""" 1167 context.clear_caches() 1168 context.console.log_success("SQLMesh cache and build artifacts cleared") 1169 1170 @magic_arguments() 1171 @line_magic 1172 @pass_sqlmesh_context 1173 def environments(self, context: Context, line: str) -> None: 1174 """Prints the list of SQLMesh environments with its expiry datetime.""" 1175 context.print_environment_names() 1176 1177 @magic_arguments() 1178 @argument( 1179 "--models", 1180 "--model", 1181 type=str, 1182 nargs="*", 1183 help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.", 1184 ) 1185 @line_magic 1186 @pass_sqlmesh_context 1187 def lint(self, context: Context, line: str) -> None: 1188 """Run linter for target model(s)""" 1189 args = parse_argstring(self.lint, line) 1190 context.lint_models(args.models) 1191 1192 @magic_arguments() 1193 @line_magic 1194 @pass_sqlmesh_context 1195 def destroy(self, context: Context, line: str) -> None: 1196 """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache.""" 1197 context.destroy() 1198 1199 1200def register_magics() -> None: 1201 try: 1202 shell = get_ipython() # type: ignore 1203 shell.register_magics(SQLMeshMagics) 1204 except NameError: 1205 pass
52def pass_sqlmesh_context(func: t.Callable) -> t.Callable: 53 @functools.wraps(func) 54 def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None: 55 for variable_name in CONTEXT_VARIABLE_NAMES: 56 context = self._shell.user_ns.get(variable_name) 57 if isinstance(context, Context): 58 break 59 else: 60 raise MissingContextException( 61 f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}" 62 ) 63 old_console = context.console 64 new_console = create_console(display=self.display) 65 context.console = new_console 66 set_console(new_console) 67 context.refresh() 68 69 magic_name = func.__name__ 70 bound_method = getattr(self, magic_name, None) 71 if bound_method: 72 args_split = arg_split(args[0]) 73 parser = bound_method.parser 74 75 original_parser_actions = deepcopy(parser._actions) 76 original_parser_defaults = parser._defaults 77 78 # Temporarily supress default values, otherwise any missing arg would be set and affect analytics 79 parser._defaults = {} 80 for action in parser._actions: 81 action.default = SUPPRESS 82 83 parsed_args, _ = parser.parse_known_args(args_split, Namespace()) 84 85 parser._actions = original_parser_actions 86 parser._defaults = original_parser_defaults 87 88 command_args = {k for k, v in parsed_args.__dict__.items() if v is not None} 89 analytics.collector.on_magic_command(command_name=magic_name, command_args=command_args) 90 91 func(self, context, *args, **kwargs) 92 93 context.console = old_console 94 set_console(old_console) 95 96 return wrapper
99def format_arguments(func: t.Callable) -> t.Callable: 100 """Decorator to add common format arguments to magic commands.""" 101 func = argument( 102 "--normalize", 103 action="store_true", 104 help="Whether or not to normalize identifiers to lowercase.", 105 default=None, 106 )(func) 107 func = argument( 108 "--pad", 109 type=int, 110 help="Determines the pad size in a formatted string.", 111 )(func) 112 func = argument( 113 "--indent", 114 type=int, 115 help="Determines the indentation size in a formatted string.", 116 )(func) 117 func = argument( 118 "--normalize-functions", 119 type=str, 120 help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'", 121 )(func) 122 func = argument( 123 "--leading-comma", 124 action="store_true", 125 help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.", 126 default=None, 127 )(func) 128 func = argument( 129 "--max-text-width", 130 type=int, 131 help="The max number of characters in a segment before creating new lines in pretty mode.", 132 )(func) 133 return func
Decorator to add common format arguments to magic commands.
136@magics_class 137class SQLMeshMagics(Magics): 138 @property 139 def display(self) -> t.Callable: 140 from sqlmesh import RuntimeEnv 141 142 if RuntimeEnv.get().is_databricks: 143 # Use Databricks' special display instead of the normal IPython display 144 return self._shell.user_ns["display"] 145 return display 146 147 @property 148 def _shell(self) -> t.Any: 149 # Make mypy happy. 150 if not self.shell: 151 raise RuntimeError("IPython Magics are in invalid state") 152 return self.shell 153 154 @magic_arguments() 155 @argument( 156 "paths", 157 type=str, 158 nargs="+", 159 default="", 160 help="The path(s) to the SQLMesh project(s).", 161 ) 162 @argument( 163 "--config", 164 type=str, 165 help="Name of the config object. Only applicable to configuration defined using Python script.", 166 ) 167 @argument("--gateway", type=str, help="The name of the gateway.") 168 @argument("--ignore-warnings", action="store_true", help="Ignore warnings.") 169 @argument("--debug", action="store_true", help="Enable debug mode.") 170 @argument("--log-file-dir", type=str, help="The directory to write the log file to.") 171 @argument( 172 "--dotenv", type=str, help="Path to a custom .env file to load environment variables from." 173 ) 174 @line_magic 175 def context(self, line: str) -> None: 176 """Sets the context in the user namespace.""" 177 from sqlmesh import configure_logging, remove_excess_logs 178 179 args = parse_argstring(self.context, line) 180 log_file_dir = args.log_file_dir 181 182 configure_logging( 183 args.debug, 184 log_file_dir=log_file_dir, 185 ignore_warnings=args.ignore_warnings, 186 ) 187 configure_console(ignore_warnings=args.ignore_warnings) 188 189 dotenv_path = Path(args.dotenv) if args.dotenv else None 190 configs = load_configs( 191 args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path 192 ) 193 log_limit = list(configs.values())[0].log_limit 194 195 remove_excess_logs(log_file_dir, log_limit) 196 197 try: 198 context = Context(paths=args.paths, config=configs, gateway=args.gateway) 199 self._shell.user_ns["context"] = context 200 except Exception: 201 if args.debug: 202 logger.exception("Failed to initialize SQLMesh context") 203 raise 204 205 context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}") 206 207 @magic_arguments() 208 @argument("path", type=str, help="The path where the new SQLMesh project should be created.") 209 @argument( 210 "engine", 211 type=str, 212 help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.", # type: ignore 213 ) 214 @argument( 215 "--template", 216 "-t", 217 type=str, 218 help="Project template. Supported values: dbt, default, empty.", 219 ) 220 @argument( 221 "--dlt-pipeline", 222 type=str, 223 help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt", 224 ) 225 @argument( 226 "--dlt-path", 227 type=str, 228 help="The directory where the DLT pipeline resides. Use alongside template: dlt", 229 ) 230 @line_magic 231 def init(self, line: str) -> None: 232 """Creates a SQLMesh project scaffold with a default SQL dialect.""" 233 args = parse_argstring(self.init, line) 234 try: 235 project_template = ProjectTemplate( 236 args.template.lower() if args.template else "default" 237 ) 238 except ValueError: 239 raise MagicError(f"Invalid project template '{args.template}'") 240 init_example_project( 241 path=args.path, 242 engine_type=args.engine, 243 dialect=None, 244 template=project_template, 245 pipeline=args.dlt_pipeline, 246 dlt_path=args.dlt_path, 247 ) 248 html = str( 249 h( 250 "div", 251 h( 252 "span", 253 {"style": {"color": "green", "font-weight": "bold"}}, 254 "SQLMesh project scaffold created", 255 ), 256 ) 257 ) 258 self.display(JupyterRenderable(html=html, text="")) 259 260 @magic_arguments() 261 @argument("model", type=str, help="The model.") 262 @argument("--start", "-s", type=str, help="Start date to render.") 263 @argument("--end", "-e", type=str, help="End date to render.") 264 @argument("--execution-time", type=str, help="Execution time.") 265 @argument("--dialect", "-d", type=str, help="The rendered dialect.") 266 @line_cell_magic 267 @pass_sqlmesh_context 268 def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None: 269 """Renders the model and automatically fills in an editable cell with the model definition.""" 270 args = parse_argstring(self.model, line) 271 272 model = context.get_model(args.model, raise_if_missing=True) 273 config = context.config_for_node(model) 274 275 if sql: 276 expressions = parse(sql, default_dialect=config.dialect) 277 loaded = load_sql_based_model( 278 expressions, 279 macros=context._macros, 280 jinja_macros=context._jinja_macros, 281 path=model._path, 282 dialect=config.dialect, 283 time_column_format=config.time_column_format, 284 physical_schema_mapping=context.config.physical_schema_mapping, 285 default_catalog=context.default_catalog, 286 ) 287 288 if loaded.name == args.model: 289 model = loaded 290 else: 291 if model._path: 292 with open(model._path, "r", encoding="utf-8") as file: 293 expressions = parse(file.read(), default_dialect=config.dialect) 294 295 formatted = format_model_expressions( 296 expressions, 297 model.dialect, 298 rewrite_casts=not config.format.no_rewrite_casts, 299 **config.format.generator_options, 300 ) 301 302 self._shell.set_next_input( 303 "\n".join( 304 [ 305 " ".join(["%%model", line]), 306 formatted, 307 ] 308 ), 309 replace=True, 310 ) 311 312 if model._path: 313 with open(model._path, "w", encoding="utf-8") as file: 314 file.write(formatted) 315 316 if sql: 317 context.console.log_success(f"Model `{args.model}` updated") 318 319 context.upsert_model(model) 320 context.console.show_sql( 321 context.render( 322 model.name, 323 start=args.start, 324 end=args.end, 325 execution_time=args.execution_time, 326 ).sql(pretty=True, dialect=args.dialect or model.dialect) 327 ) 328 329 @magic_arguments() 330 @argument("model", type=str, help="The model.") 331 @argument("test_name", type=str, nargs="?", default=None, help="The test name to display") 332 @argument("--ls", action="store_true", help="List tests associated with a model") 333 @line_cell_magic 334 @pass_sqlmesh_context 335 def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None: 336 """Allow the user to list tests for a model, output a specific test, and then write their changes back""" 337 args = parse_argstring(self.test, line) 338 if not args.test_name and not args.ls: 339 raise MagicError("Must provide either test name or `--ls` to list tests") 340 341 test_meta = context.select_tests() 342 343 tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict) 344 for model_test_metadata in test_meta: 345 model = model_test_metadata.body.get("model") 346 if not model: 347 context.console.log_error( 348 f"Test found that does not have `model` defined: {model_test_metadata.path}" 349 ) 350 else: 351 tests[model][model_test_metadata.test_name] = model_test_metadata 352 353 model = context.get_model(args.model, raise_if_missing=True) 354 355 if args.ls: 356 # TODO: Provide better UI for displaying tests 357 for test_name in tests[model.name]: 358 context.console.log_status_update(test_name) 359 return 360 361 test = tests[model.name][args.test_name] 362 test_def = yaml.load(test_def_raw) if test_def_raw else test.body 363 test_def_output = yaml.dump(test_def) 364 365 self._shell.set_next_input( 366 "\n".join( 367 [ 368 " ".join(["%%test", line]), 369 test_def_output, 370 ] 371 ), 372 replace=True, 373 ) 374 375 with open(test.path, "r+", encoding="utf-8") as file: 376 content = yaml.load(file.read()) 377 content[args.test_name] = test_def 378 file.seek(0) 379 yaml.dump(content, file) 380 file.truncate() 381 382 @magic_arguments() 383 @argument( 384 "environment", 385 nargs="?", 386 type=str, 387 help="The environment to run the plan against", 388 ) 389 @argument("--start", "-s", type=str, help="Start date to backfill.") 390 @argument("--end", "-e", type=str, help="End date to backfill.") 391 @argument("--execution-time", type=str, help="Execution time.") 392 @argument( 393 "--create-from", 394 type=str, 395 help="The environment to create the target environment from if it doesn't exist. Default: prod.", 396 ) 397 @argument( 398 "--skip-tests", 399 "-t", 400 action="store_true", 401 help="Skip the unit tests defined for the model.", 402 ) 403 @argument( 404 "--skip-linter", 405 action="store_true", 406 help="Skip the linter for the model.", 407 ) 408 @argument( 409 "--restate-model", 410 "-r", 411 type=str, 412 nargs="*", 413 help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.", 414 ) 415 @argument( 416 "--no-gaps", 417 "-g", 418 action="store_true", 419 help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.", 420 ) 421 @argument( 422 "--skip-backfill", 423 "--dry-run", 424 action="store_true", 425 help="Skip the backfill step and only create a virtual update for the plan.", 426 ) 427 @argument( 428 "--empty-backfill", 429 action="store_true", 430 help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.", 431 ) 432 @argument( 433 "--forward-only", 434 action="store_true", 435 help="Create a plan for forward-only changes.", 436 default=None, 437 ) 438 @argument( 439 "--effective-from", 440 type=str, 441 help="The effective date from which to apply forward-only changes on production.", 442 ) 443 @argument( 444 "--no-prompts", 445 action="store_true", 446 help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.", 447 default=None, 448 ) 449 @argument( 450 "--auto-apply", 451 action="store_true", 452 help="Automatically applies the new plan after creation.", 453 default=None, 454 ) 455 @argument( 456 "--no-auto-categorization", 457 action="store_true", 458 help="Disable automatic change categorization.", 459 default=None, 460 ) 461 @argument( 462 "--include-unmodified", 463 action="store_true", 464 help="Include unmodified models in the target environment.", 465 default=None, 466 ) 467 @argument( 468 "--select-model", 469 type=str, 470 nargs="*", 471 help="Select specific model changes that should be included in the plan.", 472 ) 473 @argument( 474 "--backfill-model", 475 type=str, 476 nargs="*", 477 help="Backfill only the models whose names match the expression.", 478 ) 479 @argument( 480 "--no-diff", 481 action="store_true", 482 help="Hide text differences for changed models.", 483 default=None, 484 ) 485 @argument( 486 "--run", 487 action="store_true", 488 help="Run latest intervals as part of the plan application (prod environment only).", 489 ) 490 @argument( 491 "--ignore-cron", 492 action="store_true", 493 help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.", 494 default=None, 495 ) 496 @argument( 497 "--enable-preview", 498 action="store_true", 499 help="Enable preview for forward-only models when targeting a development environment.", 500 default=None, 501 ) 502 @argument( 503 "--diff-rendered", 504 action="store_true", 505 help="Output text differences for the rendered versions of the models and standalone audits", 506 ) 507 @argument( 508 "--verbose", 509 "-v", 510 action="count", 511 default=0, 512 help="Verbose output. Use -vv for very verbose.", 513 ) 514 @line_magic 515 @pass_sqlmesh_context 516 def plan(self, context: Context, line: str) -> None: 517 """Goes through a set of prompts to both establish a plan and apply it""" 518 args = parse_argstring(self.plan, line) 519 520 setattr(context.console, "verbosity", Verbosity(args.verbose)) 521 522 context.plan( 523 args.environment, 524 start=args.start, 525 end=args.end, 526 execution_time=args.execution_time, 527 create_from=args.create_from, 528 skip_tests=args.skip_tests, 529 restate_models=args.restate_model, 530 backfill_models=args.backfill_model, 531 no_gaps=args.no_gaps, 532 skip_backfill=args.skip_backfill, 533 empty_backfill=args.empty_backfill, 534 forward_only=args.forward_only, 535 no_prompts=args.no_prompts, 536 auto_apply=args.auto_apply, 537 no_auto_categorization=args.no_auto_categorization, 538 effective_from=args.effective_from, 539 include_unmodified=args.include_unmodified, 540 select_models=args.select_model, 541 no_diff=args.no_diff, 542 run=args.run, 543 ignore_cron=args.run, 544 enable_preview=args.enable_preview, 545 diff_rendered=args.diff_rendered, 546 ) 547 548 @magic_arguments() 549 @argument( 550 "environment", 551 nargs="?", 552 type=str, 553 help="The environment to run against", 554 ) 555 @argument("--start", "-s", type=str, help="Start date to evaluate.") 556 @argument("--end", "-e", type=str, help="End date to evaluate.") 557 @argument("--skip-janitor", action="store_true", help="Skip the janitor task.") 558 @argument( 559 "--ignore-cron", 560 action="store_true", 561 help="Run for all missing intervals, ignoring individual cron schedules.", 562 ) 563 @argument( 564 "--select-model", 565 type=str, 566 nargs="*", 567 help="Select specific models to run. Note: this always includes upstream dependencies.", 568 ) 569 @argument( 570 "--exit-on-env-update", 571 type=int, 572 help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.", 573 ) 574 @argument( 575 "--no-auto-upstream", 576 action="store_true", 577 help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.", 578 ) 579 @line_magic 580 @pass_sqlmesh_context 581 def run_dag(self, context: Context, line: str) -> None: 582 """Evaluate the DAG of models using the built-in scheduler.""" 583 args = parse_argstring(self.run_dag, line) 584 585 completion_status = context.run( 586 args.environment, 587 start=args.start, 588 end=args.end, 589 skip_janitor=args.skip_janitor, 590 ignore_cron=args.ignore_cron, 591 select_models=args.select_model, 592 exit_on_env_update=args.exit_on_env_update, 593 no_auto_upstream=args.no_auto_upstream, 594 ) 595 if completion_status.is_failure: 596 raise SQLMeshError("Error Running DAG. Check logs for details.") 597 598 @magic_arguments() 599 @argument("model", type=str, help="The model.") 600 @argument("--start", "-s", type=str, help="Start date to render.") 601 @argument("--end", "-e", type=str, help="End date to render.") 602 @argument("--execution-time", type=str, help="Execution time.") 603 @argument( 604 "--limit", 605 type=int, 606 help="The number of rows which the query should be limited to.", 607 ) 608 @line_magic 609 @pass_sqlmesh_context 610 def evaluate(self, context: Context, line: str) -> None: 611 """Evaluate a model query and fetches a dataframe.""" 612 context.refresh() 613 614 snowpark = optional_import("snowflake.snowpark") 615 args = parse_argstring(self.evaluate, line) 616 617 df = context.evaluate( 618 args.model, 619 start=args.start, 620 end=args.end, 621 execution_time=args.execution_time, 622 limit=args.limit, 623 ) 624 625 if snowpark and isinstance(df, snowpark.DataFrame): 626 df = df.limit(args.limit or 100).to_pandas() 627 628 self.display(df) 629 630 @magic_arguments() 631 @argument("model", type=str, help="The model.") 632 @argument("--start", "-s", type=str, help="Start date to render.") 633 @argument("--end", "-e", type=str, help="End date to render.") 634 @argument("--execution-time", type=str, help="Execution time.") 635 @argument( 636 "--expand", 637 type=t.Union[bool, t.Iterable[str]], 638 help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.", 639 ) 640 @argument("--dialect", type=str, help="SQL dialect to render.") 641 @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.") 642 @format_arguments 643 @line_magic 644 @pass_sqlmesh_context 645 def render(self, context: Context, line: str) -> None: 646 """Renders a model's query, optionally expanding referenced models.""" 647 context.refresh() 648 render_opts = vars(parse_argstring(self.render, line)) 649 model = render_opts.pop("model") 650 dialect = render_opts.pop("dialect", None) 651 652 model = context.get_model(model, raise_if_missing=True) 653 654 query = context.render( 655 model, 656 start=render_opts.pop("start", None), 657 end=render_opts.pop("end", None), 658 execution_time=render_opts.pop("execution_time", None), 659 expand=render_opts.pop("expand", False), 660 ) 661 662 no_format = render_opts.pop("no_format", False) 663 664 format_config = context.config_for_node(model).format 665 format_options = { 666 **format_config.generator_options, 667 **{k: v for k, v in render_opts.items() if v is not None}, 668 } 669 670 sql = query.sql( 671 pretty=True, 672 dialect=context.config.dialect if dialect is None else dialect, 673 **format_options, 674 ) 675 676 if no_format: 677 context.console.log_status_update(sql) 678 else: 679 context.console.show_sql(sql) 680 681 @magic_arguments() 682 @argument( 683 "df_var", 684 default=None, 685 nargs="?", 686 type=str, 687 help="An optional variable name to store the resulting dataframe.", 688 ) 689 @cell_magic 690 @pass_sqlmesh_context 691 def fetchdf(self, context: Context, line: str, sql: str) -> None: 692 """Fetches a dataframe from sql, optionally storing it in a variable.""" 693 args = parse_argstring(self.fetchdf, line) 694 df = context.fetchdf(sql) 695 if args.df_var: 696 self._shell.user_ns[args.df_var] = df 697 self.display(df) 698 699 @magic_arguments() 700 @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.") 701 @argument( 702 "--select-model", 703 type=str, 704 nargs="*", 705 help="Select specific models to include in the dag.", 706 ) 707 @line_magic 708 @pass_sqlmesh_context 709 def dag(self, context: Context, line: str) -> None: 710 """Displays the HTML DAG.""" 711 args = parse_argstring(self.dag, line) 712 dag = context.get_dag(args.select_model) 713 if args.file: 714 with open(args.file, "w", encoding="utf-8") as file: 715 file.write(str(dag)) 716 # TODO: Have this go through console instead of calling display directly 717 self.display(dag) 718 719 @magic_arguments() 720 @line_magic 721 @pass_sqlmesh_context 722 def migrate(self, context: Context, line: str) -> None: 723 """Migrate SQLMesh to the current running version.""" 724 context.migrate() 725 context.console.log_success("Migration complete") 726 727 @magic_arguments() 728 @argument( 729 "--strict", 730 action="store_true", 731 help="Raise an error if the external model is missing in the database", 732 ) 733 @line_magic 734 @pass_sqlmesh_context 735 def create_external_models(self, context: Context, line: str) -> None: 736 """Create a schema file containing external model schemas.""" 737 args = parse_argstring(self.create_external_models, line) 738 context.create_external_models(strict=args.strict) 739 740 @magic_arguments() 741 @argument( 742 "source_to_target", 743 type=str, 744 metavar="SOURCE:TARGET", 745 help="Source and target in `SOURCE:TARGET` format", 746 ) 747 @argument( 748 "--on", 749 type=str, 750 nargs="*", 751 help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.", 752 ) 753 @argument( 754 "--skip-columns", 755 type=str, 756 nargs="*", 757 help="The column(s) to skip when comparing the source and target table.", 758 ) 759 @argument( 760 "--model", 761 type=str, 762 help="The model to diff against when source and target are environments and not tables.", 763 ) 764 @argument( 765 "--where", 766 type=str, 767 help="An optional where statement to filter results.", 768 ) 769 @argument( 770 "--limit", 771 type=int, 772 default=20, 773 help="The limit of the sample dataframe.", 774 ) 775 @argument( 776 "--show-sample", 777 action="store_true", 778 help="Show a sample of the rows that differ. With many columns, the output can be very wide.", 779 ) 780 @argument( 781 "--decimals", 782 type=int, 783 default=3, 784 help="The number of decimal places to keep when comparing floating point columns. Default: 3", 785 ) 786 @argument( 787 "--select-model", 788 type=str, 789 nargs="*", 790 help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)", 791 ) 792 @argument( 793 "--skip-grain-check", 794 action="store_true", 795 help="Disable the check for a primary key (grain) that is missing or is not unique.", 796 ) 797 @argument( 798 "--warn-grain-check", 799 action="store_true", 800 help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.", 801 ) 802 @argument( 803 "--schema-diff-ignore-case", 804 action="store_true", 805 help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.", 806 ) 807 @line_magic 808 @pass_sqlmesh_context 809 def table_diff(self, context: Context, line: str) -> None: 810 """Show the diff between two tables. 811 812 Can either be two tables or two environments and a model. 813 """ 814 args = parse_argstring(self.table_diff, line) 815 source, target = args.source_to_target.split(":") 816 select_models = {args.model} if args.model else args.select_model or None 817 context.table_diff( 818 source=source, 819 target=target, 820 on=args.on, 821 skip_columns=args.skip_columns, 822 select_models=select_models, 823 where=args.where, 824 limit=args.limit, 825 show_sample=args.show_sample, 826 decimals=args.decimals, 827 skip_grain_check=args.skip_grain_check, 828 warn_grain_check=args.warn_grain_check, 829 schema_diff_ignore_case=args.schema_diff_ignore_case, 830 ) 831 832 @magic_arguments() 833 @argument( 834 "model_name", 835 nargs="?", 836 type=str, 837 help="The name of the model to get the table name for.", 838 ) 839 @argument( 840 "--environment", 841 type=str, 842 help="The environment to source the model version from.", 843 ) 844 @argument( 845 "--prod", 846 action="store_true", 847 help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.", 848 ) 849 @line_magic 850 @pass_sqlmesh_context 851 def table_name(self, context: Context, line: str) -> None: 852 """Prints the name of the physical table for the given model.""" 853 args = parse_argstring(self.table_name, line) 854 context.console.log_status_update( 855 context.table_name(args.model_name, args.environment, args.prod) 856 ) 857 858 @magic_arguments() 859 @argument( 860 "pipeline", 861 nargs="?", 862 type=str, 863 help="The dlt pipeline to attach for this SQLMesh project.", 864 ) 865 @argument( 866 "--table", 867 "-t", 868 type=str, 869 nargs="*", 870 help="The specific dlt tables to refresh in the SQLMesh models.", 871 ) 872 @argument( 873 "--force", 874 "-f", 875 action="store_true", 876 help="If set, existing models are overwritten with the new DLT tables.", 877 ) 878 @argument( 879 "--dlt-path", 880 type=str, 881 help="The directory where the DLT pipeline resides.", 882 ) 883 @line_magic 884 @pass_sqlmesh_context 885 def dlt_refresh(self, context: Context, line: str) -> None: 886 """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project.""" 887 from sqlmesh.integrations.dlt import generate_dlt_models 888 889 args = parse_argstring(self.dlt_refresh, line) 890 sqlmesh_models = generate_dlt_models( 891 context, args.pipeline, list(args.table or []), args.force, args.dlt_path 892 ) 893 if sqlmesh_models: 894 model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models]) 895 context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}") 896 else: 897 context.console.log_success("All SQLMesh models are up to date.") 898 899 @magic_arguments() 900 @argument( 901 "--read", 902 type=str, 903 default="", 904 help="The input dialect of the sql string.", 905 ) 906 @argument( 907 "--write", 908 type=str, 909 default="", 910 help="The output dialect of the sql string.", 911 ) 912 @line_cell_magic 913 @pass_sqlmesh_context 914 def rewrite(self, context: Context, line: str, sql: str) -> None: 915 """Rewrite a sql expression with semantic references into an executable query. 916 917 https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/ 918 """ 919 args = parse_argstring(self.rewrite, line) 920 context.console.show_sql( 921 context.rewrite(sql, args.read).sql( 922 dialect=args.write or context.config.dialect, pretty=True 923 ) 924 ) 925 926 @magic_arguments() 927 @argument( 928 "--transpile", 929 "-t", 930 type=str, 931 help="Transpile project models to the specified dialect.", 932 ) 933 @argument( 934 "--check", 935 action="store_true", 936 help="Whether or not to check formatting (but not actually format anything).", 937 default=None, 938 ) 939 @argument( 940 "--append-newline", 941 action="store_true", 942 help="Include a newline at the end of the output.", 943 default=None, 944 ) 945 @argument( 946 "--no-rewrite-casts", 947 action="store_true", 948 help="Preserve the existing casts, without rewriting them to use the :: syntax.", 949 default=None, 950 ) 951 @format_arguments 952 @line_magic 953 @pass_sqlmesh_context 954 def format(self, context: Context, line: str) -> bool: 955 """Format all SQL models and audits.""" 956 format_opts = vars(parse_argstring(self.format, line)) 957 if format_opts.pop("no_rewrite_casts", None): 958 format_opts["rewrite_casts"] = False 959 960 return context.format(**{k: v for k, v in format_opts.items() if v is not None}) 961 962 @magic_arguments() 963 @argument("environment", type=str, help="The environment to diff local state against.") 964 @line_magic 965 @pass_sqlmesh_context 966 def diff(self, context: Context, line: str) -> None: 967 """Show the diff between the local state and the target environment.""" 968 args = parse_argstring(self.diff, line) 969 context.diff(args.environment) 970 971 @magic_arguments() 972 @argument("environment", type=str, help="The environment to invalidate.") 973 @line_magic 974 @pass_sqlmesh_context 975 def invalidate(self, context: Context, line: str) -> None: 976 """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" 977 args = parse_argstring(self.invalidate, line) 978 context.invalidate_environment(args.environment) 979 980 @magic_arguments() 981 @argument( 982 "--ignore-ttl", 983 action="store_true", 984 help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire", 985 ) 986 @line_magic 987 @pass_sqlmesh_context 988 def janitor(self, context: Context, line: str) -> None: 989 """Run the janitor process to clean up old environments and expired snapshots.""" 990 args = parse_argstring(self.janitor, line) 991 context.run_janitor(ignore_ttl=args.ignore_ttl) 992 993 @magic_arguments() 994 @argument("model", type=str) 995 @argument( 996 "--query", 997 "-q", 998 type=str, 999 nargs="+", 1000 default=[], 1001 help="Queries that will be used to generate data for the model's dependencies.", 1002 ) 1003 @argument( 1004 "--overwrite", 1005 "-o", 1006 action="store_true", 1007 help="When true, the fixture file will be overwritten in case it already exists.", 1008 ) 1009 @argument( 1010 "--var", 1011 "-v", 1012 type=str, 1013 nargs="+", 1014 help="Key-value pairs that will define variables needed by the model.", 1015 ) 1016 @argument( 1017 "--path", 1018 "-p", 1019 type=str, 1020 help="The file path corresponding to the fixture, relative to the test directory. " 1021 "By default, the fixture will be created under the test directory and the file " 1022 "name will be inferred based on the test's name.", 1023 ) 1024 @argument( 1025 "--name", 1026 "-n", 1027 type=str, 1028 help="The name of the test that will be created. By default, it's inferred based on the model's name.", 1029 ) 1030 @argument( 1031 "--include-ctes", 1032 action="store_true", 1033 help="When true, CTE fixtures will also be generated.", 1034 ) 1035 @line_magic 1036 @pass_sqlmesh_context 1037 def create_test(self, context: Context, line: str) -> None: 1038 """Generate a unit test fixture for a given model.""" 1039 args = parse_argstring(self.create_test, line) 1040 queries = iter(args.query) 1041 variables = iter(args.var) if args.var else None 1042 context.create_test( 1043 args.model, 1044 input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()}, 1045 overwrite=args.overwrite, 1046 variables=dict(zip(variables, variables)) if variables else None, 1047 path=args.path, 1048 name=args.name, 1049 include_ctes=args.include_ctes, 1050 ) 1051 1052 @magic_arguments() 1053 @argument("tests", nargs="*", type=str) 1054 @argument( 1055 "--pattern", 1056 "-k", 1057 nargs="*", 1058 type=str, 1059 help="Only run tests that match the pattern of substring.", 1060 ) 1061 @argument( 1062 "--verbose", 1063 "-v", 1064 action="count", 1065 default=0, 1066 help="Verbose output. Use -vv for very verbose.", 1067 ) 1068 @argument( 1069 "--preserve-fixtures", 1070 action="store_true", 1071 help="Preserve the fixture tables in the testing database, useful for debugging.", 1072 ) 1073 @line_magic 1074 @pass_sqlmesh_context 1075 def run_test(self, context: Context, line: str) -> None: 1076 """Run unit test(s).""" 1077 args = parse_argstring(self.run_test, line) 1078 1079 context.test( 1080 match_patterns=args.pattern, 1081 tests=args.tests, 1082 verbosity=Verbosity(args.verbose), 1083 preserve_fixtures=args.preserve_fixtures, 1084 stream=StringIO(), # consume the output instead of redirecting to stdout 1085 ) 1086 1087 @magic_arguments() 1088 @argument( 1089 "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited." 1090 ) 1091 @argument("--start", "-s", type=str, help="Start date to audit.") 1092 @argument("--end", "-e", type=str, help="End date to audit.") 1093 @argument("--execution-time", type=str, help="Execution time.") 1094 @line_magic 1095 @pass_sqlmesh_context 1096 def audit(self, context: Context, line: str) -> bool: 1097 """Run audit(s)""" 1098 args = parse_argstring(self.audit, line) 1099 return context.audit( 1100 models=args.models, start=args.start, end=args.end, execution_time=args.execution_time 1101 ) 1102 1103 @magic_arguments() 1104 @argument("environment", nargs="?", type=str, help="The environment to check intervals for.") 1105 @argument( 1106 "--no-signals", 1107 action="store_true", 1108 help="Disable signal checks and only show missing intervals.", 1109 default=False, 1110 ) 1111 @argument( 1112 "--select-model", 1113 type=str, 1114 nargs="*", 1115 help="Select specific model changes that should be included in the plan.", 1116 ) 1117 @argument("--start", "-s", type=str, help="Start date of intervals to check for.") 1118 @argument("--end", "-e", type=str, help="End date of intervals to check for.") 1119 @line_magic 1120 @pass_sqlmesh_context 1121 def check_intervals(self, context: Context, line: str) -> None: 1122 """Show missing intervals in an environment, respecting signals.""" 1123 args = parse_argstring(self.check_intervals, line) 1124 1125 context.console.show_intervals( 1126 context.check_intervals( 1127 environment=args.environment, 1128 no_signals=args.no_signals, 1129 select_models=args.select_model, 1130 start=args.start, 1131 end=args.end, 1132 ) 1133 ) 1134 1135 @magic_arguments() 1136 @argument( 1137 "--skip-connection", 1138 action="store_true", 1139 help="Skip the connection test.", 1140 default=False, 1141 ) 1142 @argument( 1143 "--verbose", 1144 "-v", 1145 action="count", 1146 default=0, 1147 help="Verbose output. Use -vv for very verbose.", 1148 ) 1149 @line_magic 1150 @pass_sqlmesh_context 1151 def info(self, context: Context, line: str) -> None: 1152 """Display SQLMesh project information.""" 1153 args = parse_argstring(self.info, line) 1154 context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose)) 1155 1156 @magic_arguments() 1157 @line_magic 1158 @pass_sqlmesh_context 1159 def rollback(self, context: Context, line: str) -> None: 1160 """Rollback SQLMesh to the previous migration.""" 1161 context.rollback() 1162 1163 @magic_arguments() 1164 @line_magic 1165 @pass_sqlmesh_context 1166 def clean(self, context: Context, line: str) -> None: 1167 """Clears the SQLMesh cache and any build artifacts.""" 1168 context.clear_caches() 1169 context.console.log_success("SQLMesh cache and build artifacts cleared") 1170 1171 @magic_arguments() 1172 @line_magic 1173 @pass_sqlmesh_context 1174 def environments(self, context: Context, line: str) -> None: 1175 """Prints the list of SQLMesh environments with its expiry datetime.""" 1176 context.print_environment_names() 1177 1178 @magic_arguments() 1179 @argument( 1180 "--models", 1181 "--model", 1182 type=str, 1183 nargs="*", 1184 help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.", 1185 ) 1186 @line_magic 1187 @pass_sqlmesh_context 1188 def lint(self, context: Context, line: str) -> None: 1189 """Run linter for target model(s)""" 1190 args = parse_argstring(self.lint, line) 1191 context.lint_models(args.models) 1192 1193 @magic_arguments() 1194 @line_magic 1195 @pass_sqlmesh_context 1196 def destroy(self, context: Context, line: str) -> None: 1197 """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache.""" 1198 context.destroy()
Base class for implementing magic functions.
Shell functions which can be reached as %function_name. All magic
functions should accept a string, which they can parse for their own
needs. This can make some functions easier to type, eg %cd ../
vs. %cd("../")
Classes providing magic functions need to subclass this class, and they MUST:
Use the method decorators
@line_magicand@cell_magicto decorate individual methods as magic functions, ANDUse the class decorator
@magics_classto ensure that the magic methods are properly registered at the instance level upon instance initialization.
See magic_functions for examples of actual implementation classes.
154 @magic_arguments() 155 @argument( 156 "paths", 157 type=str, 158 nargs="+", 159 default="", 160 help="The path(s) to the SQLMesh project(s).", 161 ) 162 @argument( 163 "--config", 164 type=str, 165 help="Name of the config object. Only applicable to configuration defined using Python script.", 166 ) 167 @argument("--gateway", type=str, help="The name of the gateway.") 168 @argument("--ignore-warnings", action="store_true", help="Ignore warnings.") 169 @argument("--debug", action="store_true", help="Enable debug mode.") 170 @argument("--log-file-dir", type=str, help="The directory to write the log file to.") 171 @argument( 172 "--dotenv", type=str, help="Path to a custom .env file to load environment variables from." 173 ) 174 @line_magic 175 def context(self, line: str) -> None: 176 """Sets the context in the user namespace.""" 177 from sqlmesh import configure_logging, remove_excess_logs 178 179 args = parse_argstring(self.context, line) 180 log_file_dir = args.log_file_dir 181 182 configure_logging( 183 args.debug, 184 log_file_dir=log_file_dir, 185 ignore_warnings=args.ignore_warnings, 186 ) 187 configure_console(ignore_warnings=args.ignore_warnings) 188 189 dotenv_path = Path(args.dotenv) if args.dotenv else None 190 configs = load_configs( 191 args.config, Context.CONFIG_TYPE, args.paths, dotenv_path=dotenv_path 192 ) 193 log_limit = list(configs.values())[0].log_limit 194 195 remove_excess_logs(log_file_dir, log_limit) 196 197 try: 198 context = Context(paths=args.paths, config=configs, gateway=args.gateway) 199 self._shell.user_ns["context"] = context 200 except Exception: 201 if args.debug: 202 logger.exception("Failed to initialize SQLMesh context") 203 raise 204 205 context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")
::
%context [--config CONFIG] [--gateway GATEWAY] [--ignore-warnings] [--debug] [--log-file-dir LOG_FILE_DIR] [--dotenv DOTENV] paths [paths ...]
Sets the context in the user namespace.
positional arguments: paths The path(s) to the SQLMesh project(s).
options: --config CONFIG Name of the config object. Only applicable to configuration defined using Python script. --gateway GATEWAY The name of the gateway. --ignore-warnings Ignore warnings. --debug Enable debug mode. --log-file-dir LOG_FILE_DIR The directory to write the log file to. --dotenv DOTENV Path to a custom .env file to load environment variables from.
207 @magic_arguments() 208 @argument("path", type=str, help="The path where the new SQLMesh project should be created.") 209 @argument( 210 "engine", 211 type=str, 212 help=f"Project SQL engine. Supported values: '{', '.join([info[1] for info in sorted(INIT_DISPLAY_INFO_TO_TYPE.values(), key=lambda x: x[0])])}'.", # type: ignore 213 ) 214 @argument( 215 "--template", 216 "-t", 217 type=str, 218 help="Project template. Supported values: dbt, default, empty.", 219 ) 220 @argument( 221 "--dlt-pipeline", 222 type=str, 223 help="DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt", 224 ) 225 @argument( 226 "--dlt-path", 227 type=str, 228 help="The directory where the DLT pipeline resides. Use alongside template: dlt", 229 ) 230 @line_magic 231 def init(self, line: str) -> None: 232 """Creates a SQLMesh project scaffold with a default SQL dialect.""" 233 args = parse_argstring(self.init, line) 234 try: 235 project_template = ProjectTemplate( 236 args.template.lower() if args.template else "default" 237 ) 238 except ValueError: 239 raise MagicError(f"Invalid project template '{args.template}'") 240 init_example_project( 241 path=args.path, 242 engine_type=args.engine, 243 dialect=None, 244 template=project_template, 245 pipeline=args.dlt_pipeline, 246 dlt_path=args.dlt_path, 247 ) 248 html = str( 249 h( 250 "div", 251 h( 252 "span", 253 {"style": {"color": "green", "font-weight": "bold"}}, 254 "SQLMesh project scaffold created", 255 ), 256 ) 257 ) 258 self.display(JupyterRenderable(html=html, text=""))
::
%init [--template TEMPLATE] [--dlt-pipeline DLT_PIPELINE] [--dlt-path DLT_PATH] path engine
Creates a SQLMesh project scaffold with a default SQL dialect.
positional arguments: path The path where the new SQLMesh project should be created. engine Project SQL engine. Supported values: 'DuckDB, Snowflake, Databricks, BigQuery, MotherDuck, ClickHouse, Redshift, Spark, Trino, Azure SQL, MSSQL, Postgres, GCP Postgres, MySQL, Athena, RisingWave, Fabric'.
options: --template TEMPLATE, -t TEMPLATE Project template. Supported values: dbt, default, empty. --dlt-pipeline DLT_PIPELINE DLT pipeline for which to generate a SQLMesh project. Use alongside template: dlt --dlt-path DLT_PATH The directory where the DLT pipeline resides. Use alongside template: dlt
260 @magic_arguments() 261 @argument("model", type=str, help="The model.") 262 @argument("--start", "-s", type=str, help="Start date to render.") 263 @argument("--end", "-e", type=str, help="End date to render.") 264 @argument("--execution-time", type=str, help="Execution time.") 265 @argument("--dialect", "-d", type=str, help="The rendered dialect.") 266 @line_cell_magic 267 @pass_sqlmesh_context 268 def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None: 269 """Renders the model and automatically fills in an editable cell with the model definition.""" 270 args = parse_argstring(self.model, line) 271 272 model = context.get_model(args.model, raise_if_missing=True) 273 config = context.config_for_node(model) 274 275 if sql: 276 expressions = parse(sql, default_dialect=config.dialect) 277 loaded = load_sql_based_model( 278 expressions, 279 macros=context._macros, 280 jinja_macros=context._jinja_macros, 281 path=model._path, 282 dialect=config.dialect, 283 time_column_format=config.time_column_format, 284 physical_schema_mapping=context.config.physical_schema_mapping, 285 default_catalog=context.default_catalog, 286 ) 287 288 if loaded.name == args.model: 289 model = loaded 290 else: 291 if model._path: 292 with open(model._path, "r", encoding="utf-8") as file: 293 expressions = parse(file.read(), default_dialect=config.dialect) 294 295 formatted = format_model_expressions( 296 expressions, 297 model.dialect, 298 rewrite_casts=not config.format.no_rewrite_casts, 299 **config.format.generator_options, 300 ) 301 302 self._shell.set_next_input( 303 "\n".join( 304 [ 305 " ".join(["%%model", line]), 306 formatted, 307 ] 308 ), 309 replace=True, 310 ) 311 312 if model._path: 313 with open(model._path, "w", encoding="utf-8") as file: 314 file.write(formatted) 315 316 if sql: 317 context.console.log_success(f"Model `{args.model}` updated") 318 319 context.upsert_model(model) 320 context.console.show_sql( 321 context.render( 322 model.name, 323 start=args.start, 324 end=args.end, 325 execution_time=args.execution_time, 326 ).sql(pretty=True, dialect=args.dialect or model.dialect) 327 )
Renders the model and automatically fills in an editable cell with the model definition.
329 @magic_arguments() 330 @argument("model", type=str, help="The model.") 331 @argument("test_name", type=str, nargs="?", default=None, help="The test name to display") 332 @argument("--ls", action="store_true", help="List tests associated with a model") 333 @line_cell_magic 334 @pass_sqlmesh_context 335 def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None: 336 """Allow the user to list tests for a model, output a specific test, and then write their changes back""" 337 args = parse_argstring(self.test, line) 338 if not args.test_name and not args.ls: 339 raise MagicError("Must provide either test name or `--ls` to list tests") 340 341 test_meta = context.select_tests() 342 343 tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict) 344 for model_test_metadata in test_meta: 345 model = model_test_metadata.body.get("model") 346 if not model: 347 context.console.log_error( 348 f"Test found that does not have `model` defined: {model_test_metadata.path}" 349 ) 350 else: 351 tests[model][model_test_metadata.test_name] = model_test_metadata 352 353 model = context.get_model(args.model, raise_if_missing=True) 354 355 if args.ls: 356 # TODO: Provide better UI for displaying tests 357 for test_name in tests[model.name]: 358 context.console.log_status_update(test_name) 359 return 360 361 test = tests[model.name][args.test_name] 362 test_def = yaml.load(test_def_raw) if test_def_raw else test.body 363 test_def_output = yaml.dump(test_def) 364 365 self._shell.set_next_input( 366 "\n".join( 367 [ 368 " ".join(["%%test", line]), 369 test_def_output, 370 ] 371 ), 372 replace=True, 373 ) 374 375 with open(test.path, "r+", encoding="utf-8") as file: 376 content = yaml.load(file.read()) 377 content[args.test_name] = test_def 378 file.seek(0) 379 yaml.dump(content, file) 380 file.truncate()
Allow the user to list tests for a model, output a specific test, and then write their changes back
382 @magic_arguments() 383 @argument( 384 "environment", 385 nargs="?", 386 type=str, 387 help="The environment to run the plan against", 388 ) 389 @argument("--start", "-s", type=str, help="Start date to backfill.") 390 @argument("--end", "-e", type=str, help="End date to backfill.") 391 @argument("--execution-time", type=str, help="Execution time.") 392 @argument( 393 "--create-from", 394 type=str, 395 help="The environment to create the target environment from if it doesn't exist. Default: prod.", 396 ) 397 @argument( 398 "--skip-tests", 399 "-t", 400 action="store_true", 401 help="Skip the unit tests defined for the model.", 402 ) 403 @argument( 404 "--skip-linter", 405 action="store_true", 406 help="Skip the linter for the model.", 407 ) 408 @argument( 409 "--restate-model", 410 "-r", 411 type=str, 412 nargs="*", 413 help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.", 414 ) 415 @argument( 416 "--no-gaps", 417 "-g", 418 action="store_true", 419 help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.", 420 ) 421 @argument( 422 "--skip-backfill", 423 "--dry-run", 424 action="store_true", 425 help="Skip the backfill step and only create a virtual update for the plan.", 426 ) 427 @argument( 428 "--empty-backfill", 429 action="store_true", 430 help="Produce empty backfill. Like --skip-backfill no models will be backfilled, unlike --skip-backfill missing intervals will be recorded as if they were backfilled.", 431 ) 432 @argument( 433 "--forward-only", 434 action="store_true", 435 help="Create a plan for forward-only changes.", 436 default=None, 437 ) 438 @argument( 439 "--effective-from", 440 type=str, 441 help="The effective date from which to apply forward-only changes on production.", 442 ) 443 @argument( 444 "--no-prompts", 445 action="store_true", 446 help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.", 447 default=None, 448 ) 449 @argument( 450 "--auto-apply", 451 action="store_true", 452 help="Automatically applies the new plan after creation.", 453 default=None, 454 ) 455 @argument( 456 "--no-auto-categorization", 457 action="store_true", 458 help="Disable automatic change categorization.", 459 default=None, 460 ) 461 @argument( 462 "--include-unmodified", 463 action="store_true", 464 help="Include unmodified models in the target environment.", 465 default=None, 466 ) 467 @argument( 468 "--select-model", 469 type=str, 470 nargs="*", 471 help="Select specific model changes that should be included in the plan.", 472 ) 473 @argument( 474 "--backfill-model", 475 type=str, 476 nargs="*", 477 help="Backfill only the models whose names match the expression.", 478 ) 479 @argument( 480 "--no-diff", 481 action="store_true", 482 help="Hide text differences for changed models.", 483 default=None, 484 ) 485 @argument( 486 "--run", 487 action="store_true", 488 help="Run latest intervals as part of the plan application (prod environment only).", 489 ) 490 @argument( 491 "--ignore-cron", 492 action="store_true", 493 help="Run for all missing intervals, ignoring individual cron schedules. Only applies if --run is set.", 494 default=None, 495 ) 496 @argument( 497 "--enable-preview", 498 action="store_true", 499 help="Enable preview for forward-only models when targeting a development environment.", 500 default=None, 501 ) 502 @argument( 503 "--diff-rendered", 504 action="store_true", 505 help="Output text differences for the rendered versions of the models and standalone audits", 506 ) 507 @argument( 508 "--verbose", 509 "-v", 510 action="count", 511 default=0, 512 help="Verbose output. Use -vv for very verbose.", 513 ) 514 @line_magic 515 @pass_sqlmesh_context 516 def plan(self, context: Context, line: str) -> None: 517 """Goes through a set of prompts to both establish a plan and apply it""" 518 args = parse_argstring(self.plan, line) 519 520 setattr(context.console, "verbosity", Verbosity(args.verbose)) 521 522 context.plan( 523 args.environment, 524 start=args.start, 525 end=args.end, 526 execution_time=args.execution_time, 527 create_from=args.create_from, 528 skip_tests=args.skip_tests, 529 restate_models=args.restate_model, 530 backfill_models=args.backfill_model, 531 no_gaps=args.no_gaps, 532 skip_backfill=args.skip_backfill, 533 empty_backfill=args.empty_backfill, 534 forward_only=args.forward_only, 535 no_prompts=args.no_prompts, 536 auto_apply=args.auto_apply, 537 no_auto_categorization=args.no_auto_categorization, 538 effective_from=args.effective_from, 539 include_unmodified=args.include_unmodified, 540 select_models=args.select_model, 541 no_diff=args.no_diff, 542 run=args.run, 543 ignore_cron=args.run, 544 enable_preview=args.enable_preview, 545 diff_rendered=args.diff_rendered, 546 )
Goes through a set of prompts to both establish a plan and apply it
548 @magic_arguments() 549 @argument( 550 "environment", 551 nargs="?", 552 type=str, 553 help="The environment to run against", 554 ) 555 @argument("--start", "-s", type=str, help="Start date to evaluate.") 556 @argument("--end", "-e", type=str, help="End date to evaluate.") 557 @argument("--skip-janitor", action="store_true", help="Skip the janitor task.") 558 @argument( 559 "--ignore-cron", 560 action="store_true", 561 help="Run for all missing intervals, ignoring individual cron schedules.", 562 ) 563 @argument( 564 "--select-model", 565 type=str, 566 nargs="*", 567 help="Select specific models to run. Note: this always includes upstream dependencies.", 568 ) 569 @argument( 570 "--exit-on-env-update", 571 type=int, 572 help="If set, the command will exit with the specified code if the run is interrupted by an update to the target environment.", 573 ) 574 @argument( 575 "--no-auto-upstream", 576 action="store_true", 577 help="Do not automatically include upstream models. Only applicable when --select-model is used. Note: this may result in missing / invalid data for the selected models.", 578 ) 579 @line_magic 580 @pass_sqlmesh_context 581 def run_dag(self, context: Context, line: str) -> None: 582 """Evaluate the DAG of models using the built-in scheduler.""" 583 args = parse_argstring(self.run_dag, line) 584 585 completion_status = context.run( 586 args.environment, 587 start=args.start, 588 end=args.end, 589 skip_janitor=args.skip_janitor, 590 ignore_cron=args.ignore_cron, 591 select_models=args.select_model, 592 exit_on_env_update=args.exit_on_env_update, 593 no_auto_upstream=args.no_auto_upstream, 594 ) 595 if completion_status.is_failure: 596 raise SQLMeshError("Error Running DAG. Check logs for details.")
Evaluate the DAG of models using the built-in scheduler.
598 @magic_arguments() 599 @argument("model", type=str, help="The model.") 600 @argument("--start", "-s", type=str, help="Start date to render.") 601 @argument("--end", "-e", type=str, help="End date to render.") 602 @argument("--execution-time", type=str, help="Execution time.") 603 @argument( 604 "--limit", 605 type=int, 606 help="The number of rows which the query should be limited to.", 607 ) 608 @line_magic 609 @pass_sqlmesh_context 610 def evaluate(self, context: Context, line: str) -> None: 611 """Evaluate a model query and fetches a dataframe.""" 612 context.refresh() 613 614 snowpark = optional_import("snowflake.snowpark") 615 args = parse_argstring(self.evaluate, line) 616 617 df = context.evaluate( 618 args.model, 619 start=args.start, 620 end=args.end, 621 execution_time=args.execution_time, 622 limit=args.limit, 623 ) 624 625 if snowpark and isinstance(df, snowpark.DataFrame): 626 df = df.limit(args.limit or 100).to_pandas() 627 628 self.display(df)
Evaluate a model query and fetches a dataframe.
630 @magic_arguments() 631 @argument("model", type=str, help="The model.") 632 @argument("--start", "-s", type=str, help="Start date to render.") 633 @argument("--end", "-e", type=str, help="End date to render.") 634 @argument("--execution-time", type=str, help="Execution time.") 635 @argument( 636 "--expand", 637 type=t.Union[bool, t.Iterable[str]], 638 help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.", 639 ) 640 @argument("--dialect", type=str, help="SQL dialect to render.") 641 @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.") 642 @format_arguments 643 @line_magic 644 @pass_sqlmesh_context 645 def render(self, context: Context, line: str) -> None: 646 """Renders a model's query, optionally expanding referenced models.""" 647 context.refresh() 648 render_opts = vars(parse_argstring(self.render, line)) 649 model = render_opts.pop("model") 650 dialect = render_opts.pop("dialect", None) 651 652 model = context.get_model(model, raise_if_missing=True) 653 654 query = context.render( 655 model, 656 start=render_opts.pop("start", None), 657 end=render_opts.pop("end", None), 658 execution_time=render_opts.pop("execution_time", None), 659 expand=render_opts.pop("expand", False), 660 ) 661 662 no_format = render_opts.pop("no_format", False) 663 664 format_config = context.config_for_node(model).format 665 format_options = { 666 **format_config.generator_options, 667 **{k: v for k, v in render_opts.items() if v is not None}, 668 } 669 670 sql = query.sql( 671 pretty=True, 672 dialect=context.config.dialect if dialect is None else dialect, 673 **format_options, 674 ) 675 676 if no_format: 677 context.console.log_status_update(sql) 678 else: 679 context.console.show_sql(sql)
Renders a model's query, optionally expanding referenced models.
681 @magic_arguments() 682 @argument( 683 "df_var", 684 default=None, 685 nargs="?", 686 type=str, 687 help="An optional variable name to store the resulting dataframe.", 688 ) 689 @cell_magic 690 @pass_sqlmesh_context 691 def fetchdf(self, context: Context, line: str, sql: str) -> None: 692 """Fetches a dataframe from sql, optionally storing it in a variable.""" 693 args = parse_argstring(self.fetchdf, line) 694 df = context.fetchdf(sql) 695 if args.df_var: 696 self._shell.user_ns[args.df_var] = df 697 self.display(df)
Fetches a dataframe from sql, optionally storing it in a variable.
699 @magic_arguments() 700 @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.") 701 @argument( 702 "--select-model", 703 type=str, 704 nargs="*", 705 help="Select specific models to include in the dag.", 706 ) 707 @line_magic 708 @pass_sqlmesh_context 709 def dag(self, context: Context, line: str) -> None: 710 """Displays the HTML DAG.""" 711 args = parse_argstring(self.dag, line) 712 dag = context.get_dag(args.select_model) 713 if args.file: 714 with open(args.file, "w", encoding="utf-8") as file: 715 file.write(str(dag)) 716 # TODO: Have this go through console instead of calling display directly 717 self.display(dag)
Displays the HTML DAG.
719 @magic_arguments() 720 @line_magic 721 @pass_sqlmesh_context 722 def migrate(self, context: Context, line: str) -> None: 723 """Migrate SQLMesh to the current running version.""" 724 context.migrate() 725 context.console.log_success("Migration complete")
Migrate SQLMesh to the current running version.
727 @magic_arguments() 728 @argument( 729 "--strict", 730 action="store_true", 731 help="Raise an error if the external model is missing in the database", 732 ) 733 @line_magic 734 @pass_sqlmesh_context 735 def create_external_models(self, context: Context, line: str) -> None: 736 """Create a schema file containing external model schemas.""" 737 args = parse_argstring(self.create_external_models, line) 738 context.create_external_models(strict=args.strict)
Create a schema file containing external model schemas.
740 @magic_arguments() 741 @argument( 742 "source_to_target", 743 type=str, 744 metavar="SOURCE:TARGET", 745 help="Source and target in `SOURCE:TARGET` format", 746 ) 747 @argument( 748 "--on", 749 type=str, 750 nargs="*", 751 help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.", 752 ) 753 @argument( 754 "--skip-columns", 755 type=str, 756 nargs="*", 757 help="The column(s) to skip when comparing the source and target table.", 758 ) 759 @argument( 760 "--model", 761 type=str, 762 help="The model to diff against when source and target are environments and not tables.", 763 ) 764 @argument( 765 "--where", 766 type=str, 767 help="An optional where statement to filter results.", 768 ) 769 @argument( 770 "--limit", 771 type=int, 772 default=20, 773 help="The limit of the sample dataframe.", 774 ) 775 @argument( 776 "--show-sample", 777 action="store_true", 778 help="Show a sample of the rows that differ. With many columns, the output can be very wide.", 779 ) 780 @argument( 781 "--decimals", 782 type=int, 783 default=3, 784 help="The number of decimal places to keep when comparing floating point columns. Default: 3", 785 ) 786 @argument( 787 "--select-model", 788 type=str, 789 nargs="*", 790 help="Specify one or more models to data diff. Use wildcards to diff multiple models. Ex: '*' (all models with applied plan diffs), 'demo.model+' (this and downstream models), 'git:feature_branch' (models with direct modifications in this branch only)", 791 ) 792 @argument( 793 "--skip-grain-check", 794 action="store_true", 795 help="Disable the check for a primary key (grain) that is missing or is not unique.", 796 ) 797 @argument( 798 "--warn-grain-check", 799 action="store_true", 800 help="Warn if any selected model is missing a grain, and compute diffs for the remaining models.", 801 ) 802 @argument( 803 "--schema-diff-ignore-case", 804 action="store_true", 805 help="If set, when performing a schema diff the case of column names is ignored when matching between the two schemas. For example, 'col_a' in the source schema and 'COL_A' in the target schema will be treated as the same column.", 806 ) 807 @line_magic 808 @pass_sqlmesh_context 809 def table_diff(self, context: Context, line: str) -> None: 810 """Show the diff between two tables. 811 812 Can either be two tables or two environments and a model. 813 """ 814 args = parse_argstring(self.table_diff, line) 815 source, target = args.source_to_target.split(":") 816 select_models = {args.model} if args.model else args.select_model or None 817 context.table_diff( 818 source=source, 819 target=target, 820 on=args.on, 821 skip_columns=args.skip_columns, 822 select_models=select_models, 823 where=args.where, 824 limit=args.limit, 825 show_sample=args.show_sample, 826 decimals=args.decimals, 827 skip_grain_check=args.skip_grain_check, 828 warn_grain_check=args.warn_grain_check, 829 schema_diff_ignore_case=args.schema_diff_ignore_case, 830 )
Show the diff between two tables.
Can either be two tables or two environments and a model.
832 @magic_arguments() 833 @argument( 834 "model_name", 835 nargs="?", 836 type=str, 837 help="The name of the model to get the table name for.", 838 ) 839 @argument( 840 "--environment", 841 type=str, 842 help="The environment to source the model version from.", 843 ) 844 @argument( 845 "--prod", 846 action="store_true", 847 help="If set, return the name of the physical table that will be used in production for the model version promoted in the target environment.", 848 ) 849 @line_magic 850 @pass_sqlmesh_context 851 def table_name(self, context: Context, line: str) -> None: 852 """Prints the name of the physical table for the given model.""" 853 args = parse_argstring(self.table_name, line) 854 context.console.log_status_update( 855 context.table_name(args.model_name, args.environment, args.prod) 856 )
Prints the name of the physical table for the given model.
858 @magic_arguments() 859 @argument( 860 "pipeline", 861 nargs="?", 862 type=str, 863 help="The dlt pipeline to attach for this SQLMesh project.", 864 ) 865 @argument( 866 "--table", 867 "-t", 868 type=str, 869 nargs="*", 870 help="The specific dlt tables to refresh in the SQLMesh models.", 871 ) 872 @argument( 873 "--force", 874 "-f", 875 action="store_true", 876 help="If set, existing models are overwritten with the new DLT tables.", 877 ) 878 @argument( 879 "--dlt-path", 880 type=str, 881 help="The directory where the DLT pipeline resides.", 882 ) 883 @line_magic 884 @pass_sqlmesh_context 885 def dlt_refresh(self, context: Context, line: str) -> None: 886 """Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project.""" 887 from sqlmesh.integrations.dlt import generate_dlt_models 888 889 args = parse_argstring(self.dlt_refresh, line) 890 sqlmesh_models = generate_dlt_models( 891 context, args.pipeline, list(args.table or []), args.force, args.dlt_path 892 ) 893 if sqlmesh_models: 894 model_names = "\n".join([f"- {model_name}" for model_name in sqlmesh_models]) 895 context.console.log_success(f"Updated SQLMesh project with models:\n{model_names}") 896 else: 897 context.console.log_success("All SQLMesh models are up to date.")
Attaches to a DLT pipeline with the option to update specific or all missing tables in the SQLMesh project.
899 @magic_arguments() 900 @argument( 901 "--read", 902 type=str, 903 default="", 904 help="The input dialect of the sql string.", 905 ) 906 @argument( 907 "--write", 908 type=str, 909 default="", 910 help="The output dialect of the sql string.", 911 ) 912 @line_cell_magic 913 @pass_sqlmesh_context 914 def rewrite(self, context: Context, line: str, sql: str) -> None: 915 """Rewrite a sql expression with semantic references into an executable query. 916 917 https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/ 918 """ 919 args = parse_argstring(self.rewrite, line) 920 context.console.show_sql( 921 context.rewrite(sql, args.read).sql( 922 dialect=args.write or context.config.dialect, pretty=True 923 ) 924 )
Rewrite a sql expression with semantic references into an executable query.
https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
926 @magic_arguments() 927 @argument( 928 "--transpile", 929 "-t", 930 type=str, 931 help="Transpile project models to the specified dialect.", 932 ) 933 @argument( 934 "--check", 935 action="store_true", 936 help="Whether or not to check formatting (but not actually format anything).", 937 default=None, 938 ) 939 @argument( 940 "--append-newline", 941 action="store_true", 942 help="Include a newline at the end of the output.", 943 default=None, 944 ) 945 @argument( 946 "--no-rewrite-casts", 947 action="store_true", 948 help="Preserve the existing casts, without rewriting them to use the :: syntax.", 949 default=None, 950 ) 951 @format_arguments 952 @line_magic 953 @pass_sqlmesh_context 954 def format(self, context: Context, line: str) -> bool: 955 """Format all SQL models and audits.""" 956 format_opts = vars(parse_argstring(self.format, line)) 957 if format_opts.pop("no_rewrite_casts", None): 958 format_opts["rewrite_casts"] = False 959 960 return context.format(**{k: v for k, v in format_opts.items() if v is not None})
Format all SQL models and audits.
962 @magic_arguments() 963 @argument("environment", type=str, help="The environment to diff local state against.") 964 @line_magic 965 @pass_sqlmesh_context 966 def diff(self, context: Context, line: str) -> None: 967 """Show the diff between the local state and the target environment.""" 968 args = parse_argstring(self.diff, line) 969 context.diff(args.environment)
Show the diff between the local state and the target environment.
971 @magic_arguments() 972 @argument("environment", type=str, help="The environment to invalidate.") 973 @line_magic 974 @pass_sqlmesh_context 975 def invalidate(self, context: Context, line: str) -> None: 976 """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" 977 args = parse_argstring(self.invalidate, line) 978 context.invalidate_environment(args.environment)
Invalidate the target environment, forcing its removal during the next run of the janitor process.
980 @magic_arguments() 981 @argument( 982 "--ignore-ttl", 983 action="store_true", 984 help="Cleanup snapshots that are not referenced in any environment, regardless of when they're set to expire", 985 ) 986 @line_magic 987 @pass_sqlmesh_context 988 def janitor(self, context: Context, line: str) -> None: 989 """Run the janitor process to clean up old environments and expired snapshots.""" 990 args = parse_argstring(self.janitor, line) 991 context.run_janitor(ignore_ttl=args.ignore_ttl)
Run the janitor process to clean up old environments and expired snapshots.
993 @magic_arguments() 994 @argument("model", type=str) 995 @argument( 996 "--query", 997 "-q", 998 type=str, 999 nargs="+", 1000 default=[], 1001 help="Queries that will be used to generate data for the model's dependencies.", 1002 ) 1003 @argument( 1004 "--overwrite", 1005 "-o", 1006 action="store_true", 1007 help="When true, the fixture file will be overwritten in case it already exists.", 1008 ) 1009 @argument( 1010 "--var", 1011 "-v", 1012 type=str, 1013 nargs="+", 1014 help="Key-value pairs that will define variables needed by the model.", 1015 ) 1016 @argument( 1017 "--path", 1018 "-p", 1019 type=str, 1020 help="The file path corresponding to the fixture, relative to the test directory. " 1021 "By default, the fixture will be created under the test directory and the file " 1022 "name will be inferred based on the test's name.", 1023 ) 1024 @argument( 1025 "--name", 1026 "-n", 1027 type=str, 1028 help="The name of the test that will be created. By default, it's inferred based on the model's name.", 1029 ) 1030 @argument( 1031 "--include-ctes", 1032 action="store_true", 1033 help="When true, CTE fixtures will also be generated.", 1034 ) 1035 @line_magic 1036 @pass_sqlmesh_context 1037 def create_test(self, context: Context, line: str) -> None: 1038 """Generate a unit test fixture for a given model.""" 1039 args = parse_argstring(self.create_test, line) 1040 queries = iter(args.query) 1041 variables = iter(args.var) if args.var else None 1042 context.create_test( 1043 args.model, 1044 input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()}, 1045 overwrite=args.overwrite, 1046 variables=dict(zip(variables, variables)) if variables else None, 1047 path=args.path, 1048 name=args.name, 1049 include_ctes=args.include_ctes, 1050 )
Generate a unit test fixture for a given model.
1052 @magic_arguments() 1053 @argument("tests", nargs="*", type=str) 1054 @argument( 1055 "--pattern", 1056 "-k", 1057 nargs="*", 1058 type=str, 1059 help="Only run tests that match the pattern of substring.", 1060 ) 1061 @argument( 1062 "--verbose", 1063 "-v", 1064 action="count", 1065 default=0, 1066 help="Verbose output. Use -vv for very verbose.", 1067 ) 1068 @argument( 1069 "--preserve-fixtures", 1070 action="store_true", 1071 help="Preserve the fixture tables in the testing database, useful for debugging.", 1072 ) 1073 @line_magic 1074 @pass_sqlmesh_context 1075 def run_test(self, context: Context, line: str) -> None: 1076 """Run unit test(s).""" 1077 args = parse_argstring(self.run_test, line) 1078 1079 context.test( 1080 match_patterns=args.pattern, 1081 tests=args.tests, 1082 verbosity=Verbosity(args.verbose), 1083 preserve_fixtures=args.preserve_fixtures, 1084 stream=StringIO(), # consume the output instead of redirecting to stdout 1085 )
Run unit test(s).
1087 @magic_arguments() 1088 @argument( 1089 "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited." 1090 ) 1091 @argument("--start", "-s", type=str, help="Start date to audit.") 1092 @argument("--end", "-e", type=str, help="End date to audit.") 1093 @argument("--execution-time", type=str, help="Execution time.") 1094 @line_magic 1095 @pass_sqlmesh_context 1096 def audit(self, context: Context, line: str) -> bool: 1097 """Run audit(s)""" 1098 args = parse_argstring(self.audit, line) 1099 return context.audit( 1100 models=args.models, start=args.start, end=args.end, execution_time=args.execution_time 1101 )
Run audit(s)
1103 @magic_arguments() 1104 @argument("environment", nargs="?", type=str, help="The environment to check intervals for.") 1105 @argument( 1106 "--no-signals", 1107 action="store_true", 1108 help="Disable signal checks and only show missing intervals.", 1109 default=False, 1110 ) 1111 @argument( 1112 "--select-model", 1113 type=str, 1114 nargs="*", 1115 help="Select specific model changes that should be included in the plan.", 1116 ) 1117 @argument("--start", "-s", type=str, help="Start date of intervals to check for.") 1118 @argument("--end", "-e", type=str, help="End date of intervals to check for.") 1119 @line_magic 1120 @pass_sqlmesh_context 1121 def check_intervals(self, context: Context, line: str) -> None: 1122 """Show missing intervals in an environment, respecting signals.""" 1123 args = parse_argstring(self.check_intervals, line) 1124 1125 context.console.show_intervals( 1126 context.check_intervals( 1127 environment=args.environment, 1128 no_signals=args.no_signals, 1129 select_models=args.select_model, 1130 start=args.start, 1131 end=args.end, 1132 ) 1133 )
Show missing intervals in an environment, respecting signals.
1135 @magic_arguments() 1136 @argument( 1137 "--skip-connection", 1138 action="store_true", 1139 help="Skip the connection test.", 1140 default=False, 1141 ) 1142 @argument( 1143 "--verbose", 1144 "-v", 1145 action="count", 1146 default=0, 1147 help="Verbose output. Use -vv for very verbose.", 1148 ) 1149 @line_magic 1150 @pass_sqlmesh_context 1151 def info(self, context: Context, line: str) -> None: 1152 """Display SQLMesh project information.""" 1153 args = parse_argstring(self.info, line) 1154 context.print_info(skip_connection=args.skip_connection, verbosity=Verbosity(args.verbose))
Display SQLMesh project information.
1156 @magic_arguments() 1157 @line_magic 1158 @pass_sqlmesh_context 1159 def rollback(self, context: Context, line: str) -> None: 1160 """Rollback SQLMesh to the previous migration.""" 1161 context.rollback()
Rollback SQLMesh to the previous migration.
1163 @magic_arguments() 1164 @line_magic 1165 @pass_sqlmesh_context 1166 def clean(self, context: Context, line: str) -> None: 1167 """Clears the SQLMesh cache and any build artifacts.""" 1168 context.clear_caches() 1169 context.console.log_success("SQLMesh cache and build artifacts cleared")
Clears the SQLMesh cache and any build artifacts.
1171 @magic_arguments() 1172 @line_magic 1173 @pass_sqlmesh_context 1174 def environments(self, context: Context, line: str) -> None: 1175 """Prints the list of SQLMesh environments with its expiry datetime.""" 1176 context.print_environment_names()
Prints the list of SQLMesh environments with its expiry datetime.
1178 @magic_arguments() 1179 @argument( 1180 "--models", 1181 "--model", 1182 type=str, 1183 nargs="*", 1184 help="A model to lint. Multiple models can be linted. If no models are specified, every model will be linted.", 1185 ) 1186 @line_magic 1187 @pass_sqlmesh_context 1188 def lint(self, context: Context, line: str) -> None: 1189 """Run linter for target model(s)""" 1190 args = parse_argstring(self.lint, line) 1191 context.lint_models(args.models)
Run linter for target model(s)
1193 @magic_arguments() 1194 @line_magic 1195 @pass_sqlmesh_context 1196 def destroy(self, context: Context, line: str) -> None: 1197 """Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache.""" 1198 context.destroy()
Removes all project resources, engine-managed objects, state tables and clears the SQLMesh cache.
Inherited Members
- IPython.core.magic.Magics
- Magics
- options_table
- shell
- arg_err
- format_latex
- parse_options
- default_option
- traitlets.config.configurable.Configurable
- config
- parent
- section_names
- update_config
- class_get_help
- class_get_trait_help
- class_print_help
- class_config_section
- class_config_rst_doc
- traitlets.traitlets.HasTraits
- setup_instance
- cross_validation_lock
- hold_trait_notifications
- notify_change
- on_trait_change
- observe
- unobserve
- unobserve_all
- add_traits
- set_trait
- class_trait_names
- class_traits
- class_own_traits
- has_trait
- trait_has_value
- trait_values
- trait_defaults
- trait_names
- traits
- trait_metadata
- class_own_trait_events
- trait_events