sqlmesh.magics
"""IPython/Jupyter magics that expose SQLMesh ``Context`` operations in notebooks."""
from __future__ import annotations

import functools
import logging
import typing as t
from collections import defaultdict

from hyperscript import h
from IPython.core.display import display
from IPython.core.magic import (
    Magics,
    cell_magic,
    line_cell_magic,
    line_magic,
    magics_class,
)
from IPython.core.magic_arguments import argument, magic_arguments, parse_argstring
from rich.jupyter import JupyterRenderable

from sqlmesh.cli.example_project import ProjectTemplate, init_example_project
from sqlmesh.core import constants as c
from sqlmesh.core.config import load_configs
from sqlmesh.core.console import get_console
from sqlmesh.core.context import Context
from sqlmesh.core.dialect import format_model_expressions, parse
from sqlmesh.core.model import load_sql_based_model
from sqlmesh.core.test import ModelTestMetadata, get_all_model_tests
from sqlmesh.utils import sqlglot_dialects, yaml
from sqlmesh.utils.errors import MagicError, MissingContextException, SQLMeshError

logger = logging.getLogger(__name__)

# Names looked up (in this order) in the notebook's user namespace to find the Context.
CONTEXT_VARIABLE_NAMES = [
    "context",
    "ctx",
    "sqlmesh",
]


def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
    """Decorate a magic so that it receives the notebook's SQLMesh context.

    The wrapper searches the shell's user namespace for the first variable in
    ``CONTEXT_VARIABLE_NAMES`` that is a ``Context`` instance, temporarily swaps
    the context's console for one bound to the magics' ``display`` callable,
    refreshes the context and then calls ``func(self, context, *args, **kwargs)``.

    Args:
        func: The magic implementation; it is called with the context inserted
            right after ``self``.

    Returns:
        The wrapping function. It always returns ``None``.

    Raises:
        MissingContextException: If none of the candidate variables holds a ``Context``.
    """

    @functools.wraps(func)
    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
        for variable_name in CONTEXT_VARIABLE_NAMES:
            context = self._shell.user_ns.get(variable_name)
            if isinstance(context, Context):
                break
        else:
            raise MissingContextException(
                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
            )
        old_console = context.console
        context.console = get_console(display=self.display)
        try:
            context.refresh()
            func(self, context, *args, **kwargs)
        finally:
            # Restore the previous console even when the refresh or the magic raises,
            # so a failed cell does not leave the context with the swapped console.
            context.console = old_console

    return wrapper


@magics_class
class SQLMeshMagics(Magics):
    """Collection of line/cell magics that delegate to a SQLMesh ``Context``."""

    @property
    def display(self) -> t.Callable:
        """Return the callable used to display rich objects in the notebook."""
        from sqlmesh import RuntimeEnv

        if RuntimeEnv.get().is_databricks:
            # Use Databricks' special display instead of the normal IPython display
            return self._shell.user_ns["display"]
        return display

    @property
    def _shell(self) -> t.Any:
        """Return the attached shell, raising ``RuntimeError`` if there is none."""
        # Make mypy happy.
        if not self.shell:
            raise RuntimeError("IPython Magics are in invalid state")
        return self.shell

    @magic_arguments()
    @argument(
        "paths",
        type=str,
        nargs="+",
        default="",
        help="The path(s) to the SQLMesh project(s).",
    )
    @argument(
        "--config",
        type=str,
        help="Name of the config object. Only applicable to configuration defined using Python script.",
    )
    @argument("--gateway", type=str, help="The name of the gateway.")
    @argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
    @argument("--debug", action="store_true", help="Enable debug mode.")
    @argument("--log-file-dir", type=str, help="The directory to write the log file to.")
    @line_magic
    def context(self, line: str) -> None:
        """Sets the context in the user namespace."""
        from sqlmesh import configure_logging

        args = parse_argstring(self.context, line)
        configs = load_configs(args.config, Context.CONFIG_TYPE, args.paths)
        # The log limit is taken from the first loaded config.
        log_limit = list(configs.values())[0].log_limit
        configure_logging(
            args.debug, args.ignore_warnings, log_limit=log_limit, log_file_dir=args.log_file_dir
        )
        try:
            context = Context(paths=args.paths, config=configs, gateway=args.gateway)
            self._shell.user_ns["context"] = context
        except Exception:
            if args.debug:
                logger.exception("Failed to initialize SQLMesh context")
            raise
        context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}")

    @magic_arguments()
    @argument("path", type=str, help="The path where the new SQLMesh project should be created.")
    @argument(
        "sql_dialect",
        type=str,
        help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.",
    )
    @argument(
        "--template",
        "-t",
        type=str,
        help="Project template. Supported values: airflow, dbt, default, empty.",
    )
    @line_magic
    def init(self, line: str) -> None:
        """Creates a SQLMesh project scaffold with a default SQL dialect."""
        args = parse_argstring(self.init, line)
        try:
            project_template = ProjectTemplate(
                args.template.lower() if args.template else "default"
            )
        except ValueError:
            raise MagicError(f"Invalid project template '{args.template}'")
        init_example_project(args.path, args.sql_dialect, project_template)
        html = str(
            h(
                "div",
                h(
                    "span",
                    {"style": {"color": "green", "font-weight": "bold"}},
                    "SQLMesh project scaffold created",
                ),
            )
        )
        self.display(JupyterRenderable(html=html, text=""))

    @magic_arguments()
    @argument("model", type=str, help="The model.")
    @argument("--start", "-s", type=str, help="Start date to render.")
    @argument("--end", "-e", type=str, help="End date to render.")
    @argument("--execution-time", type=str, help="Execution time.")
    @argument("--dialect", "-d", type=str, help="The rendered dialect.")
    @line_cell_magic
    @pass_sqlmesh_context
    def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
        """Renders the model and automatically fills in an editable cell with the model definition."""
        args = parse_argstring(self.model, line)

        model = context.get_model(args.model, raise_if_missing=True)
        config = context.config_for_node(model)

        if sql:
            # Cell mode: the cell body is the (edited) model definition.
            expressions = parse(sql, default_dialect=config.dialect)
            loaded = load_sql_based_model(
                expressions,
                macros=context._macros,
                jinja_macros=context._jinja_macros,
                path=model._path,
                dialect=config.dialect,
                time_column_format=config.time_column_format,
                physical_schema_override=context.config.physical_schema_override,
                default_catalog=context.default_catalog,
            )

            # Only adopt the edited definition if it still describes the requested model.
            if loaded.name == args.model:
                model = loaded
        else:
            # Line mode: read the current definition from the model's file.
            with open(model._path, "r", encoding="utf-8") as file:
                expressions = parse(file.read(), default_dialect=config.dialect)

        formatted = format_model_expressions(
            expressions, model.dialect, **config.format.generator_options
        )

        # Replace the current cell with a `%%model` cell holding the formatted definition.
        self._shell.set_next_input(
            "\n".join(
                [
                    " ".join(["%%model", line]),
                    formatted,
                ]
            ),
            replace=True,
        )

        with open(model._path, "w", encoding="utf-8") as file:
            file.write(formatted)

        if sql:
            context.console.log_success(f"Model `{args.model}` updated")

        context.upsert_model(model)
        context.console.show_sql(
            context.render(
                model.name,
                start=args.start,
                end=args.end,
                execution_time=args.execution_time,
            ).sql(pretty=True, dialect=args.dialect or model.dialect)
        )

    @magic_arguments()
    @argument("model", type=str, help="The model.")
    @argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
    @argument("--ls", action="store_true", help="List tests associated with a model")
    @line_cell_magic
    @pass_sqlmesh_context
    def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
        """Allow the user to list tests for a model, output a specific test, and then write their changes back

        Raises:
            MagicError: If neither a test name nor `--ls` is given, or if the
                requested test does not exist for the model.
        """
        args = parse_argstring(self.test, line)
        if not args.test_name and not args.ls:
            raise MagicError("Must provide either test name or `--ls` to list tests")

        test_meta = []

        for path, config in context.configs.items():
            test_meta.extend(
                get_all_model_tests(
                    path / c.TESTS,
                    ignore_patterns=config.ignore_patterns,
                )
            )

        # Group the discovered tests as {model name: {test name: metadata}}.
        tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
        for model_test_metadata in test_meta:
            model_name = model_test_metadata.body.get("model")
            if not model_name:
                context.console.log_error(
                    f"Test found that does not have `model` defined: {model_test_metadata.path}"
                )
            else:
                tests[model_name][model_test_metadata.test_name] = model_test_metadata

        model = context.get_model(args.model, raise_if_missing=True)
        model_tests = tests[model.name]

        if args.ls:
            # TODO: Provide better UI for displaying tests
            for test_name in model_tests:
                context.console.log_status_update(test_name)
            return

        if args.test_name not in model_tests:
            # Report an unknown test name as a magic error instead of a bare KeyError.
            raise MagicError(f"Test `{args.test_name}` not found for model `{model.name}`")

        test = model_tests[args.test_name]
        test_def = yaml.load(test_def_raw) if test_def_raw else test.body
        test_def_output = yaml.dump(test_def)

        # Replace the current cell with a `%%test` cell holding the test definition.
        self._shell.set_next_input(
            "\n".join(
                [
                    " ".join(["%%test", line]),
                    test_def_output,
                ]
            ),
            replace=True,
        )

        # Rewrite the test file in place with the (possibly edited) definition.
        with open(test.path, "r+", encoding="utf-8") as file:
            content = yaml.load(file.read())
            content[args.test_name] = test_def
            file.seek(0)
            yaml.dump(content, file)
            file.truncate()

    @magic_arguments()
    @argument(
        "environment",
        nargs="?",
        type=str,
        help="The environment to run the plan against",
    )
    @argument("--start", "-s", type=str, help="Start date to backfill.")
    @argument("--end", "-e", type=str, help="End date to backfill.")
    @argument("--execution-time", type=str, help="Execution time.")
    @argument(
        "--create-from",
        type=str,
        help="The environment to create the target environment from if it doesn't exist. Default: prod.",
    )
    @argument(
        "--skip-tests",
        "-t",
        action="store_true",
        help="Skip the unit tests defined for the model.",
    )
    @argument(
        "--restate-model",
        "-r",
        type=str,
        nargs="*",
        help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
    )
    @argument(
        "--no-gaps",
        "-g",
        action="store_true",
        help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
    )
    @argument(
        "--skip-backfill",
        action="store_true",
        help="Skip the backfill step.",
    )
    @argument(
        "--forward-only",
        action="store_true",
        help="Create a plan for forward-only changes.",
        default=None,
    )
    @argument(
        "--effective-from",
        type=str,
        help="The effective date from which to apply forward-only changes on production.",
    )
    @argument(
        "--no-prompts",
        action="store_true",
        help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
        default=None,
    )
    @argument(
        "--auto-apply",
        action="store_true",
        help="Automatically applies the new plan after creation.",
        default=None,
    )
    @argument(
        "--no-auto-categorization",
        action="store_true",
        help="Disable automatic change categorization.",
        default=None,
    )
    @argument(
        "--include-unmodified",
        action="store_true",
        help="Include unmodified models in the target environment.",
        default=None,
    )
    @argument(
        "--select-model",
        type=str,
        nargs="*",
        help="Select specific model changes that should be included in the plan.",
    )
    @argument(
        "--backfill-model",
        type=str,
        nargs="*",
        help="Backfill only the models whose names match the expression. This is supported only when targeting a development environment.",
    )
    @argument(
        "--no-diff",
        action="store_true",
        help="Hide text differences for changed models.",
        default=None,
    )
    @argument(
        "--run",
        action="store_true",
        help="Run latest intervals as part of the plan application (prod environment only).",
    )
    @argument(
        "--enable-preview",
        action="store_true",
        help="Enable preview for forward-only models when targeting a development environment.",
        default=None,
    )
    @line_magic
    @pass_sqlmesh_context
    def plan(self, context: Context, line: str) -> None:
        """Goes through a set of prompts to both establish a plan and apply it"""
        args = parse_argstring(self.plan, line)

        context.plan(
            args.environment,
            start=args.start,
            end=args.end,
            execution_time=args.execution_time,
            create_from=args.create_from,
            skip_tests=args.skip_tests,
            restate_models=args.restate_model,
            backfill_models=args.backfill_model,
            no_gaps=args.no_gaps,
            skip_backfill=args.skip_backfill,
            forward_only=args.forward_only,
            no_prompts=args.no_prompts,
            auto_apply=args.auto_apply,
            no_auto_categorization=args.no_auto_categorization,
            effective_from=args.effective_from,
            include_unmodified=args.include_unmodified,
            select_models=args.select_model,
            no_diff=args.no_diff,
            run=args.run,
            enable_preview=args.enable_preview,
        )

    @magic_arguments()
    @argument(
        "environment",
        nargs="?",
        type=str,
        help="The environment to run against",
    )
    @argument("--start", "-s", type=str, help="Start date to evaluate.")
    @argument("--end", "-e", type=str, help="End date to evaluate.")
    @argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
    @argument(
        "--ignore-cron",
        action="store_true",
        help="Run for all missing intervals, ignoring individual cron schedules.",
    )
    @line_magic
    @pass_sqlmesh_context
    def run_dag(self, context: Context, line: str) -> None:
        """Evaluate the DAG of models using the built-in scheduler."""
        args = parse_argstring(self.run_dag, line)

        success = context.run(
            args.environment,
            start=args.start,
            end=args.end,
            skip_janitor=args.skip_janitor,
            ignore_cron=args.ignore_cron,
        )
        if not success:
            raise SQLMeshError("Error Running DAG. Check logs for details.")

    @magic_arguments()
    @argument("model", type=str, help="The model.")
    @argument("--start", "-s", type=str, help="Start date to render.")
    @argument("--end", "-e", type=str, help="End date to render.")
    @argument("--execution-time", type=str, help="Execution time.")
    @argument(
        "--limit",
        type=int,
        help="The number of rows which the query should be limited to.",
    )
    @line_magic
    @pass_sqlmesh_context
    def evaluate(self, context: Context, line: str) -> None:
        """Evaluate a model query and fetches a dataframe."""
        context.refresh()
        args = parse_argstring(self.evaluate, line)

        df = context.evaluate(
            args.model,
            start=args.start,
            end=args.end,
            execution_time=args.execution_time,
            limit=args.limit,
        )
        self.display(df)

    @magic_arguments()
    @argument("model", type=str, help="The model.")
    @argument("--start", "-s", type=str, help="Start date to render.")
    @argument("--end", "-e", type=str, help="End date to render.")
    @argument("--execution-time", type=str, help="Execution time.")
    # NOTE(review): a typing.Union alias is probably not a usable argparse converter,
    # so passing --expand likely fails at parse time — confirm and supply a real parser.
    @argument(
        "--expand",
        type=t.Union[bool, t.Iterable[str]],
        help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries.",
    )
    @argument("--dialect", type=str, help="SQL dialect to render.")
    @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
    @line_magic
    @pass_sqlmesh_context
    def render(self, context: Context, line: str) -> None:
        """Renders a model's query, optionally expanding referenced models."""
        context.refresh()
        args = parse_argstring(self.render, line)

        query = context.render(
            args.model,
            start=args.start,
            end=args.end,
            execution_time=args.execution_time,
            expand=args.expand,
        )

        sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect)
        if args.no_format:
            context.console.log_status_update(sql)
        else:
            context.console.show_sql(sql)

    @magic_arguments()
    @argument(
        "df_var",
        default=None,
        nargs="?",
        type=str,
        help="An optional variable name to store the resulting dataframe.",
    )
    @cell_magic
    @pass_sqlmesh_context
    def fetchdf(self, context: Context, line: str, sql: str) -> None:
        """Fetches a dataframe from sql, optionally storing it in a variable."""
        args = parse_argstring(self.fetchdf, line)
        df = context.fetchdf(sql)
        if args.df_var:
            self._shell.user_ns[args.df_var] = df
        self.display(df)

    @magic_arguments()
    @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
    @argument(
        "--select-model",
        type=str,
        nargs="*",
        help="Select specific models to include in the dag.",
    )
    @line_magic
    @pass_sqlmesh_context
    def dag(self, context: Context, line: str) -> None:
        """Displays the HTML DAG."""
        args = parse_argstring(self.dag, line)
        dag = context.get_dag(args.select_model)
        if args.file:
            with open(args.file, "w", encoding="utf-8") as file:
                file.write(str(dag))
        # TODO: Have this go through console instead of calling display directly
        self.display(dag)

    @magic_arguments()
    @line_magic
    @pass_sqlmesh_context
    def migrate(self, context: Context, line: str) -> None:
        """Migrate SQLMesh to the current running version."""
        context.migrate()
        context.console.log_success("Migration complete")

    @magic_arguments()
    @line_magic
    @pass_sqlmesh_context
    def create_external_models(self, context: Context, line: str) -> None:
        """Create a schema file containing external model schemas."""
        context.create_external_models()

    @magic_arguments()
    @argument(
        "source_to_target",
        type=str,
        metavar="SOURCE:TARGET",
        help="Source and target in `SOURCE:TARGET` format",
    )
    @argument(
        "--on",
        type=str,
        nargs="*",
        help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
    )
    @argument(
        "--model",
        type=str,
        help="The model to diff against when source and target are environments and not tables.",
    )
    @argument(
        "--where",
        type=str,
        help="An optional where statement to filter results.",
    )
    @argument(
        "--limit",
        type=int,
        default=20,
        help="The limit of the sample dataframe.",
    )
    @argument(
        "--show-sample",
        action="store_true",
        help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
    )
    @line_magic
    @pass_sqlmesh_context
    def table_diff(self, context: Context, line: str) -> None:
        """Show the diff between two tables.

        Can either be two tables or two environments and a model.

        Raises:
            MagicError: If the positional argument is not in `SOURCE:TARGET` format.
        """
        args = parse_argstring(self.table_diff, line)
        try:
            source, target = args.source_to_target.split(":")
        except ValueError:
            # Zero or several ':' separators: report it instead of an unpacking error.
            raise MagicError(
                f"Expected `SOURCE:TARGET` format, got '{args.source_to_target}'"
            ) from None
        context.table_diff(
            source=source,
            target=target,
            on=args.on,
            model_or_snapshot=args.model,
            where=args.where,
            limit=args.limit,
            show_sample=args.show_sample,
        )

    @magic_arguments()
    @argument(
        "model_name",
        nargs="?",
        type=str,
        help="The name of the model to get the table name for.",
    )
    @argument(
        "--dev",
        action="store_true",
        help="Print the name of the snapshot table used for previews in development environments.",
    )
    @line_magic
    @pass_sqlmesh_context
    def table_name(self, context: Context, line: str) -> None:
        """Prints the name of the physical table for the given model."""
        args = parse_argstring(self.table_name, line)
        context.console.log_status_update(context.table_name(args.model_name, args.dev))

    @magic_arguments()
    @argument(
        "--read",
        type=str,
        default="",
        help="The input dialect of the sql string.",
    )
    @argument(
        "--write",
        type=str,
        default="",
        help="The output dialect of the sql string.",
    )
    @line_cell_magic
    @pass_sqlmesh_context
    def rewrite(self, context: Context, line: str, sql: str) -> None:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
        """
        args = parse_argstring(self.rewrite, line)
        context.console.show_sql(
            context.rewrite(sql, args.read).sql(
                dialect=args.write or context.config.dialect, pretty=True
            )
        )

    @magic_arguments()
    @argument(
        "--transpile",
        "-t",
        type=str,
        help="Transpile project models to the specified dialect.",
    )
    @argument(
        "--append-newline",
        action="store_true",
        help="Whether or not to append a newline to the end of the file.",
        default=None,
    )
    @argument(
        "--normalize",
        action="store_true",
        help="Whether or not to normalize identifiers to lowercase.",
        default=None,
    )
    @argument(
        "--pad",
        type=int,
        help="Determines the pad size in a formatted string.",
    )
    @argument(
        "--indent",
        type=int,
        help="Determines the indentation size in a formatted string.",
    )
    @argument(
        "--normalize-functions",
        type=str,
        help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
    )
    @argument(
        "--leading-comma",
        action="store_true",
        help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
        default=None,
    )
    @argument(
        "--max-text-width",
        type=int,
        help="The max number of characters in a segment before creating new lines in pretty mode.",
    )
    @line_magic
    @pass_sqlmesh_context
    def format(self, context: Context, line: str) -> None:
        """Format all SQL models."""
        args = parse_argstring(self.format, line)
        # Forward only the options that were actually provided.
        context.format(**{k: v for k, v in vars(args).items() if v is not None})

    @magic_arguments()
    @argument("environment", type=str, help="The environment to diff local state against.")
    @line_magic
    @pass_sqlmesh_context
    def diff(self, context: Context, line: str) -> None:
        """Show the diff between the local state and the target environment."""
        args = parse_argstring(self.diff, line)
        context.diff(args.environment)

    @magic_arguments()
    @argument("environment", type=str, help="The environment to invalidate.")
    @line_magic
    @pass_sqlmesh_context
    def invalidate(self, context: Context, line: str) -> None:
        """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
        args = parse_argstring(self.invalidate, line)
        context.invalidate_environment(args.environment)

    @magic_arguments()
    @argument("model", type=str)
    @argument(
        "--query",
        "-q",
        type=str,
        nargs="+",
        required=True,
        help="Queries that will be used to generate data for the model's dependencies.",
    )
    @argument(
        "--overwrite",
        "-o",
        action="store_true",
        help="When true, the fixture file will be overwritten in case it already exists.",
    )
    @argument(
        "--var",
        "-v",
        type=str,
        nargs="+",
        help="Key-value pairs that will define variables needed by the model.",
    )
    @argument(
        "--path",
        "-p",
        type=str,
        help="The file path corresponding to the fixture, relative to the test directory. "
        "By default, the fixture will be created under the test directory and the file "
        "name will be inferred based on the test's name.",
    )
    @argument(
        "--name",
        "-n",
        type=str,
        help="The name of the test that will be created. By default, it's inferred based on the model's name.",
    )
    @argument(
        "--include-ctes",
        action="store_true",
        help="When true, CTE fixtures will also be generated.",
    )
    @line_magic
    @pass_sqlmesh_context
    def create_test(self, context: Context, line: str) -> None:
        """Generate a unit test fixture for a given model."""
        args = parse_argstring(self.create_test, line)
        # Zipping an iterator with itself pairs up consecutive items: (k1, v1), (k2, v2), ...
        queries = iter(args.query)
        variables = iter(args.var) if args.var else None
        context.create_test(
            args.model,
            input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()},
            overwrite=args.overwrite,
            variables=dict(zip(variables, variables)) if variables else None,
            path=args.path,
            name=args.name,
            include_ctes=args.include_ctes,
        )

    @magic_arguments()
    @argument("tests", nargs="*", type=str)
    @argument(
        "--pattern",
        "-k",
        nargs="*",
        type=str,
        help="Only run tests that match the pattern of substring.",
    )
    @argument("--verbose", "-v", action="store_true", help="Verbose output.")
    @argument(
        "--preserve-fixtures",
        action="store_true",
        help="Preserve the fixture tables in the testing database, useful for debugging.",
    )
    @line_magic
    @pass_sqlmesh_context
    def run_test(self, context: Context, line: str) -> None:
        """Run unit test(s)."""
        args = parse_argstring(self.run_test, line)
        context.test(
            match_patterns=args.pattern,
            tests=args.tests,
            verbose=args.verbose,
            preserve_fixtures=args.preserve_fixtures,
        )

    @magic_arguments()
    @argument(
        "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
    )
    @argument("--start", "-s", type=str, help="Start date to audit.")
    @argument("--end", "-e", type=str, help="End date to audit.")
    @argument("--execution-time", type=str, help="Execution time.")
    @line_magic
    @pass_sqlmesh_context
    def audit(self, context: Context, line: str) -> None:
        """Run audit(s)"""
        args = parse_argstring(self.audit, line)
        context.audit(
            models=args.models, start=args.start, end=args.end, execution_time=args.execution_time
        )

    @magic_arguments()
    @line_magic
    @pass_sqlmesh_context
    def info(self, context: Context, line: str) -> None:
        """Display SQLMesh project information."""
        context.print_info()

    @magic_arguments()
    @line_magic
    @pass_sqlmesh_context
    def rollback(self, context: Context, line: str) -> None:
        """Rollback SQLMesh to the previous migration."""
        context.rollback()

    @magic_arguments()
    @line_magic
    @pass_sqlmesh_context
    def clean(self, context: Context, line: str) -> None:
        """Clears the SQLMesh cache and any build artifacts."""
        context.clear_caches()
        context.console.log_success("SQLMesh cache and build artifacts cleared")


def register_magics() -> None:
    """Register ``SQLMeshMagics`` with the running IPython shell, if there is one."""
    try:
        shell = get_ipython()  # type: ignore
        shell.register_magics(SQLMeshMagics)
    except NameError:
        # `get_ipython` is not defined, so there is no shell to register with.
        pass
def pass_sqlmesh_context(func: t.Callable) -> t.Callable:
    """Decorate a magic so that it receives the notebook's SQLMesh context.

    The wrapper searches the shell's user namespace for the first variable in
    ``CONTEXT_VARIABLE_NAMES`` that is a ``Context`` instance, temporarily swaps
    the context's console for one bound to the magics' ``display`` callable,
    refreshes the context and then calls ``func(self, context, *args, **kwargs)``.

    Args:
        func: The magic implementation; it is called with the context inserted
            right after ``self``.

    Returns:
        The wrapping function. It always returns ``None``.

    Raises:
        MissingContextException: If none of the candidate variables holds a ``Context``.
    """

    @functools.wraps(func)
    def wrapper(self: SQLMeshMagics, *args: t.Any, **kwargs: t.Any) -> None:
        for variable_name in CONTEXT_VARIABLE_NAMES:
            context = self._shell.user_ns.get(variable_name)
            if isinstance(context, Context):
                break
        else:
            raise MissingContextException(
                f"Context must be defined and initialized with one of these names: {', '.join(CONTEXT_VARIABLE_NAMES)}"
            )
        old_console = context.console
        context.console = get_console(display=self.display)
        try:
            context.refresh()
            func(self, context, *args, **kwargs)
        finally:
            # Restore the previous console even when the refresh or the magic raises,
            # so a failed cell does not leave the context with the swapped console.
            context.console = old_console

    return wrapper
62class SQLMeshMagics(Magics): 63 @property 64 def display(self) -> t.Callable: 65 from sqlmesh import RuntimeEnv 66 67 if RuntimeEnv.get().is_databricks: 68 # Use Databricks' special display instead of the normal IPython display 69 return self._shell.user_ns["display"] 70 return display 71 72 @property 73 def _shell(self) -> t.Any: 74 # Make mypy happy. 75 if not self.shell: 76 raise RuntimeError("IPython Magics are in invalid state") 77 return self.shell 78 79 @magic_arguments() 80 @argument( 81 "paths", 82 type=str, 83 nargs="+", 84 default="", 85 help="The path(s) to the SQLMesh project(s).", 86 ) 87 @argument( 88 "--config", 89 type=str, 90 help="Name of the config object. Only applicable to configuration defined using Python script.", 91 ) 92 @argument("--gateway", type=str, help="The name of the gateway.") 93 @argument("--ignore-warnings", action="store_true", help="Ignore warnings.") 94 @argument("--debug", action="store_true", help="Enable debug mode.") 95 @argument("--log-file-dir", type=str, help="The directory to write the log file to.") 96 @line_magic 97 def context(self, line: str) -> None: 98 """Sets the context in the user namespace.""" 99 from sqlmesh import configure_logging 100 101 args = parse_argstring(self.context, line) 102 configs = load_configs(args.config, Context.CONFIG_TYPE, args.paths) 103 log_limit = list(configs.values())[0].log_limit 104 configure_logging( 105 args.debug, args.ignore_warnings, log_limit=log_limit, log_file_dir=args.log_file_dir 106 ) 107 try: 108 context = Context(paths=args.paths, config=configs, gateway=args.gateway) 109 self._shell.user_ns["context"] = context 110 except Exception: 111 if args.debug: 112 logger.exception("Failed to initialize SQLMesh context") 113 raise 114 context.console.log_success(f"SQLMesh project context set to: {', '.join(args.paths)}") 115 116 @magic_arguments() 117 @argument("path", type=str, help="The path where the new SQLMesh project should be created.") 118 @argument( 119 
"sql_dialect", 120 type=str, 121 help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.", 122 ) 123 @argument( 124 "--template", 125 "-t", 126 type=str, 127 help="Project template. Supported values: airflow, dbt, default, empty.", 128 ) 129 @line_magic 130 def init(self, line: str) -> None: 131 """Creates a SQLMesh project scaffold with a default SQL dialect.""" 132 args = parse_argstring(self.init, line) 133 try: 134 project_template = ProjectTemplate( 135 args.template.lower() if args.template else "default" 136 ) 137 except ValueError: 138 raise MagicError(f"Invalid project template '{args.template}'") 139 init_example_project(args.path, args.sql_dialect, project_template) 140 html = str( 141 h( 142 "div", 143 h( 144 "span", 145 {"style": {"color": "green", "font-weight": "bold"}}, 146 "SQLMesh project scaffold created", 147 ), 148 ) 149 ) 150 self.display(JupyterRenderable(html=html, text="")) 151 152 @magic_arguments() 153 @argument("model", type=str, help="The model.") 154 @argument("--start", "-s", type=str, help="Start date to render.") 155 @argument("--end", "-e", type=str, help="End date to render.") 156 @argument("--execution-time", type=str, help="Execution time.") 157 @argument("--dialect", "-d", type=str, help="The rendered dialect.") 158 @line_cell_magic 159 @pass_sqlmesh_context 160 def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None: 161 """Renders the model and automatically fills in an editable cell with the model definition.""" 162 args = parse_argstring(self.model, line) 163 164 model = context.get_model(args.model, raise_if_missing=True) 165 config = context.config_for_node(model) 166 167 if sql: 168 expressions = parse(sql, default_dialect=config.dialect) 169 loaded = load_sql_based_model( 170 expressions, 171 macros=context._macros, 172 jinja_macros=context._jinja_macros, 173 path=model._path, 174 dialect=config.dialect, 175 time_column_format=config.time_column_format, 176 
physical_schema_override=context.config.physical_schema_override, 177 default_catalog=context.default_catalog, 178 ) 179 180 if loaded.name == args.model: 181 model = loaded 182 else: 183 with open(model._path, "r", encoding="utf-8") as file: 184 expressions = parse(file.read(), default_dialect=config.dialect) 185 186 formatted = format_model_expressions( 187 expressions, model.dialect, **config.format.generator_options 188 ) 189 190 self._shell.set_next_input( 191 "\n".join( 192 [ 193 " ".join(["%%model", line]), 194 formatted, 195 ] 196 ), 197 replace=True, 198 ) 199 200 with open(model._path, "w", encoding="utf-8") as file: 201 file.write(formatted) 202 203 if sql: 204 context.console.log_success(f"Model `{args.model}` updated") 205 206 context.upsert_model(model) 207 context.console.show_sql( 208 context.render( 209 model.name, 210 start=args.start, 211 end=args.end, 212 execution_time=args.execution_time, 213 ).sql(pretty=True, dialect=args.dialect or model.dialect) 214 ) 215 216 @magic_arguments() 217 @argument("model", type=str, help="The model.") 218 @argument("test_name", type=str, nargs="?", default=None, help="The test name to display") 219 @argument("--ls", action="store_true", help="List tests associated with a model") 220 @line_cell_magic 221 @pass_sqlmesh_context 222 def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None: 223 """Allow the user to list tests for a model, output a specific test, and then write their changes back""" 224 args = parse_argstring(self.test, line) 225 if not args.test_name and not args.ls: 226 raise MagicError("Must provide either test name or `--ls` to list tests") 227 228 test_meta = [] 229 230 for path, config in context.configs.items(): 231 test_meta.extend( 232 get_all_model_tests( 233 path / c.TESTS, 234 ignore_patterns=config.ignore_patterns, 235 ) 236 ) 237 238 tests: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict) 239 for model_test_metadata in test_meta: 240 model 
= model_test_metadata.body.get("model") 241 if not model: 242 context.console.log_error( 243 f"Test found that does not have `model` defined: {model_test_metadata.path}" 244 ) 245 else: 246 tests[model][model_test_metadata.test_name] = model_test_metadata 247 248 model = context.get_model(args.model, raise_if_missing=True) 249 250 if args.ls: 251 # TODO: Provide better UI for displaying tests 252 for test_name in tests[model.name]: 253 context.console.log_status_update(test_name) 254 return 255 256 test = tests[model.name][args.test_name] 257 test_def = yaml.load(test_def_raw) if test_def_raw else test.body 258 test_def_output = yaml.dump(test_def) 259 260 self._shell.set_next_input( 261 "\n".join( 262 [ 263 " ".join(["%%test", line]), 264 test_def_output, 265 ] 266 ), 267 replace=True, 268 ) 269 270 with open(test.path, "r+", encoding="utf-8") as file: 271 content = yaml.load(file.read()) 272 content[args.test_name] = test_def 273 file.seek(0) 274 yaml.dump(content, file) 275 file.truncate() 276 277 @magic_arguments() 278 @argument( 279 "environment", 280 nargs="?", 281 type=str, 282 help="The environment to run the plan against", 283 ) 284 @argument("--start", "-s", type=str, help="Start date to backfill.") 285 @argument("--end", "-e", type=str, help="End date to backfill.") 286 @argument("--execution-time", type=str, help="Execution time.") 287 @argument( 288 "--create-from", 289 type=str, 290 help="The environment to create the target environment from if it doesn't exist. Default: prod.", 291 ) 292 @argument( 293 "--skip-tests", 294 "-t", 295 action="store_true", 296 help="Skip the unit tests defined for the model.", 297 ) 298 @argument( 299 "--restate-model", 300 "-r", 301 type=str, 302 nargs="*", 303 help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. 
For development environment, only the current model versions will be affected.", 304 ) 305 @argument( 306 "--no-gaps", 307 "-g", 308 action="store_true", 309 help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.", 310 ) 311 @argument( 312 "--skip-backfill", 313 action="store_true", 314 help="Skip the backfill step.", 315 ) 316 @argument( 317 "--forward-only", 318 action="store_true", 319 help="Create a plan for forward-only changes.", 320 default=None, 321 ) 322 @argument( 323 "--effective-from", 324 type=str, 325 help="The effective date from which to apply forward-only changes on production.", 326 ) 327 @argument( 328 "--no-prompts", 329 action="store_true", 330 help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.", 331 default=None, 332 ) 333 @argument( 334 "--auto-apply", 335 action="store_true", 336 help="Automatically applies the new plan after creation.", 337 default=None, 338 ) 339 @argument( 340 "--no-auto-categorization", 341 action="store_true", 342 help="Disable automatic change categorization.", 343 default=None, 344 ) 345 @argument( 346 "--include-unmodified", 347 action="store_true", 348 help="Include unmodified models in the target environment.", 349 default=None, 350 ) 351 @argument( 352 "--select-model", 353 type=str, 354 nargs="*", 355 help="Select specific model changes that should be included in the plan.", 356 ) 357 @argument( 358 "--backfill-model", 359 type=str, 360 nargs="*", 361 help="Backfill only the models whose names match the expression. 
This is supported only when targeting a development environment.", 362 ) 363 @argument( 364 "--no-diff", 365 action="store_true", 366 help="Hide text differences for changed models.", 367 default=None, 368 ) 369 @argument( 370 "--run", 371 action="store_true", 372 help="Run latest intervals as part of the plan application (prod environment only).", 373 ) 374 @argument( 375 "--enable-preview", 376 action="store_true", 377 help="Enable preview for forward-only models when targeting a development environment.", 378 default=None, 379 ) 380 @line_magic 381 @pass_sqlmesh_context 382 def plan(self, context: Context, line: str) -> None: 383 """Goes through a set of prompts to both establish a plan and apply it""" 384 args = parse_argstring(self.plan, line) 385 386 context.plan( 387 args.environment, 388 start=args.start, 389 end=args.end, 390 execution_time=args.execution_time, 391 create_from=args.create_from, 392 skip_tests=args.skip_tests, 393 restate_models=args.restate_model, 394 backfill_models=args.backfill_model, 395 no_gaps=args.no_gaps, 396 skip_backfill=args.skip_backfill, 397 forward_only=args.forward_only, 398 no_prompts=args.no_prompts, 399 auto_apply=args.auto_apply, 400 no_auto_categorization=args.no_auto_categorization, 401 effective_from=args.effective_from, 402 include_unmodified=args.include_unmodified, 403 select_models=args.select_model, 404 no_diff=args.no_diff, 405 run=args.run, 406 enable_preview=args.enable_preview, 407 ) 408 409 @magic_arguments() 410 @argument( 411 "environment", 412 nargs="?", 413 type=str, 414 help="The environment to run against", 415 ) 416 @argument("--start", "-s", type=str, help="Start date to evaluate.") 417 @argument("--end", "-e", type=str, help="End date to evaluate.") 418 @argument("--skip-janitor", action="store_true", help="Skip the janitor task.") 419 @argument( 420 "--ignore-cron", 421 action="store_true", 422 help="Run for all missing intervals, ignoring individual cron schedules.", 423 ) 424 @line_magic 425 
@pass_sqlmesh_context 426 def run_dag(self, context: Context, line: str) -> None: 427 """Evaluate the DAG of models using the built-in scheduler.""" 428 args = parse_argstring(self.run_dag, line) 429 430 success = context.run( 431 args.environment, 432 start=args.start, 433 end=args.end, 434 skip_janitor=args.skip_janitor, 435 ignore_cron=args.ignore_cron, 436 ) 437 if not success: 438 raise SQLMeshError("Error Running DAG. Check logs for details.") 439 440 @magic_arguments() 441 @argument("model", type=str, help="The model.") 442 @argument("--start", "-s", type=str, help="Start date to render.") 443 @argument("--end", "-e", type=str, help="End date to render.") 444 @argument("--execution-time", type=str, help="Execution time.") 445 @argument( 446 "--limit", 447 type=int, 448 help="The number of rows which the query should be limited to.", 449 ) 450 @line_magic 451 @pass_sqlmesh_context 452 def evaluate(self, context: Context, line: str) -> None: 453 """Evaluate a model query and fetches a dataframe.""" 454 context.refresh() 455 args = parse_argstring(self.evaluate, line) 456 457 df = context.evaluate( 458 args.model, 459 start=args.start, 460 end=args.end, 461 execution_time=args.execution_time, 462 limit=args.limit, 463 ) 464 self.display(df) 465 466 @magic_arguments() 467 @argument("model", type=str, help="The model.") 468 @argument("--start", "-s", type=str, help="Start date to render.") 469 @argument("--end", "-e", type=str, help="End date to render.") 470 @argument("--execution-time", type=str, help="Execution time.") 471 @argument( 472 "--expand", 473 type=t.Union[bool, t.Iterable[str]], 474 help="Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. 
If a list, only referenced models are expanded as raw queries.", 475 ) 476 @argument("--dialect", type=str, help="SQL dialect to render.") 477 @argument("--no-format", action="store_true", help="Disable fancy formatting of the query.") 478 @line_magic 479 @pass_sqlmesh_context 480 def render(self, context: Context, line: str) -> None: 481 """Renders a model's query, optionally expanding referenced models.""" 482 context.refresh() 483 args = parse_argstring(self.render, line) 484 485 query = context.render( 486 args.model, 487 start=args.start, 488 end=args.end, 489 execution_time=args.execution_time, 490 expand=args.expand, 491 ) 492 493 sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect) 494 if args.no_format: 495 context.console.log_status_update(sql) 496 else: 497 context.console.show_sql(sql) 498 499 @magic_arguments() 500 @argument( 501 "df_var", 502 default=None, 503 nargs="?", 504 type=str, 505 help="An optional variable name to store the resulting dataframe.", 506 ) 507 @cell_magic 508 @pass_sqlmesh_context 509 def fetchdf(self, context: Context, line: str, sql: str) -> None: 510 """Fetches a dataframe from sql, optionally storing it in a variable.""" 511 args = parse_argstring(self.fetchdf, line) 512 df = context.fetchdf(sql) 513 if args.df_var: 514 self._shell.user_ns[args.df_var] = df 515 self.display(df) 516 517 @magic_arguments() 518 @argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.") 519 @argument( 520 "--select-model", 521 type=str, 522 nargs="*", 523 help="Select specific models to include in the dag.", 524 ) 525 @line_magic 526 @pass_sqlmesh_context 527 def dag(self, context: Context, line: str) -> None: 528 """Displays the HTML DAG.""" 529 args = parse_argstring(self.dag, line) 530 dag = context.get_dag(args.select_model) 531 if args.file: 532 with open(args.file, "w", encoding="utf-8") as file: 533 file.write(str(dag)) 534 # TODO: Have this go through console instead of calling 
display directly 535 self.display(dag) 536 537 @magic_arguments() 538 @line_magic 539 @pass_sqlmesh_context 540 def migrate(self, context: Context, line: str) -> None: 541 """Migrate SQLMesh to the current running version.""" 542 context.migrate() 543 context.console.log_success("Migration complete") 544 545 @magic_arguments() 546 @line_magic 547 @pass_sqlmesh_context 548 def create_external_models(self, context: Context, line: str) -> None: 549 """Create a schema file containing external model schemas.""" 550 context.create_external_models() 551 552 @magic_arguments() 553 @argument( 554 "source_to_target", 555 type=str, 556 metavar="SOURCE:TARGET", 557 help="Source and target in `SOURCE:TARGET` format", 558 ) 559 @argument( 560 "--on", 561 type=str, 562 nargs="*", 563 help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.", 564 ) 565 @argument( 566 "--model", 567 type=str, 568 help="The model to diff against when source and target are environments and not tables.", 569 ) 570 @argument( 571 "--where", 572 type=str, 573 help="An optional where statement to filter results.", 574 ) 575 @argument( 576 "--limit", 577 type=int, 578 default=20, 579 help="The limit of the sample dataframe.", 580 ) 581 @argument( 582 "--show-sample", 583 action="store_true", 584 help="Show a sample of the rows that differ. With many columns, the output can be very wide.", 585 ) 586 @line_magic 587 @pass_sqlmesh_context 588 def table_diff(self, context: Context, line: str) -> None: 589 """Show the diff between two tables. 590 591 Can either be two tables or two environments and a model. 
592 """ 593 args = parse_argstring(self.table_diff, line) 594 source, target = args.source_to_target.split(":") 595 context.table_diff( 596 source=source, 597 target=target, 598 on=args.on, 599 model_or_snapshot=args.model, 600 where=args.where, 601 limit=args.limit, 602 show_sample=args.show_sample, 603 ) 604 605 @magic_arguments() 606 @argument( 607 "model_name", 608 nargs="?", 609 type=str, 610 help="The name of the model to get the table name for.", 611 ) 612 @argument( 613 "--dev", 614 action="store_true", 615 help="Print the name of the snapshot table used for previews in development environments.", 616 ) 617 @line_magic 618 @pass_sqlmesh_context 619 def table_name(self, context: Context, line: str) -> None: 620 """Prints the name of the physical table for the given model.""" 621 args = parse_argstring(self.table_name, line) 622 context.console.log_status_update(context.table_name(args.model_name, args.dev)) 623 624 @magic_arguments() 625 @argument( 626 "--read", 627 type=str, 628 default="", 629 help="The input dialect of the sql string.", 630 ) 631 @argument( 632 "--write", 633 type=str, 634 default="", 635 help="The output dialect of the sql string.", 636 ) 637 @line_cell_magic 638 @pass_sqlmesh_context 639 def rewrite(self, context: Context, line: str, sql: str) -> None: 640 """Rewrite a sql expression with semantic references into an executable query. 
641 642 https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/ 643 """ 644 args = parse_argstring(self.rewrite, line) 645 context.console.show_sql( 646 context.rewrite(sql, args.read).sql( 647 dialect=args.write or context.config.dialect, pretty=True 648 ) 649 ) 650 651 @magic_arguments() 652 @argument( 653 "--transpile", 654 "-t", 655 type=str, 656 help="Transpile project models to the specified dialect.", 657 ) 658 @argument( 659 "--append-newline", 660 action="store_true", 661 help="Whether or not to append a newline to the end of the file.", 662 default=None, 663 ) 664 @argument( 665 "--normalize", 666 action="store_true", 667 help="Whether or not to normalize identifiers to lowercase.", 668 default=None, 669 ) 670 @argument( 671 "--pad", 672 type=int, 673 help="Determines the pad size in a formatted string.", 674 ) 675 @argument( 676 "--indent", 677 type=int, 678 help="Determines the indentation size in a formatted string.", 679 ) 680 @argument( 681 "--normalize-functions", 682 type=str, 683 help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'", 684 ) 685 @argument( 686 "--leading-comma", 687 action="store_true", 688 help="Determines whether or not the comma is leading or trailing in select expressions. 
Default is trailing.", 689 default=None, 690 ) 691 @argument( 692 "--max-text-width", 693 type=int, 694 help="The max number of characters in a segment before creating new lines in pretty mode.", 695 ) 696 @line_magic 697 @pass_sqlmesh_context 698 def format(self, context: Context, line: str) -> None: 699 """Format all SQL models.""" 700 args = parse_argstring(self.format, line) 701 context.format(**{k: v for k, v in vars(args).items() if v is not None}) 702 703 @magic_arguments() 704 @argument("environment", type=str, help="The environment to diff local state against.") 705 @line_magic 706 @pass_sqlmesh_context 707 def diff(self, context: Context, line: str) -> None: 708 """Show the diff between the local state and the target environment.""" 709 args = parse_argstring(self.diff, line) 710 context.diff(args.environment) 711 712 @magic_arguments() 713 @argument("environment", type=str, help="The environment to invalidate.") 714 @line_magic 715 @pass_sqlmesh_context 716 def invalidate(self, context: Context, line: str) -> None: 717 """Invalidate the target environment, forcing its removal during the next run of the janitor process.""" 718 args = parse_argstring(self.invalidate, line) 719 context.invalidate_environment(args.environment) 720 721 @magic_arguments() 722 @argument("model", type=str) 723 @argument( 724 "--query", 725 "-q", 726 type=str, 727 nargs="+", 728 required=True, 729 help="Queries that will be used to generate data for the model's dependencies.", 730 ) 731 @argument( 732 "--overwrite", 733 "-o", 734 action="store_true", 735 help="When true, the fixture file will be overwritten in case it already exists.", 736 ) 737 @argument( 738 "--var", 739 "-v", 740 type=str, 741 nargs="+", 742 help="Key-value pairs that will define variables needed by the model.", 743 ) 744 @argument( 745 "--path", 746 "-p", 747 type=str, 748 help="The file path corresponding to the fixture, relative to the test directory. 
" 749 "By default, the fixture will be created under the test directory and the file " 750 "name will be inferred based on the test's name.", 751 ) 752 @argument( 753 "--name", 754 "-n", 755 type=str, 756 help="The name of the test that will be created. By default, it's inferred based on the model's name.", 757 ) 758 @argument( 759 "--include-ctes", 760 action="store_true", 761 help="When true, CTE fixtures will also be generated.", 762 ) 763 @line_magic 764 @pass_sqlmesh_context 765 def create_test(self, context: Context, line: str) -> None: 766 """Generate a unit test fixture for a given model.""" 767 args = parse_argstring(self.create_test, line) 768 queries = iter(args.query) 769 variables = iter(args.var) if args.var else None 770 context.create_test( 771 args.model, 772 input_queries={k: v.strip('"') for k, v in dict(zip(queries, queries)).items()}, 773 overwrite=args.overwrite, 774 variables=dict(zip(variables, variables)) if variables else None, 775 path=args.path, 776 name=args.name, 777 include_ctes=args.include_ctes, 778 ) 779 780 @magic_arguments() 781 @argument("tests", nargs="*", type=str) 782 @argument( 783 "--pattern", 784 "-k", 785 nargs="*", 786 type=str, 787 help="Only run tests that match the pattern of substring.", 788 ) 789 @argument("--verbose", "-v", action="store_true", help="Verbose output.") 790 @argument( 791 "--preserve-fixtures", 792 action="store_true", 793 help="Preserve the fixture tables in the testing database, useful for debugging.", 794 ) 795 @line_magic 796 @pass_sqlmesh_context 797 def run_test(self, context: Context, line: str) -> None: 798 """Run unit test(s).""" 799 args = parse_argstring(self.run_test, line) 800 context.test( 801 match_patterns=args.pattern, 802 tests=args.tests, 803 verbose=args.verbose, 804 preserve_fixtures=args.preserve_fixtures, 805 ) 806 807 @magic_arguments() 808 @argument( 809 "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited." 
810 ) 811 @argument("--start", "-s", type=str, help="Start date to audit.") 812 @argument("--end", "-e", type=str, help="End date to audit.") 813 @argument("--execution-time", type=str, help="Execution time.") 814 @line_magic 815 @pass_sqlmesh_context 816 def audit(self, context: Context, line: str) -> None: 817 """Run audit(s)""" 818 args = parse_argstring(self.audit, line) 819 context.audit( 820 models=args.models, start=args.start, end=args.end, execution_time=args.execution_time 821 ) 822 823 @magic_arguments() 824 @line_magic 825 @pass_sqlmesh_context 826 def info(self, context: Context, line: str) -> None: 827 """Display SQLMesh project information.""" 828 context.print_info() 829 830 @magic_arguments() 831 @line_magic 832 @pass_sqlmesh_context 833 def rollback(self, context: Context, line: str) -> None: 834 """Rollback SQLMesh to the previous migration.""" 835 context.rollback() 836 837 @magic_arguments() 838 @line_magic 839 @pass_sqlmesh_context 840 def clean(self, context: Context, line: str) -> None: 841 """Clears the SQLMesh cache and any build artifacts.""" 842 context.clear_caches() 843 context.console.log_success("SQLMesh cache and build artifacts cleared")
Base class for implementing magic functions.
Shell functions which can be reached as %function_name. All magic
functions should accept a string, which they can parse for their own
needs. This can make some functions easier to type, e.g. %cd ../
vs. %cd("../")
Classes providing magic functions need to subclass this class, and they MUST:
Use the method decorators
@line_magic
and @cell_magic
to decorate individual methods as magic functions, AND use the class decorator
@magics_class
to ensure that the magic methods are properly registered at the instance level upon instance initialization.
See magic_functions
for examples of actual implementation classes.
@magic_arguments()
@argument(
    "paths",
    type=str,
    nargs="+",
    default="",
    help="The path(s) to the SQLMesh project(s).",
)
@argument(
    "--config",
    type=str,
    help="Name of the config object. Only applicable to configuration defined using Python script.",
)
@argument("--gateway", type=str, help="The name of the gateway.")
@argument("--ignore-warnings", action="store_true", help="Ignore warnings.")
@argument("--debug", action="store_true", help="Enable debug mode.")
@argument("--log-file-dir", type=str, help="The directory to write the log file to.")
@line_magic
def context(self, line: str) -> None:
    """Sets the context in the user namespace.

    Loads the project configuration for the given path(s), configures logging,
    builds a ``Context`` and stores it in the notebook under the name ``context``.
    """
    from sqlmesh import configure_logging

    options = parse_argstring(self.context, line)
    project_paths = options.paths
    loaded_configs = load_configs(options.config, Context.CONFIG_TYPE, project_paths)

    # Logging is configured from the first loaded config's log limit.
    first_config = list(loaded_configs.values())[0]
    configure_logging(
        options.debug,
        options.ignore_warnings,
        log_limit=first_config.log_limit,
        log_file_dir=options.log_file_dir,
    )

    try:
        sqlmesh_context = Context(
            paths=project_paths,
            config=loaded_configs,
            gateway=options.gateway,
        )
        self._shell.user_ns["context"] = sqlmesh_context
    except Exception:
        # The traceback is only logged in debug mode; the error always propagates.
        if options.debug:
            logger.exception("Failed to initialize SQLMesh context")
        raise
    else:
        sqlmesh_context.console.log_success(
            f"SQLMesh project context set to: {', '.join(project_paths)}"
        )
Sets the context in the user namespace.
@magic_arguments()
@argument("path", type=str, help="The path where the new SQLMesh project should be created.")
@argument(
    "sql_dialect",
    type=str,
    help=f"Default model SQL dialect. Supported values: {sqlglot_dialects()}.",
)
@argument(
    "--template",
    "-t",
    type=str,
    help="Project template. Supported values: airflow, dbt, default, empty.",
)
@line_magic
def init(self, line: str) -> None:
    """Creates a SQLMesh project scaffold with a default SQL dialect.

    The template name is matched case-insensitively and falls back to
    ``default`` when ``--template`` is not given.
    """
    options = parse_argstring(self.init, line)

    template_name = options.template.lower() if options.template else "default"
    try:
        template = ProjectTemplate(template_name)
    except ValueError:
        # Report the value exactly as the user typed it, not the lowercased form.
        raise MagicError(f"Invalid project template '{options.template}'")

    init_example_project(options.path, options.sql_dialect, template)

    # Render a small green, bold confirmation banner in the notebook output.
    banner_attributes = {"style": {"color": "green", "font-weight": "bold"}}
    banner = h("span", banner_attributes, "SQLMesh project scaffold created")
    markup = str(h("div", banner))
    self.display(JupyterRenderable(html=markup, text=""))
Creates a SQLMesh project scaffold with a default SQL dialect.
@magic_arguments()
@argument("model", type=str, help="The model.")
@argument("--start", "-s", type=str, help="Start date to render.")
@argument("--end", "-e", type=str, help="End date to render.")
@argument("--execution-time", type=str, help="Execution time.")
@argument("--dialect", "-d", type=str, help="The rendered dialect.")
@line_cell_magic
@pass_sqlmesh_context
def model(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
    """Renders the model and automatically fills in an editable cell with the model definition.

    Used as a line magic, the definition is read from the model's file. Used as a
    cell magic, the cell body is parsed as the new definition. In both cases the
    formatted definition is written back to the model's file.
    """
    options = parse_argstring(self.model, line)

    target = context.get_model(options.model, raise_if_missing=True)
    node_config = context.config_for_node(target)

    if not sql:
        # Line-magic usage: start from what is currently on disk.
        with open(target._path, "r", encoding="utf-8") as source_file:
            expressions = parse(source_file.read(), default_dialect=node_config.dialect)
    else:
        expressions = parse(sql, default_dialect=node_config.dialect)
        candidate = load_sql_based_model(
            expressions,
            macros=context._macros,
            jinja_macros=context._jinja_macros,
            path=target._path,
            dialect=node_config.dialect,
            time_column_format=node_config.time_column_format,
            physical_schema_override=context.config.physical_schema_override,
            default_catalog=context.default_catalog,
        )
        # The edited definition only replaces the model when its name matches
        # the name given on the magic line.
        if candidate.name == options.model:
            target = candidate

    formatted = format_model_expressions(
        expressions, target.dialect, **node_config.format.generator_options
    )

    # Replace the current cell with a `%%model` cell holding the formatted definition.
    cell_header = " ".join(("%%model", line))
    self._shell.set_next_input("\n".join((cell_header, formatted)), replace=True)

    with open(target._path, "w", encoding="utf-8") as model_file:
        model_file.write(formatted)

    if sql:
        context.console.log_success(f"Model `{options.model}` updated")

    context.upsert_model(target)

    rendered_query = context.render(
        target.name,
        start=options.start,
        end=options.end,
        execution_time=options.execution_time,
    )
    context.console.show_sql(
        rendered_query.sql(pretty=True, dialect=options.dialect or target.dialect)
    )
Renders the model and automatically fills in an editable cell with the model definition.
@magic_arguments()
@argument("model", type=str, help="The model.")
@argument("test_name", type=str, nargs="?", default=None, help="The test name to display")
@argument("--ls", action="store_true", help="List tests associated with a model")
@line_cell_magic
@pass_sqlmesh_context
def test(self, context: Context, line: str, test_def_raw: t.Optional[str] = None) -> None:
    """Allow the user to list tests for a model, output a specific test, and then write their changes back

    With ``--ls`` only the test names are printed. Otherwise the chosen test is
    placed in an editable ``%%test`` cell and saved back to its file.
    """
    options = parse_argstring(self.test, line)
    if not (options.test_name or options.ls):
        raise MagicError("Must provide either test name or `--ls` to list tests")

    # Collect the test metadata from the tests directory of every configured project.
    discovered = [
        metadata
        for project_path, project_config in context.configs.items()
        for metadata in get_all_model_tests(
            project_path / c.TESTS,
            ignore_patterns=project_config.ignore_patterns,
        )
    ]

    # Group as: model name -> test name -> metadata.
    tests_by_model: t.Dict[str, t.Dict[str, ModelTestMetadata]] = defaultdict(dict)
    for metadata in discovered:
        owner = metadata.body.get("model")
        if owner:
            tests_by_model[owner][metadata.test_name] = metadata
            continue
        context.console.log_error(
            f"Test found that does not have `model` defined: {metadata.path}"
        )

    resolved_model = context.get_model(options.model, raise_if_missing=True)
    model_tests = tests_by_model[resolved_model.name]

    if options.ls:
        # TODO: Provide better UI for displaying tests
        for name in model_tests:
            context.console.log_status_update(name)
        return

    selected = model_tests[options.test_name]
    # A cell body, when present, takes precedence over the stored definition.
    definition = yaml.load(test_def_raw) if test_def_raw else selected.body
    rendered_definition = yaml.dump(definition)

    cell_header = " ".join(("%%test", line))
    self._shell.set_next_input("\n".join((cell_header, rendered_definition)), replace=True)

    # Rewrite the file in place; truncate drops leftover bytes when the new
    # content is shorter than the old.
    with open(selected.path, "r+", encoding="utf-8") as test_file:
        document = yaml.load(test_file.read())
        document[options.test_name] = definition
        test_file.seek(0)
        yaml.dump(document, test_file)
        test_file.truncate()
Allow the user to list tests for a model, output a specific test, and then write their changes back
@magic_arguments()
@argument(
    "environment",
    nargs="?",
    type=str,
    help="The environment to run the plan against",
)
@argument("--start", "-s", type=str, help="Start date to backfill.")
@argument("--end", "-e", type=str, help="End date to backfill.")
@argument("--execution-time", type=str, help="Execution time.")
@argument(
    "--create-from",
    type=str,
    help="The environment to create the target environment from if it doesn't exist. Default: prod.",
)
@argument(
    "--skip-tests",
    "-t",
    action="store_true",
    help="Skip the unit tests defined for the model.",
)
@argument(
    "--restate-model",
    "-r",
    type=str,
    nargs="*",
    help="Restate data for specified models (and models downstream from the one specified). For production environment, all related model versions will have their intervals wiped, but only the current versions will be backfilled. For development environment, only the current model versions will be affected.",
)
@argument(
    "--no-gaps",
    "-g",
    action="store_true",
    help="Ensure that new snapshots have no data gaps when comparing to existing snapshots for matching models in the target environment.",
)
@argument(
    "--skip-backfill",
    action="store_true",
    help="Skip the backfill step.",
)
@argument(
    "--forward-only",
    action="store_true",
    help="Create a plan for forward-only changes.",
    default=None,
)
@argument(
    "--effective-from",
    type=str,
    help="The effective date from which to apply forward-only changes on production.",
)
@argument(
    "--no-prompts",
    action="store_true",
    help="Disables interactive prompts for the backfill time range. Please note that if this flag is set and there are uncategorized changes, plan creation will fail.",
    default=None,
)
@argument(
    "--auto-apply",
    action="store_true",
    help="Automatically applies the new plan after creation.",
    default=None,
)
@argument(
    "--no-auto-categorization",
    action="store_true",
    help="Disable automatic change categorization.",
    default=None,
)
@argument(
    "--include-unmodified",
    action="store_true",
    help="Include unmodified models in the target environment.",
    default=None,
)
@argument(
    "--select-model",
    type=str,
    nargs="*",
    help="Select specific model changes that should be included in the plan.",
)
@argument(
    "--backfill-model",
    type=str,
    nargs="*",
    help="Backfill only the models whose names match the expression. This is supported only when targeting a development environment.",
)
@argument(
    "--no-diff",
    action="store_true",
    help="Hide text differences for changed models.",
    default=None,
)
@argument(
    "--run",
    action="store_true",
    help="Run latest intervals as part of the plan application (prod environment only).",
)
@argument(
    "--enable-preview",
    action="store_true",
    help="Enable preview for forward-only models when targeting a development environment.",
    default=None,
)
@line_magic
@pass_sqlmesh_context
def plan(self, context: Context, line: str) -> None:
    """Goes through a set of prompts to both establish a plan and apply it

    Every option parsed from the magic line is forwarded unchanged to
    ``context.plan``.
    """
    options = parse_argstring(self.plan, line)

    # Command-line option names are singular; the matching keyword arguments
    # of `context.plan` are plural.
    plan_options = {
        "start": options.start,
        "end": options.end,
        "execution_time": options.execution_time,
        "create_from": options.create_from,
        "skip_tests": options.skip_tests,
        "restate_models": options.restate_model,
        "backfill_models": options.backfill_model,
        "no_gaps": options.no_gaps,
        "skip_backfill": options.skip_backfill,
        "forward_only": options.forward_only,
        "no_prompts": options.no_prompts,
        "auto_apply": options.auto_apply,
        "no_auto_categorization": options.no_auto_categorization,
        "effective_from": options.effective_from,
        "include_unmodified": options.include_unmodified,
        "select_models": options.select_model,
        "no_diff": options.no_diff,
        "run": options.run,
        "enable_preview": options.enable_preview,
    }
    context.plan(options.environment, **plan_options)
Goes through a set of prompts to both establish a plan and apply it
@magic_arguments()
@argument(
    "environment",
    nargs="?",
    type=str,
    help="The environment to run against",
)
@argument("--start", "-s", type=str, help="Start date to evaluate.")
@argument("--end", "-e", type=str, help="End date to evaluate.")
@argument("--skip-janitor", action="store_true", help="Skip the janitor task.")
@argument(
    "--ignore-cron",
    action="store_true",
    help="Run for all missing intervals, ignoring individual cron schedules.",
)
@line_magic
@pass_sqlmesh_context
def run_dag(self, context: Context, line: str) -> None:
    """Evaluate the DAG of models using the built-in scheduler.

    Raises an error when the run does not report success.
    """
    options = parse_argstring(self.run_dag, line)

    run_options = {
        "start": options.start,
        "end": options.end,
        "skip_janitor": options.skip_janitor,
        "ignore_cron": options.ignore_cron,
    }
    succeeded = context.run(options.environment, **run_options)
    if succeeded:
        return
    # A falsy result from `context.run` is surfaced to the notebook as an error.
    raise SQLMeshError("Error Running DAG. Check logs for details.")
Evaluate the DAG of models using the built-in scheduler.
@magic_arguments()
@argument("model", type=str, help="The model.")
@argument("--start", "-s", type=str, help="Start date to render.")
@argument("--end", "-e", type=str, help="End date to render.")
@argument("--execution-time", type=str, help="Execution time.")
@argument(
    "--limit",
    type=int,
    help="The number of rows which the query should be limited to.",
)
@line_magic
@pass_sqlmesh_context
def evaluate(self, context: Context, line: str) -> None:
    """Evaluates a model query and displays the resulting dataframe.

    The parsed options are forwarded to ``context.evaluate`` and the returned
    dataframe is shown through the magic's ``display`` callable.
    """
    # No explicit `context.refresh()` here: the `pass_sqlmesh_context` wrapper
    # already refreshes the context immediately before calling this method,
    # which is what the other context-bound magics rely on.
    args = parse_argstring(self.evaluate, line)

    df = context.evaluate(
        args.model,
        start=args.start,
        end=args.end,
        execution_time=args.execution_time,
        limit=args.limit,
    )
    self.display(df)
Evaluates a model query and fetches a dataframe.
@magic_arguments()
@argument("model", type=str, help="The model.")
@argument("--start", "-s", type=str, help="Start date to render.")
@argument("--end", "-e", type=str, help="End date to render.")
@argument("--execution-time", type=str, help="Execution time.")
@argument(
    "--expand",
    # argparse calls `type` on each raw string; a typing.Union cannot be
    # called, so the values are collected as plain strings and normalized
    # inside the method instead.
    type=str,
    nargs="*",
    help="Whether or not to expand materialized models, defaults to False. If passed without values (or as 'true'), all referenced models are expanded as raw queries. If model names are given, only those referenced models are expanded as raw queries.",
)
@argument("--dialect", type=str, help="SQL dialect to render.")
@argument("--no-format", action="store_true", help="Disable fancy formatting of the query.")
@line_magic
@pass_sqlmesh_context
def render(self, context: Context, line: str) -> None:
    """Renders a model's query, optionally expanding referenced models.

    The rendered SQL is written to the context console, either as plain
    text (``--no-format``) or through the console's SQL display.
    """
    # NOTE(review): pass_sqlmesh_context already calls context.refresh()
    # before invoking this method, so this second refresh looks redundant.
    context.refresh()
    args = parse_argstring(self.render, line)

    # Normalize --expand:
    #   omitted            -> None (passed through unchanged, as before)
    #   bare flag / "true" -> True
    #   "false"            -> False
    #   anything else      -> list of model names
    # Because --expand takes a variable number of values, the positional
    # model name should be given before it on the magic line.
    expand = args.expand
    if expand is not None:
        lowered = [value.lower() for value in expand]
        if not expand or lowered == ["true"]:
            expand = True
        elif lowered == ["false"]:
            expand = False

    query = context.render(
        args.model,
        start=args.start,
        end=args.end,
        execution_time=args.execution_time,
        expand=expand,
    )

    # Fall back to the project's configured dialect when none is requested.
    sql = query.sql(pretty=True, dialect=args.dialect or context.config.dialect)
    if args.no_format:
        context.console.log_status_update(sql)
    else:
        context.console.show_sql(sql)
Renders a model's query, optionally expanding referenced models.
@magic_arguments()
@argument(
    "df_var",
    default=None,
    nargs="?",
    type=str,
    help="An optional variable name to store the resulting dataframe.",
)
@cell_magic
@pass_sqlmesh_context
def fetchdf(self, context: Context, line: str, sql: str) -> None:
    """Fetches a dataframe from sql, optionally storing it in a variable."""
    target_name = parse_argstring(self.fetchdf, line).df_var
    frame = context.fetchdf(sql)

    # When a variable name was given, expose the dataframe in the user's
    # notebook namespace under that name.
    if target_name:
        self._shell.user_ns[target_name] = frame

    self.display(frame)
Fetches a dataframe from sql, optionally storing it in a variable.
@magic_arguments()
@argument("--file", "-f", type=str, help="An optional file path to write the HTML output to.")
@argument(
    "--select-model",
    type=str,
    nargs="*",
    help="Select specific models to include in the dag.",
)
@line_magic
@pass_sqlmesh_context
def dag(self, context: Context, line: str) -> None:
    """Displays the HTML DAG."""
    options = parse_argstring(self.dag, line)
    graph = context.get_dag(options.select_model)

    output_path = options.file
    if output_path:
        # The string form of the DAG object is what gets written to disk.
        with open(output_path, "w", encoding="utf-8") as handle:
            handle.write(str(graph))

    # TODO: Have this go through console instead of calling display directly
    self.display(graph)
Displays the HTML DAG.
@magic_arguments()
@line_magic
@pass_sqlmesh_context
def migrate(self, context: Context, line: str) -> None:
    """Migrate SQLMesh to the current running version."""
    # The magic takes no options, so `line` is intentionally left unparsed.
    context.migrate()

    # Only reached when the migration call returned without raising.
    console = context.console
    console.log_success("Migration complete")
Migrate SQLMesh to the current running version.
@magic_arguments()
@line_magic
@pass_sqlmesh_context
def create_external_models(self, context: Context, line: str) -> None:
    """Create a schema file containing external model schemas."""
    # The magic takes no options, so `line` is intentionally left unparsed;
    # all of the work is delegated to the context.
    context.create_external_models()
Create a schema file containing external model schemas.
@magic_arguments()
@argument(
    "source_to_target",
    type=str,
    metavar="SOURCE:TARGET",
    help="Source and target in `SOURCE:TARGET` format",
)
@argument(
    "--on",
    type=str,
    nargs="*",
    help="The column to join on. Can be specified multiple times. The model grain will be used if not specified.",
)
@argument(
    "--model",
    type=str,
    help="The model to diff against when source and target are environments and not tables.",
)
@argument(
    "--where",
    type=str,
    help="An optional where statement to filter results.",
)
@argument(
    "--limit",
    type=int,
    default=20,
    help="The limit of the sample dataframe.",
)
@argument(
    "--show-sample",
    action="store_true",
    help="Show a sample of the rows that differ. With many columns, the output can be very wide.",
)
@line_magic
@pass_sqlmesh_context
def table_diff(self, context: Context, line: str) -> None:
    """Show the diff between two tables.

    Can either be two tables or two environments and a model.

    Raises:
        MagicError: If the positional argument is not of the form
            ``SOURCE:TARGET`` with exactly one ``:`` and two non-empty sides.
    """
    args = parse_argstring(self.table_diff, line)

    # Validate the SOURCE:TARGET shape up front so a malformed argument gives
    # an actionable message instead of a bare tuple-unpacking ValueError.
    parts = args.source_to_target.split(":")
    if len(parts) != 2 or not all(parts):
        raise MagicError(
            f"Expected source and target in `SOURCE:TARGET` format, got '{args.source_to_target}'."
        )
    source, target = parts

    context.table_diff(
        source=source,
        target=target,
        on=args.on,
        model_or_snapshot=args.model,
        where=args.where,
        limit=args.limit,
        show_sample=args.show_sample,
    )
Show the diff between two tables.
Can either be two tables or two environments and a model.
@magic_arguments()
@argument(
    "model_name",
    nargs="?",
    type=str,
    help="The name of the model to get the table name for.",
)
@argument(
    "--dev",
    action="store_true",
    help="Print the name of the snapshot table used for previews in development environments.",
)
@line_magic
@pass_sqlmesh_context
def table_name(self, context: Context, line: str) -> None:
    """Prints the name of the physical table for the given model."""
    options = parse_argstring(self.table_name, line)

    # Resolve the console writer first, then hand it the looked-up name.
    emit = context.console.log_status_update
    emit(context.table_name(options.model_name, options.dev))
Prints the name of the physical table for the given model.
@magic_arguments()
@argument(
    "--read",
    type=str,
    default="",
    help="The input dialect of the sql string.",
)
@argument(
    "--write",
    type=str,
    default="",
    help="The output dialect of the sql string.",
)
@line_cell_magic
@pass_sqlmesh_context
def rewrite(self, context: Context, line: str, sql: t.Optional[str] = None) -> None:
    """Rewrite a sql expression with semantic references into an executable query.

    https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

    Raises:
        MagicError: If invoked as a line magic, i.e. without a cell body
            holding the SQL to rewrite.
    """
    # A line/cell magic is invoked without the cell argument in line mode.
    # `sql` therefore needs a default, and the missing body is reported
    # explicitly rather than surfacing as a TypeError about a missing argument.
    if sql is None:
        raise MagicError(
            "The SQL to rewrite must be provided in the cell body; use `%%rewrite` as a cell magic."
        )

    args = parse_argstring(self.rewrite, line)

    # Fall back to the project's configured dialect when --write is not given.
    context.console.show_sql(
        context.rewrite(sql, args.read).sql(
            dialect=args.write or context.config.dialect, pretty=True
        )
    )
Rewrite a sql expression with semantic references into an executable query.
https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
@magic_arguments()
@argument(
    "--transpile",
    "-t",
    type=str,
    help="Transpile project models to the specified dialect.",
)
@argument(
    "--append-newline",
    action="store_true",
    help="Whether or not to append a newline to the end of the file.",
    default=None,
)
@argument(
    "--normalize",
    action="store_true",
    help="Whether or not to normalize identifiers to lowercase.",
    default=None,
)
@argument(
    "--pad",
    type=int,
    help="Determines the pad size in a formatted string.",
)
@argument(
    "--indent",
    type=int,
    help="Determines the indentation size in a formatted string.",
)
@argument(
    "--normalize-functions",
    type=str,
    help="Whether or not to normalize all function names. Possible values are: 'upper', 'lower'",
)
@argument(
    "--leading-comma",
    action="store_true",
    help="Determines whether or not the comma is leading or trailing in select expressions. Default is trailing.",
    default=None,
)
@argument(
    "--max-text-width",
    type=int,
    help="The max number of characters in a segment before creating new lines in pretty mode.",
)
@line_magic
@pass_sqlmesh_context
def format(self, context: Context, line: str) -> None:
    """Format all SQL models."""
    parsed = vars(parse_argstring(self.format, line))

    # Every option defaults to None (the store_true flags declare it
    # explicitly), so dropping None values forwards only the options the
    # user actually supplied.
    supplied = {name: value for name, value in parsed.items() if value is not None}
    context.format(**supplied)
Format all SQL models.
@magic_arguments()
@argument("environment", type=str, help="The environment to diff local state against.")
@line_magic
@pass_sqlmesh_context
def diff(self, context: Context, line: str) -> None:
    """Show the diff between the local state and the target environment."""
    # Only the positional environment name is read from the magic line.
    environment = parse_argstring(self.diff, line).environment
    context.diff(environment)
Show the diff between the local state and the target environment.
@magic_arguments()
@argument("environment", type=str, help="The environment to invalidate.")
@line_magic
@pass_sqlmesh_context
def invalidate(self, context: Context, line: str) -> None:
    """Invalidate the target environment, forcing its removal during the next run of the janitor process."""
    # Only the positional environment name is read from the magic line.
    environment = parse_argstring(self.invalidate, line).environment
    context.invalidate_environment(environment)
Invalidate the target environment, forcing its removal during the next run of the janitor process.
@magic_arguments()
@argument("model", type=str)
@argument(
    "--query",
    "-q",
    type=str,
    nargs="+",
    required=True,
    help="Queries that will be used to generate data for the model's dependencies.",
)
@argument(
    "--overwrite",
    "-o",
    action="store_true",
    help="When true, the fixture file will be overwritten in case it already exists.",
)
@argument(
    "--var",
    "-v",
    type=str,
    nargs="+",
    help="Key-value pairs that will define variables needed by the model.",
)
@argument(
    "--path",
    "-p",
    type=str,
    help="The file path corresponding to the fixture, relative to the test directory. "
    "By default, the fixture will be created under the test directory and the file "
    "name will be inferred based on the test's name.",
)
@argument(
    "--name",
    "-n",
    type=str,
    help="The name of the test that will be created. By default, it's inferred based on the model's name.",
)
@argument(
    "--include-ctes",
    action="store_true",
    help="When true, CTE fixtures will also be generated.",
)
@line_magic
@pass_sqlmesh_context
def create_test(self, context: Context, line: str) -> None:
    """Generate a unit test fixture for a given model.

    Values given to ``--query`` and ``--var`` are consumed as consecutive
    key/value pairs.

    Raises:
        MagicError: If ``--query`` or ``--var`` receives an odd number of
            values, which would leave the last key without a value.
    """
    args = parse_argstring(self.create_test, line)

    # Values are paired positionally (1st with 2nd, 3rd with 4th, ...).
    # Pairing silently drops a trailing unpaired value, so reject odd counts
    # explicitly instead of generating a fixture from incomplete input.
    queries = args.query
    if len(queries) % 2:
        raise MagicError(
            f"--query expects an even number of values (key/query pairs), got {len(queries)}."
        )

    variables = args.var or None
    if variables and len(variables) % 2:
        raise MagicError(
            f"--var expects an even number of values (key/value pairs), got {len(variables)}."
        )

    context.create_test(
        args.model,
        # Surrounding double quotes are stripped from each query string.
        input_queries={
            key: query.strip('"') for key, query in zip(queries[::2], queries[1::2])
        },
        overwrite=args.overwrite,
        variables=dict(zip(variables[::2], variables[1::2])) if variables else None,
        path=args.path,
        name=args.name,
        include_ctes=args.include_ctes,
    )
Generate a unit test fixture for a given model.
@magic_arguments()
@argument("tests", nargs="*", type=str)
@argument(
    "--pattern",
    "-k",
    nargs="*",
    type=str,
    help="Only run tests that match the pattern of substring.",
)
@argument("--verbose", "-v", action="store_true", help="Verbose output.")
@argument(
    "--preserve-fixtures",
    action="store_true",
    help="Preserve the fixture tables in the testing database, useful for debugging.",
)
@line_magic
@pass_sqlmesh_context
def run_test(self, context: Context, line: str) -> None:
    """Run unit test(s)."""
    options = parse_argstring(self.run_test, line)

    # Map the magic's option names onto the keyword names context.test uses.
    test_kwargs = {
        "match_patterns": options.pattern,
        "tests": options.tests,
        "verbose": options.verbose,
        "preserve_fixtures": options.preserve_fixtures,
    }
    context.test(**test_kwargs)
Run unit test(s).
@magic_arguments()
@argument(
    "models", type=str, nargs="*", help="A model to audit. Multiple models can be audited."
)
@argument("--start", "-s", type=str, help="Start date to audit.")
@argument("--end", "-e", type=str, help="End date to audit.")
@argument("--execution-time", type=str, help="Execution time.")
@line_magic
@pass_sqlmesh_context
def audit(self, context: Context, line: str) -> None:
    """Run audit(s)"""
    options = parse_argstring(self.audit, line)

    # All parsed options are forwarded to the context unchanged.
    audit_kwargs = {
        "models": options.models,
        "start": options.start,
        "end": options.end,
        "execution_time": options.execution_time,
    }
    context.audit(**audit_kwargs)
Run audit(s).
@magic_arguments()
@line_magic
@pass_sqlmesh_context
def info(self, context: Context, line: str) -> None:
    """Display SQLMesh project information."""
    # The magic takes no options, so `line` is intentionally left unparsed;
    # the context does the printing.
    context.print_info()
Display SQLMesh project information.
@magic_arguments()
@line_magic
@pass_sqlmesh_context
def rollback(self, context: Context, line: str) -> None:
    """Rollback SQLMesh to the previous migration."""
    # The magic takes no options, so `line` is intentionally left unparsed;
    # all of the work is delegated to the context.
    context.rollback()
Rollback SQLMesh to the previous migration.
@magic_arguments()
@line_magic
@pass_sqlmesh_context
def clean(self, context: Context, line: str) -> None:
    """Clears the SQLMesh cache and any build artifacts."""
    # The magic takes no options, so `line` is intentionally left unparsed.
    context.clear_caches()

    # Only reached when clearing the caches returned without raising.
    console = context.console
    console.log_success("SQLMesh cache and build artifacts cleared")
Clears the SQLMesh cache and any build artifacts.
Inherited Members
- IPython.core.magic.Magics
- Magics
- arg_err
- format_latex
- parse_options
- default_option
- traitlets.config.configurable.Configurable
- config
- parent
- section_names
- update_config
- class_get_help
- class_get_trait_help
- class_print_help
- class_config_section
- class_config_rst_doc
- traitlets.traitlets.HasTraits
- setup_instance
- cross_validation_lock
- hold_trait_notifications
- notify_change
- on_trait_change
- observe
- unobserve
- unobserve_all
- add_traits
- set_trait
- class_trait_names
- class_traits
- class_own_traits
- has_trait
- trait_has_value
- trait_values
- trait_defaults
- trait_names
- traits
- trait_metadata
- class_own_trait_events
- trait_events