Edit on GitHub

Context

A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and load your project's models, macros, and audits. Afterwards, you can use the context to create and apply plans, visualize your model's lineage, run your audits and model tests, and perform various other tasks. For more information regarding what a context can do, see sqlmesh.core.context.Context.

Examples:

Creating and applying a plan against the staging environment.

from sqlmesh.core.context import Context
context = Context(path="example", config="local_config")
plan = context.plan("staging")
context.apply(plan)

Running audits on your data.

from sqlmesh.core.context import Context
context = Context(path="example", config="local_config")
context.audit("yesterday", "now")

Running tests on your models.

from sqlmesh.core.context import Context
context = Context(path="example")
context.test()
   1"""
   2# Context
   3
   4A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and
   5load your project's models, macros, and audits. Afterwards, you can use the context to create and apply
   6plans, visualize your model's lineage, run your audits and model tests, and perform various other tasks.
   7For more information regarding what a context can do, see `sqlmesh.core.context.Context`.
   8
   9# Examples:
  10
  11Creating and applying a plan against the staging environment.
  12```python
  13from sqlmesh.core.context import Context
  14context = Context(path="example", config="local_config")
  15plan = context.plan("staging")
  16context.apply(plan)
  17```
  18
  19Running audits on your data.
  20```python
  21from sqlmesh.core.context import Context
  22context = Context(path="example", config="local_config")
  23context.audit("yesterday", "now")
  24```
  25
  26Running tests on your models.
  27```python
  28from sqlmesh.core.context import Context
  29context = Context(path="example")
  30context.test()
  31```
  32"""
  33
  34from __future__ import annotations
  35
  36import abc
  37import collections
  38import gc
  39import logging
  40import time
  41import traceback
  42import typing as t
  43import unittest.result
  44from datetime import timedelta
  45from functools import cached_property
  46from io import StringIO
  47from pathlib import Path
  48from shutil import rmtree
  49from types import MappingProxyType
  50
  51import pandas as pd
  52from sqlglot import exp
  53from sqlglot.lineage import GraphHTML
  54
  55from sqlmesh.core import constants as c
  56from sqlmesh.core.audit import Audit, StandaloneAudit
  57from sqlmesh.core.config import CategorizerConfig, Config, load_configs
  58from sqlmesh.core.config.loader import C
  59from sqlmesh.core.console import Console, get_console
  60from sqlmesh.core.context_diff import ContextDiff
  61from sqlmesh.core.dialect import (
  62    format_model_expressions,
  63    normalize_model_name,
  64    pandas_to_sql,
  65    parse,
  66    parse_one,
  67)
  68from sqlmesh.core.engine_adapter import EngineAdapter
  69from sqlmesh.core.environment import Environment, EnvironmentNamingInfo
  70from sqlmesh.core.loader import Loader, update_model_schemas
  71from sqlmesh.core.macros import ExecutableOrMacro, macro
  72from sqlmesh.core.metric import Metric, rewrite
  73from sqlmesh.core.model import Model
  74from sqlmesh.core.notification_target import (
  75    NotificationEvent,
  76    NotificationTarget,
  77    NotificationTargetManager,
  78)
  79from sqlmesh.core.plan import Plan, PlanBuilder
  80from sqlmesh.core.reference import ReferenceGraph
  81from sqlmesh.core.scheduler import Scheduler
  82from sqlmesh.core.schema_loader import create_schema_file
  83from sqlmesh.core.selector import Selector
  84from sqlmesh.core.snapshot import (
  85    DeployabilityIndex,
  86    Snapshot,
  87    SnapshotEvaluator,
  88    SnapshotFingerprint,
  89    to_table_mapping,
  90)
  91from sqlmesh.core.state_sync import (
  92    CachingStateSync,
  93    StateReader,
  94    StateSync,
  95    cleanup_expired_views,
  96)
  97from sqlmesh.core.table_diff import TableDiff
  98from sqlmesh.core.test import (
  99    ModelTextTestResult,
 100    generate_test,
 101    get_all_model_tests,
 102    run_model_tests,
 103    run_tests,
 104)
 105from sqlmesh.core.user import User
 106from sqlmesh.utils import UniqueKeyDict, sys_path
 107from sqlmesh.utils.dag import DAG
 108from sqlmesh.utils.date import TimeLike, now_ds, to_date
 109from sqlmesh.utils.errors import (
 110    CircuitBreakerError,
 111    ConfigError,
 112    PlanError,
 113    SQLMeshError,
 114    UncategorizedPlanError,
 115)
 116from sqlmesh.utils.jinja import JinjaMacroRegistry
 117
 118if t.TYPE_CHECKING:
 119    from typing_extensions import Literal
 120
 121    from sqlmesh.core.engine_adapter._typing import DF, PySparkDataFrame, PySparkSession
 122    from sqlmesh.core.snapshot import Node
 123
 124    ModelOrSnapshot = t.Union[str, Model, Snapshot]
 125    NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot]
 126
 127logger = logging.getLogger(__name__)
 128
 129
 130class BaseContext(abc.ABC):
 131    """The base context which defines methods to execute a model."""
 132
 133    @property
 134    @abc.abstractmethod
 135    def default_dialect(self) -> t.Optional[str]:
 136        """Returns the default dialect."""
 137
 138    @property
 139    @abc.abstractmethod
 140    def _model_tables(self) -> t.Dict[str, str]:
 141        """Returns a mapping of model names to tables."""
 142
 143    @property
 144    @abc.abstractmethod
 145    def engine_adapter(self) -> EngineAdapter:
 146        """Returns an engine adapter."""
 147
 148    @property
 149    def spark(self) -> t.Optional[PySparkSession]:
 150        """Returns the spark session if it exists."""
 151        return self.engine_adapter.spark
 152
 153    @property
 154    def default_catalog(self) -> t.Optional[str]:
 155        raise NotImplementedError
 156
 157    def table(self, model_name: str) -> str:
 158        """Gets the physical table name for a given model.
 159
 160        Args:
 161            model_name: The model name.
 162
 163        Returns:
 164            The physical table name.
 165        """
 166        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)
 167
 168        # We generate SQL for the default dialect because the table name may be used in a
 169        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
 170        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)
 171
 172    def fetchdf(
 173        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 174    ) -> pd.DataFrame:
 175        """Fetches a dataframe given a sql string or sqlglot expression.
 176
 177        Args:
 178            query: SQL string or sqlglot expression.
 179            quote_identifiers: Whether to quote all identifiers in the query.
 180
 181        Returns:
 182            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
 183        """
 184        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)
 185
 186    def fetch_pyspark_df(
 187        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 188    ) -> PySparkDataFrame:
 189        """Fetches a PySpark dataframe given a sql string or sqlglot expression.
 190
 191        Args:
 192            query: SQL string or sqlglot expression.
 193            quote_identifiers: Whether to quote all identifiers in the query.
 194
 195        Returns:
 196            A PySpark dataframe.
 197        """
 198        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)
 199
 200
 201class ExecutionContext(BaseContext):
 202    """The minimal context needed to execute a model.
 203
 204    Args:
 205        engine_adapter: The engine adapter to execute queries against.
 206        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 207        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 208    """
 209
 210    def __init__(
 211        self,
 212        engine_adapter: EngineAdapter,
 213        snapshots: t.Dict[str, Snapshot],
 214        deployability_index: t.Optional[DeployabilityIndex] = None,
 215        default_dialect: t.Optional[str] = None,
 216        default_catalog: t.Optional[str] = None,
 217        variables: t.Optional[t.Dict[str, t.Any]] = None,
 218    ):
 219        self.snapshots = snapshots
 220        self.deployability_index = deployability_index
 221        self._engine_adapter = engine_adapter
 222        self._default_catalog = default_catalog
 223        self._default_dialect = default_dialect
 224        self._variables = variables or {}
 225
 226    @property
 227    def default_dialect(self) -> t.Optional[str]:
 228        return self._default_dialect
 229
 230    @property
 231    def engine_adapter(self) -> EngineAdapter:
 232        """Returns an engine adapter."""
 233        return self._engine_adapter
 234
 235    @cached_property
 236    def _model_tables(self) -> t.Dict[str, str]:
 237        """Returns a mapping of model names to tables."""
 238        return to_table_mapping(self.snapshots.values(), self.deployability_index)
 239
 240    @property
 241    def default_catalog(self) -> t.Optional[str]:
 242        return self._default_catalog
 243
 244    @property
 245    def gateway(self) -> t.Optional[str]:
 246        """Returns the gateway name."""
 247        return self.var(c.GATEWAY)
 248
 249    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
 250        """Returns a variable value."""
 251        return self._variables.get(var_name.lower(), default)
 252
 253    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
 254        """Returns a new ExecutionContext with additional variables."""
 255        return ExecutionContext(
 256            self._engine_adapter,
 257            self.snapshots,
 258            self.deployability_index,
 259            self._default_dialect,
 260            self._default_catalog,
 261            variables=variables,
 262        )
 263
 264
 265class GenericContext(BaseContext, t.Generic[C]):
 266    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
 267
    Args:
        engine_adapter: The default engine adapter to use. If not specified, one is created from
            the connection config of the selected gateway.
        notification_targets: Notification targets to use in addition to those defined in config.
        state_sync: An optional StateSync instance supplied by the caller.
        paths: The directories containing SQLMesh files.
        config: A Config object, the name of a Config object in config.py, or a dict mapping
            project paths to Config objects.
        gateway: The name of the gateway used to look up the scheduler and connection in config.
        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
            Defaults to the value defined in the connection config.
        loader: The Loader class to instantiate. Defaults to the loader defined in config.
        load: Whether or not to automatically load all models and macros (default True).
        console: The rich instance used for printing out CLI command results.
        users: A list of users to make known to SQLMesh, in addition to those defined in config.
 282    """
 283
 284    CONFIG_TYPE: t.Type[C]
 285
    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        """Set up configuration, connections and registries, then optionally load the project.

        The order of the statements below matters: later steps read attributes
        (scheduler config, engine adapter, default catalog) set by earlier ones.
        """
        self.console = console or get_console()
        # A dict is taken as already-resolved {path: config}; anything else is resolved by load_configs.
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        # Empty registries; load() replaces or refills these.
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        # Filled lazily by the default_catalog property.
        self._default_catalog: t.Optional[str] = None

        # The first (path, config) entry is treated as the primary project.
        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        # Explicit arguments win; otherwise fall back to the connection config.
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        # Reading self.default_catalog here triggers the lazy lookup through the scheduler
        # config, so _scheduler and _engine_adapter must already be set.
        self._test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )

        # Created on first access of the snapshot_evaluator property.
        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        # Created on first access of the state_sync property.
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        # Dedupe by username; for duplicates the later entry (i.e. the one from config) is kept.
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        # A catalog mapping cannot be honored by an engine limited to a single catalog.
        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()
 354
 355    @property
 356    def default_dialect(self) -> t.Optional[str]:
 357        return self.config.dialect
 358
 359    @property
 360    def engine_adapter(self) -> EngineAdapter:
 361        """Returns an engine adapter."""
 362        return self._engine_adapter
 363
 364    @property
 365    def snapshot_evaluator(self) -> SnapshotEvaluator:
 366        if not self._snapshot_evaluator:
 367            self._snapshot_evaluator = SnapshotEvaluator(
 368                self.engine_adapter.with_log_level(logging.INFO),
 369                ddl_concurrent_tasks=self.concurrent_tasks,
 370            )
 371        return self._snapshot_evaluator
 372
 373    def execution_context(
 374        self, deployability_index: t.Optional[DeployabilityIndex] = None
 375    ) -> ExecutionContext:
 376        """Returns an execution context."""
 377        return ExecutionContext(
 378            engine_adapter=self._engine_adapter,
 379            snapshots=self.snapshots,
 380            deployability_index=deployability_index,
 381            default_dialect=self.default_dialect,
 382            default_catalog=self.default_catalog,
 383        )
 384
 385    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
 386        """Update or insert a model.
 387
 388        The context's models dictionary will be updated to include these changes.
 389
 390        Args:
 391            model: Model name or instance to update.
 392            kwargs: The kwargs to update the model with.
 393
 394        Returns:
 395            A new instance of the updated or inserted model.
 396        """
 397        model = self.get_model(model, raise_if_missing=True)
 398        path = model._path
 399
 400        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
 401        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
 402        model._path = path
 403
 404        self._models.update({model.fqn: model})
 405        self.dag.add(model.fqn, model.depends_on)
 406        update_model_schemas(
 407            self.dag,
 408            self._models,
 409            self.path,
 410        )
 411
 412        model.validate_definition()
 413
 414        return model
 415
 416    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
 417        """Returns the built-in scheduler.
 418
 419        Args:
 420            environment: The target environment to source model snapshots from, or None
 421                if snapshots should be sourced from the currently loaded local state.
 422
 423        Returns:
 424            The built-in scheduler instance.
 425        """
 426        snapshots: t.Iterable[Snapshot]
 427        if environment is not None:
 428            stored_environment = self.state_sync.get_environment(environment)
 429            if stored_environment is None:
 430                raise ConfigError(f"Environment '{environment}' was not found.")
 431            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
 432        else:
 433            snapshots = self.snapshots.values()
 434
 435        if not snapshots:
 436            raise ConfigError("No models were found")
 437
 438        return Scheduler(
 439            snapshots,
 440            self.snapshot_evaluator,
 441            self.state_sync,
 442            default_catalog=self.default_catalog,
 443            max_workers=self.concurrent_tasks,
 444            console=self.console,
 445            notification_target_manager=self.notification_target_manager,
 446        )
 447
 448    @property
 449    def state_sync(self) -> StateSync:
 450        if not self._state_sync:
 451            self._state_sync = self._new_state_sync()
 452
 453            if self._state_sync.get_versions(validate=False).schema_version == 0:
 454                self._state_sync.migrate(default_catalog=self.default_catalog)
 455            self._state_sync.get_versions()
 456            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
 457        return self._state_sync
 458
 459    @property
 460    def state_reader(self) -> StateReader:
 461        return self.state_sync
 462
 463    def refresh(self) -> None:
 464        """Refresh all models that have been updated."""
 465        if self._loader.reload_needed():
 466            self.load()
 467
 468    def load(self, update_schemas: bool = True) -> GenericContext[C]:
 469        """Load all files in the context's path."""
 470        with sys_path(*self.configs):
 471            gc.disable()
 472            project = self._loader.load(self, update_schemas)
 473            self._macros = project.macros
 474            self._jinja_macros = project.jinja_macros
 475            self._models = project.models
 476            self._metrics = project.metrics
 477            self._standalone_audits.clear()
 478            self._audits.clear()
 479            for name, audit in project.audits.items():
 480                if isinstance(audit, StandaloneAudit):
 481                    self._standalone_audits[name] = audit
 482                else:
 483                    self._audits[name] = audit
 484            self.dag = project.dag
 485            gc.enable()
 486
 487            duplicates = set(self._models) & set(self._standalone_audits)
 488            if duplicates:
 489                raise ConfigError(
 490                    f"Models and Standalone audits cannot have the same name: {duplicates}"
 491                )
 492
 493        return self
 494
 495    def run(
 496        self,
 497        environment: t.Optional[str] = None,
 498        *,
 499        start: t.Optional[TimeLike] = None,
 500        end: t.Optional[TimeLike] = None,
 501        execution_time: t.Optional[TimeLike] = None,
 502        skip_janitor: bool = False,
 503        ignore_cron: bool = False,
 504    ) -> bool:
 505        """Run the entire dag through the scheduler.
 506
 507        Args:
 508            environment: The target environment to source model snapshots from and virtually update. Default: prod.
 509            start: The start of the interval to render.
 510            end: The end of the interval to render.
 511            execution_time: The date/time time reference to use for execution time. Defaults to now.
 512            skip_janitor: Whether to skip the janitor task.
 513            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
 514
 515        Returns:
 516            True if the run was successful, False otherwise.
 517        """
 518        environment = environment or self.config.default_target_environment
 519
 520        self.notification_target_manager.notify(
 521            NotificationEvent.RUN_START, environment=environment
 522        )
 523        success = False
 524        try:
 525            success = self._run(
 526                environment=environment,
 527                start=start,
 528                end=end,
 529                execution_time=execution_time,
 530                skip_janitor=skip_janitor,
 531                ignore_cron=ignore_cron,
 532            )
 533        except Exception as e:
 534            self.notification_target_manager.notify(
 535                NotificationEvent.RUN_FAILURE, traceback.format_exc()
 536            )
 537            logger.error(f"Run Failure: {traceback.format_exc()}")
 538            raise e
 539
 540        if success:
 541            self.notification_target_manager.notify(
 542                NotificationEvent.RUN_END, environment=environment
 543            )
 544            self.console.log_success(f"Run finished for environment '{environment}'")
 545        else:
 546            self.notification_target_manager.notify(
 547                NotificationEvent.RUN_FAILURE, "See console logs for details."
 548            )
 549
 550        return success
 551
 552    @t.overload
 553    def get_model(
 554        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
 555    ) -> Model: ...
 556
 557    @t.overload
 558    def get_model(
 559        self,
 560        model_or_snapshot: ModelOrSnapshot,
 561        raise_if_missing: Literal[False] = False,
 562    ) -> t.Optional[Model]: ...
 563
 564    def get_model(
 565        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
 566    ) -> t.Optional[Model]:
 567        """Returns a model with the given name or None if a model with such name doesn't exist.
 568
 569        Args:
 570            model_or_snapshot: A model name, model, or snapshot.
 571            raise_if_missing: Raises an error if a model is not found.
 572
 573        Returns:
 574            The expected model.
 575        """
 576        if isinstance(model_or_snapshot, str):
 577            normalized_name = normalize_model_name(
 578                model_or_snapshot,
 579                dialect=self.default_dialect,
 580                default_catalog=self.default_catalog,
 581            )
 582            model = self._models.get(normalized_name)
 583        elif isinstance(model_or_snapshot, Snapshot):
 584            model = model_or_snapshot.model
 585        else:
 586            model = model_or_snapshot
 587
 588        if raise_if_missing and not model:
 589            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
 590
 591        return model
 592
 593    @t.overload
 594    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...
 595
 596    @t.overload
 597    def get_snapshot(
 598        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
 599    ) -> Snapshot: ...
 600
 601    @t.overload
 602    def get_snapshot(
 603        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
 604    ) -> t.Optional[Snapshot]: ...
 605
 606    def get_snapshot(
 607        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
 608    ) -> t.Optional[Snapshot]:
 609        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.
 610
 611        Args:
 612            node_or_snapshot: A node name, node, or snapshot.
 613            raise_if_missing: Raises an error if a snapshot is not found.
 614
 615        Returns:
 616            The expected snapshot.
 617        """
 618        if isinstance(node_or_snapshot, Snapshot):
 619            return node_or_snapshot
 620        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
 621            node_or_snapshot = normalize_model_name(
 622                node_or_snapshot,
 623                dialect=self.default_dialect,
 624                default_catalog=self.default_catalog,
 625            )
 626        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
 627        snapshot = self.snapshots.get(fqn)
 628
 629        if raise_if_missing and not snapshot:
 630            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
 631
 632        return snapshot
 633
 634    def config_for_path(self, path: Path) -> Config:
 635        for config_path, config in self.configs.items():
 636            try:
 637                path.relative_to(config_path)
 638                return config
 639            except ValueError:
 640                pass
 641        return self.config
 642
 643    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
 644        if isinstance(node, str):
 645            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
 646        return self.config_for_path(node._path)  # type: ignore
 647
 648    @property
 649    def models(self) -> MappingProxyType[str, Model]:
 650        """Returns all registered models in this context."""
 651        return MappingProxyType(self._models)
 652
 653    @property
 654    def metrics(self) -> MappingProxyType[str, Metric]:
 655        """Returns all registered metrics in this context."""
 656        return MappingProxyType(self._metrics)
 657
 658    @property
 659    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
 660        """Returns all registered standalone audits in this context."""
 661        return MappingProxyType(self._standalone_audits)
 662
 663    @property
 664    def snapshots(self) -> t.Dict[str, Snapshot]:
 665        """Generates and returns snapshots based on models registered in this context.
 666
 667        If one of the snapshots has been previously stored in the persisted state, the stored
 668        instance will be returned.
 669        """
 670        return self._snapshots()
 671
 672    @property
 673    def default_catalog(self) -> t.Optional[str]:
 674        if self._default_catalog is None:
 675            self._default_catalog = self._scheduler.get_default_catalog(self)
 676        return self._default_catalog
 677
 678    def render(
 679        self,
 680        model_or_snapshot: ModelOrSnapshot,
 681        *,
 682        start: t.Optional[TimeLike] = None,
 683        end: t.Optional[TimeLike] = None,
 684        execution_time: t.Optional[TimeLike] = None,
 685        expand: t.Union[bool, t.Iterable[str]] = False,
 686        **kwargs: t.Any,
 687    ) -> exp.Expression:
 688        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
 689
 690        Args:
 691            model_or_snapshot: The model, model name, or snapshot to render.
 692            start: The start of the interval to render.
 693            end: The end of the interval to render.
 694            execution_time: The date/time time reference to use for execution time. Defaults to now.
 695            expand: Whether or not to use expand materialized models, defaults to False.
 696                If True, all referenced models are expanded as raw queries.
 697                If a list, only referenced models are expanded as raw queries.
 698
 699        Returns:
 700            The rendered expression.
 701        """
 702        execution_time = execution_time or now_ds()
 703
 704        model = self.get_model(model_or_snapshot, raise_if_missing=True)
 705
 706        if expand and not isinstance(expand, bool):
 707            expand = {
 708                normalize_model_name(
 709                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
 710                )
 711                for x in expand
 712            }
 713
 714        expand = self.dag.upstream(model.fqn) if expand is True else expand or []
 715
 716        if model.is_seed:
 717            df = next(
 718                model.render(
 719                    context=self.execution_context(),
 720                    start=start,
 721                    end=end,
 722                    execution_time=execution_time,
 723                    **kwargs,
 724                )
 725            )
 726            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))
 727
 728        return model.render_query_or_raise(
 729            start=start,
 730            end=end,
 731            execution_time=execution_time,
 732            snapshots=self.snapshots,
 733            expand=expand,
 734            engine_adapter=self.engine_adapter,
 735            **kwargs,
 736        )
 737
 738    def evaluate(
 739        self,
 740        model_or_snapshot: ModelOrSnapshot,
 741        start: TimeLike,
 742        end: TimeLike,
 743        execution_time: TimeLike,
 744        limit: t.Optional[int] = None,
 745        **kwargs: t.Any,
 746    ) -> DF:
 747        """Evaluate a model or snapshot (running its query against a DB/Engine).
 748
 749        This method is used to test or iterate on models without side effects.
 750
 751        Args:
 752            model_or_snapshot: The model, model name, or snapshot to render.
 753            start: The start of the interval to evaluate.
 754            end: The end of the interval to evaluate.
 755            execution_time: The date/time time reference to use for execution time.
 756            limit: A limit applied to the model.
 757        """
 758        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
 759
 760        df = self.snapshot_evaluator.evaluate_and_fetch(
 761            snapshot,
 762            start=start,
 763            end=end,
 764            execution_time=execution_time,
 765            snapshots=self.snapshots,
 766            limit=limit or c.DEFAULT_MAX_LIMIT,
 767        )
 768
 769        if df is None:
 770            raise RuntimeError(f"Error evaluating {snapshot.name}")
 771
 772        return df
 773
 774    def format(
 775        self,
 776        transpile: t.Optional[str] = None,
 777        append_newline: t.Optional[bool] = None,
 778        **kwargs: t.Any,
 779    ) -> None:
 780        """Format all SQL models."""
 781        for model in self._models.values():
 782            if not model._path.suffix == ".sql":
 783                continue
 784            with open(model._path, "r+", encoding="utf-8") as file:
 785                expressions = parse(
 786                    file.read(), default_dialect=self.config_for_node(model).dialect
 787                )
 788                if transpile:
 789                    for prop in expressions[0].expressions:
 790                        if prop.name.lower() == "dialect":
 791                            prop.replace(
 792                                exp.Property(
 793                                    this="dialect",
 794                                    value=exp.Literal.string(transpile or model.dialect),
 795                                )
 796                            )
 797                format = self.config_for_node(model).format
 798                opts = {**format.generator_options, **kwargs}
 799                file.seek(0)
 800                file.write(
 801                    format_model_expressions(expressions, transpile or model.dialect, **opts)
 802                )
 803                if append_newline is None:
 804                    append_newline = format.append_newline
 805                if append_newline:
 806                    file.write("\n")
 807                file.truncate()
 808
 809    def plan(
 810        self,
 811        environment: t.Optional[str] = None,
 812        *,
 813        start: t.Optional[TimeLike] = None,
 814        end: t.Optional[TimeLike] = None,
 815        execution_time: t.Optional[TimeLike] = None,
 816        create_from: t.Optional[str] = None,
 817        skip_tests: bool = False,
 818        restate_models: t.Optional[t.Iterable[str]] = None,
 819        no_gaps: bool = False,
 820        skip_backfill: bool = False,
 821        forward_only: t.Optional[bool] = None,
 822        no_prompts: t.Optional[bool] = None,
 823        auto_apply: t.Optional[bool] = None,
 824        no_auto_categorization: t.Optional[bool] = None,
 825        effective_from: t.Optional[TimeLike] = None,
 826        include_unmodified: t.Optional[bool] = None,
 827        select_models: t.Optional[t.Collection[str]] = None,
 828        backfill_models: t.Optional[t.Collection[str]] = None,
 829        categorizer_config: t.Optional[CategorizerConfig] = None,
 830        enable_preview: t.Optional[bool] = None,
 831        no_diff: t.Optional[bool] = None,
 832        run: bool = False,
 833    ) -> Plan:
 834        """Interactively creates a plan.
 835
 836        This method compares the current context with the target environment. It then presents
 837        the differences and asks whether to backfill each modified model.
 838
 839        Args:
 840            environment: The environment to diff and plan against.
 841            start: The start date of the backfill if there is one.
 842            end: The end date of the backfill if there is one.
 843            execution_time: The date/time reference to use for execution time. Defaults to now.
 844            create_from: The environment to create the target environment from if it
 845                doesn't exist. If not specified, the "prod" environment will be used.
 846            skip_tests: Unit tests are run by default so this will skip them if enabled
 847            restate_models: A list of either internal or external models, or tags, that need to be restated
 848                for the given plan interval. If the target environment is a production environment,
 849                ALL snapshots that depended on these upstream tables will have their intervals deleted
 850                (even ones not in this current environment). Only the snapshots in this environment will
 851                be backfilled whereas others need to be recovered on a future plan application. For development
 852                environments only snapshots that are part of this plan will be affected.
 853            no_gaps:  Whether to ensure that new snapshots for models that are already a
 854                part of the target environment have no data gaps when compared against previous
 855                snapshots for same models.
 856            skip_backfill: Whether to skip the backfill step. Default: False.
 857            forward_only: Whether the purpose of the plan is to make forward only changes.
 858            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 859                if this flag is set to true and there are uncategorized changes the plan creation will
 860                fail. Default: False.
 861            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
 862            no_auto_categorization: Indicates whether to disable automatic categorization of model
 863                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 864                option determines the behavior.
 865            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 866                project config by default.
 867            effective_from: The effective date from which to apply forward-only changes on production.
 868            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 869            select_models: A list of model selection strings to filter the models that should be included into this plan.
 870            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 871            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 872            no_diff: Hide text differences for changed models.
 873            run: Whether to run latest intervals as part of the plan application.
 874
 875        Returns:
 876            The populated Plan object.
 877        """
 878        plan_builder = self.plan_builder(
 879            environment,
 880            start=start,
 881            end=end,
 882            execution_time=execution_time,
 883            create_from=create_from,
 884            skip_tests=skip_tests,
 885            restate_models=restate_models,
 886            no_gaps=no_gaps,
 887            skip_backfill=skip_backfill,
 888            forward_only=forward_only,
 889            no_auto_categorization=no_auto_categorization,
 890            effective_from=effective_from,
 891            include_unmodified=include_unmodified,
 892            select_models=select_models,
 893            backfill_models=backfill_models,
 894            categorizer_config=categorizer_config,
 895            enable_preview=enable_preview,
 896            run=run,
 897        )
 898
 899        self.console.plan(
 900            plan_builder,
 901            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
 902            self.default_catalog,
 903            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
 904            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
 905        )
 906
 907        return plan_builder.build()
 908
    def plan_builder(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        run: bool = False,
    ) -> PlanBuilder:
        """Creates a plan builder.

        Validates the flag combination, runs the plan tests, expands the model selections,
        diffs the current context against the target environment and derives default
        start / end values before constructing the `PlanBuilder`.

        Args:
            environment: The environment to diff and plan against. Falls back to the
                configured default target environment; the name is normalized.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
                Falls back to the plan config when None.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
                Falls back to the plan config when None.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
                When omitted while `select_models` is given, the selected models are backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
                Falls back to the plan config when None.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The plan builder.

        Raises:
            ConfigError: If the backfill is skipped for the production environment without
                `no_gaps`, or if `run` is requested for a non-production environment.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        # Any environment other than prod is treated as a development environment.
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        # Pinned environments get no TTL.
        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        # An empty / missing backfill selection is collapsed to None ("not specified").
        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                # NOTE(review): `restate_models` is iterated a second time here; if a caller
                # passes a one-shot iterator the joined list will be empty — confirm callers.
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        # `force_no_diff` is set when a restatement or backfill selection was requested but
        # expanded to nothing.
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # Unless the plan also runs the latest intervals (`run`), derive a default end from
        # the intervals recorded for prod to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        # Development environments default to starting one day before the default end.
        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            # NOTE(review): None is treated the same as False here (categorization enabled).
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )
1087
1088    def apply(
1089        self,
1090        plan: Plan,
1091        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1092    ) -> None:
1093        """Applies a plan by pushing snapshots and backfilling data.
1094
1095        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1096        to backfill all models.
1097
1098        Args:
1099            plan: The plan to apply.
1100            circuit_breaker: An optional handler which checks if the apply should be aborted.
1101        """
1102        if (
1103            not plan.context_diff.has_changes
1104            and not plan.requires_backfill
1105            and not plan.has_unmodified_unpromoted
1106        ):
1107            return
1108        if plan.uncategorized:
1109            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1110        self.notification_target_manager.notify(
1111            NotificationEvent.APPLY_START,
1112            environment=plan.environment_naming_info.name,
1113            plan_id=plan.plan_id,
1114        )
1115        try:
1116            self._apply(plan, circuit_breaker)
1117        except Exception as e:
1118            self.notification_target_manager.notify(
1119                NotificationEvent.APPLY_FAILURE,
1120                environment=plan.environment_naming_info.name,
1121                plan_id=plan.plan_id,
1122                exc=traceback.format_exc(),
1123            )
1124            logger.error(f"Apply Failure: {traceback.format_exc()}")
1125            raise e
1126        self.notification_target_manager.notify(
1127            NotificationEvent.APPLY_END,
1128            environment=plan.environment_naming_info.name,
1129            plan_id=plan.plan_id,
1130        )
1131
1132    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1133        """Invalidates the target environment by setting its expiration timestamp to now.
1134
1135        Args:
1136            name: The name of the environment to invalidate.
1137            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1138                be deleted asynchronously by the janitor process.
1139        """
1140        self.state_sync.invalidate_environment(name)
1141        if sync:
1142            self._cleanup_environments()
1143            self.console.log_success(f"Environment '{name}' has been deleted.")
1144        else:
1145            self.console.log_success(f"Environment '{name}' has been invalidated.")
1146
1147    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1148        """Show a diff of the current context with a given environment.
1149
1150        Args:
1151            environment: The environment to diff against.
1152            detailed: Show the actual SQL differences if True.
1153
1154        Returns:
1155            True if there are changes, False otherwise.
1156        """
1157        environment = environment or self.config.default_target_environment
1158        environment = Environment.normalize_name(environment)
1159        context_diff = self._context_diff(environment)
1160        self.console.show_model_difference_summary(
1161            context_diff,
1162            EnvironmentNamingInfo.from_environment_catalog_mapping(
1163                self.config.environment_catalog_mapping,
1164                name=environment,
1165                suffix_target=self.config.environment_suffix_target,
1166            ),
1167            self.default_catalog,
1168            no_diff=not detailed,
1169        )
1170        return context_diff.has_changes
1171
1172    def table_diff(
1173        self,
1174        source: str,
1175        target: str,
1176        on: t.List[str] | exp.Condition | None = None,
1177        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1178        where: t.Optional[str | exp.Condition] = None,
1179        limit: int = 20,
1180        show: bool = True,
1181        show_sample: bool = True,
1182    ) -> TableDiff:
1183        """Show a diff between two tables.
1184
1185        Args:
1186            source: The source environment or table.
1187            target: The target environment or table.
1188            on: The join condition, table aliases must be "s" and "t" for source and target.
1189                If omitted, the table's grain will be used.
1190            model_or_snapshot: The model or snapshot to use when environments are passed in.
1191            where: An optional where statement to filter results.
1192            limit: The limit of the sample dataframe.
1193            show: Show the table diff output in the console.
1194            show_sample: Show the sample dataframe in the console. Requires show=True.
1195
1196        Returns:
1197            The TableDiff object containing schema and summary differences.
1198        """
1199        source_alias, target_alias = source, target
1200        if model_or_snapshot:
1201            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1202            source_env = self.state_reader.get_environment(source)
1203            target_env = self.state_reader.get_environment(target)
1204
1205            if not source_env:
1206                raise SQLMeshError(f"Could not find environment '{source}'")
1207            if not target_env:
1208                raise SQLMeshError(f"Could not find environment '{target}')")
1209
1210            source = next(
1211                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1212            ).table_name()
1213            target = next(
1214                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1215            ).table_name()
1216            source_alias = source_env.name
1217            target_alias = target_env.name
1218
1219            if not on:
1220                for ref in model.all_references:
1221                    if ref.unique:
1222                        on = ref.columns
1223
1224        if not on:
1225            raise SQLMeshError(
1226                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1227            )
1228
1229        table_diff = TableDiff(
1230            adapter=self._engine_adapter,
1231            source=source,
1232            target=target,
1233            on=on,
1234            where=where,
1235            source_alias=source_alias,
1236            target_alias=target_alias,
1237            model_name=model.name if model_or_snapshot else None,
1238            limit=limit,
1239        )
1240        if show:
1241            self.console.show_schema_diff(table_diff.schema_diff())
1242            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1243        return table_diff
1244
1245    def get_dag(
1246        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1247    ) -> GraphHTML:
1248        """Gets an HTML object representation of the DAG.
1249
1250        Args:
1251            select_models: A list of model selection strings that should be included in the dag.
1252        Returns:
1253            An html object that renders the dag.
1254        """
1255        dag = (
1256            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1257            if select_models
1258            else self.dag
1259        )
1260
1261        nodes = {}
1262        edges: t.List[t.Dict] = []
1263
1264        for node, deps in dag.graph.items():
1265            nodes[node] = {
1266                "id": node,
1267                "label": node.split(".")[-1],
1268                "title": f"<span>{node}</span>",
1269            }
1270            edges.extend({"from": d, "to": node} for d in deps)
1271
1272        return GraphHTML(
1273            nodes,
1274            edges,
1275            options={
1276                "height": "100%",
1277                "width": "100%",
1278                "interaction": {},
1279                "layout": {
1280                    "hierarchical": {
1281                        "enabled": True,
1282                        "nodeSpacing": 200,
1283                        "sortMethod": "directed",
1284                    },
1285                },
1286                "nodes": {
1287                    "shape": "box",
1288                },
1289                **options,
1290            },
1291        )
1292
1293    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1294        """Render the dag as HTML and save it to a file.
1295
1296        Args:
1297            path: filename to save the dag html to
1298            select_models: A list of model selection strings that should be included in the dag.
1299        """
1300        file_path = Path(path)
1301        suffix = file_path.suffix
1302        if suffix != ".html":
1303            if suffix:
1304                logger.warning(
1305                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1306                )
1307            path = str(file_path.with_suffix(".html"))
1308
1309        with open(path, "w", encoding="utf-8") as file:
1310            file.write(str(self.get_dag(select_models)))
1311
1312    def create_test(
1313        self,
1314        model: str,
1315        input_queries: t.Dict[str, str],
1316        overwrite: bool = False,
1317        variables: t.Optional[t.Dict[str, str]] = None,
1318        path: t.Optional[str] = None,
1319        name: t.Optional[str] = None,
1320        include_ctes: bool = False,
1321    ) -> None:
1322        """Generate a unit test fixture for a given model.
1323
1324        Args:
1325            model: The model to test.
1326            input_queries: Mapping of model names to queries. Each model included in this mapping
1327                will be populated in the test based on the results of the corresponding query.
1328            overwrite: Whether to overwrite the existing test in case of a file path collision.
1329                When set to False, an error will be raised if there is such a collision.
1330            variables: Key-value pairs that will define variables needed by the model.
1331            path: The file path corresponding to the fixture, relative to the test directory.
1332                By default, the fixture will be created under the test directory and the file name
1333                will be inferred from the test's name.
1334            name: The name of the test. This is inferred from the model name by default.
1335            include_ctes: When true, CTE fixtures will also be generated.
1336        """
1337        input_queries = {
1338            # The get_model here has two purposes: return normalized names & check for missing deps
1339            self.get_model(dep, raise_if_missing=True).fqn: query
1340            for dep, query in input_queries.items()
1341        }
1342
1343        try:
1344            test_adapter = self._test_connection_config.create_engine_adapter(
1345                register_comments_override=False
1346            )
1347            generate_test(
1348                model=self.get_model(model, raise_if_missing=True),
1349                input_queries=input_queries,
1350                models=self._models,
1351                engine_adapter=self._engine_adapter,
1352                test_engine_adapter=test_adapter,
1353                project_path=self.path,
1354                overwrite=overwrite,
1355                variables=variables,
1356                path=path,
1357                name=name,
1358                include_ctes=include_ctes,
1359            )
1360        finally:
1361            test_adapter.close()
1362
1363    def test(
1364        self,
1365        match_patterns: t.Optional[t.List[str]] = None,
1366        tests: t.Optional[t.List[str]] = None,
1367        verbose: bool = False,
1368        preserve_fixtures: bool = False,
1369        stream: t.Optional[t.TextIO] = None,
1370    ) -> ModelTextTestResult:
1371        """Discover and run model tests"""
1372        if verbose:
1373            pd.set_option("display.max_columns", None)
1374            verbosity = 2
1375        else:
1376            verbosity = 1
1377
1378        if tests:
1379            result = run_model_tests(
1380                tests=tests,
1381                models=self._models,
1382                config=self.config,
1383                gateway=self.gateway,
1384                dialect=self.default_dialect,
1385                verbosity=verbosity,
1386                patterns=match_patterns,
1387                preserve_fixtures=preserve_fixtures,
1388                stream=stream,
1389                default_catalog=self.default_catalog,
1390                default_catalog_dialect=self.engine_adapter.DIALECT,
1391            )
1392        else:
1393            test_meta = []
1394
1395            for path, config in self.configs.items():
1396                test_meta.extend(
1397                    get_all_model_tests(
1398                        path / c.TESTS,
1399                        patterns=match_patterns,
1400                        ignore_patterns=config.ignore_patterns,
1401                    )
1402                )
1403
1404            result = run_tests(
1405                model_test_metadata=test_meta,
1406                models=self._models,
1407                config=self.config,
1408                gateway=self.gateway,
1409                dialect=self.default_dialect,
1410                verbosity=verbosity,
1411                preserve_fixtures=preserve_fixtures,
1412                stream=stream,
1413                default_catalog=self.default_catalog,
1414                default_catalog_dialect=self.engine_adapter.DIALECT,
1415            )
1416
1417        return result
1418
1419    def audit(
1420        self,
1421        start: TimeLike,
1422        end: TimeLike,
1423        *,
1424        models: t.Optional[t.Iterator[str]] = None,
1425        execution_time: t.Optional[TimeLike] = None,
1426    ) -> None:
1427        """Audit models.
1428
1429        Args:
1430            start: The start of the interval to audit.
1431            end: The end of the interval to audit.
1432            models: The models to audit. All models will be audited if not specified.
1433            execution_time: The date/time time reference to use for execution time. Defaults to now.
1434        """
1435
1436        snapshots = (
1437            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1438            if models
1439            else self.snapshots.values()
1440        )
1441
1442        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1443        self.console.log_status_update(f"Found {num_audits} audit(s).")
1444        errors = []
1445        skipped_count = 0
1446        for snapshot in snapshots:
1447            for audit_result in self.snapshot_evaluator.audit(
1448                snapshot=snapshot,
1449                start=start,
1450                end=end,
1451                snapshots=self.snapshots,
1452                raise_exception=False,
1453            ):
1454                audit_id = f"{audit_result.audit.name}"
1455                if audit_result.model:
1456                    audit_id += f" on model {audit_result.model.name}"
1457
1458                if audit_result.skipped:
1459                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1460                    skipped_count += 1
1461                elif audit_result.count:
1462                    errors.append(audit_result)
1463                    self.console.log_status_update(
1464                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1465                    )
1466                else:
1467                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1468
1469        self.console.log_status_update(
1470            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1471            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1472        )
1473        for error in errors:
1474            self.console.log_status_update(
1475                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1476            )
1477            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1478            if error.query:
1479                self.console.show_sql(
1480                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1481                )
1482
1483        self.console.log_status_update("Done.")
1484
1485    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1486        """Rewrite a sql expression with semantic references into an executable query.
1487
1488        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1489
1490        Args:
1491            sql: The sql string to rewrite.
1492            dialect: The dialect of the sql string, defaults to the project dialect.
1493
1494        Returns:
1495            A SQLGlot expression with semantic references expanded.
1496        """
1497        return rewrite(
1498            sql,
1499            graph=ReferenceGraph(self.models.values()),
1500            metrics=self._metrics,
1501            dialect=dialect or self.default_dialect,
1502        )
1503
1504    def migrate(self) -> None:
1505        """Migrates SQLMesh to the current running version.
1506
1507        Please contact your SQLMesh administrator before doing this.
1508        """
1509        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1510        try:
1511            self._new_state_sync().migrate(
1512                default_catalog=self.default_catalog,
1513                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1514            )
1515        except Exception as e:
1516            self.notification_target_manager.notify(
1517                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1518            )
1519            raise e
1520        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
1521
1522    def rollback(self) -> None:
1523        """Rolls back SQLMesh to the previous migration.
1524
1525        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1526        """
1527        self._new_state_sync().rollback()
1528
1529    def create_external_models(self) -> None:
1530        """Create a schema file with all external models.
1531
1532        The schema file contains all columns and types of external models, allowing for more robust
1533        lineage, validation, and optimizations.
1534        """
1535        if not self._models:
1536            self.load(update_schemas=False)
1537
1538        for path, config in self.configs.items():
1539            create_schema_file(
1540                path=path / c.SCHEMA_YAML,
1541                models=UniqueKeyDict(
1542                    "models",
1543                    {
1544                        fqn: model
1545                        for fqn, model in self._models.items()
1546                        if self.config_for_node(model) is config
1547                    },
1548                ),
1549                adapter=self._engine_adapter,
1550                state_reader=self.state_reader,
1551                dialect=config.model_defaults.dialect,
1552                max_workers=self.concurrent_tasks,
1553            )
1554
1555    def print_info(self) -> None:
1556        """Prints information about connections, models, macros, etc. to the console."""
1557        self.console.log_status_update(f"Models: {len(self.models)}")
1558        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1559
1560        self._try_connection("data warehouse", self._engine_adapter)
1561
1562        state_connection = self.config.get_state_connection(self.gateway)
1563        if state_connection:
1564            self._try_connection("state backend", state_connection.create_engine_adapter())
1565
1566    def close(self) -> None:
1567        """Releases all resources allocated by this context."""
1568        self.snapshot_evaluator.close()
1569        self.state_sync.close()
1570
1571    def _run(
1572        self,
1573        environment: str,
1574        *,
1575        start: t.Optional[TimeLike],
1576        end: t.Optional[TimeLike],
1577        execution_time: t.Optional[TimeLike],
1578        skip_janitor: bool,
1579        ignore_cron: bool,
1580    ) -> bool:
1581        if not skip_janitor and environment.lower() == c.PROD:
1582            self._run_janitor()
1583
1584        env_check_attempts_num = max(
1585            1,
1586            self.config.run.environment_check_max_wait
1587            // self.config.run.environment_check_interval,
1588        )
1589
1590        def _block_until_finalized() -> str:
1591            for _ in range(env_check_attempts_num):
1592                assert environment is not None  # mypy
1593                environment_state = self.state_sync.get_environment(environment)
1594                if not environment_state:
1595                    raise SQLMeshError(f"Environment '{environment}' was not found.")
1596                if environment_state.finalized_ts:
1597                    return environment_state.plan_id
1598                logger.warning(
1599                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
1600                    environment,
1601                    environment_state.plan_id,
1602                    self.config.run.environment_check_interval,
1603                )
1604                time.sleep(self.config.run.environment_check_interval)
1605            raise SQLMeshError(
1606                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
1607                "This means that the environment either failed to update or the update is taking longer than expected. "
1608                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
1609            )
1610
1611        done = False
1612        while not done:
1613            plan_id_at_start = _block_until_finalized()
1614
1615            def _has_environment_changed() -> bool:
1616                assert environment is not None  # mypy
1617                current_environment_state = self.state_sync.get_environment(environment)
1618                return (
1619                    not current_environment_state
1620                    or current_environment_state.plan_id != plan_id_at_start
1621                    or not current_environment_state.finalized_ts
1622                )
1623
1624            try:
1625                success = self.scheduler(environment=environment).run(
1626                    environment,
1627                    start=start,
1628                    end=end,
1629                    execution_time=execution_time,
1630                    ignore_cron=ignore_cron,
1631                    circuit_breaker=_has_environment_changed,
1632                )
1633                done = True
1634            except CircuitBreakerError:
1635                logger.warning(
1636                    "Environment '%s' has been modified while running. Restarting the run...",
1637                    environment,
1638                )
1639
1640        return success
1641
1642    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
1643        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)
1644
1645    def table_name(self, model_name: str, dev: bool) -> str:
1646        """Returns the name of the pysical table for the given model name.
1647
1648        Args:
1649            model_name: The name of the model.
1650            dev: Whether to use the deployability index for the table name.
1651
1652        Returns:
1653            The name of the physical table.
1654        """
1655        deployability_index = (
1656            DeployabilityIndex.create(self.snapshots.values())
1657            if dev
1658            else DeployabilityIndex.all_deployable()
1659        )
1660        snapshot = self.get_snapshot(model_name)
1661        if not snapshot:
1662            raise SQLMeshError(f"Model '{model_name}' was not found.")
1663        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
1664
1665    def clear_caches(self) -> None:
1666        for path in self.configs:
1667            rmtree(path / c.CACHE)
1668
1669    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
1670        test_output_io = StringIO()
1671        result = self.test(stream=test_output_io, verbose=verbose)
1672        return result, test_output_io.getvalue()
1673
1674    def _run_plan_tests(
1675        self, skip_tests: bool = False
1676    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
1677        if not skip_tests:
1678            result, test_output = self._run_tests()
1679            if result.testsRun > 0:
1680                self.console.log_test_results(
1681                    result, test_output, self._test_connection_config._engine_adapter.DIALECT
1682                )
1683            if not result.wasSuccessful():
1684                raise PlanError(
1685                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
1686                )
1687            return result, test_output
1688        return None, None
1689
1690    @property
1691    def _model_tables(self) -> t.Dict[str, str]:
1692        """Mapping of model name to physical table name.
1693
1694        If a snapshot has not been versioned yet, its view name will be returned.
1695        """
1696        return {
1697            fqn: (
1698                snapshot.table_name()
1699                if snapshot.version
1700                else snapshot.qualified_view_name.for_environment(
1701                    EnvironmentNamingInfo.from_environment_catalog_mapping(
1702                        self.config.environment_catalog_mapping,
1703                        name=c.PROD,
1704                        suffix_target=self.config.environment_suffix_target,
1705                    )
1706                )
1707            )
1708            for fqn, snapshot in self.snapshots.items()
1709        }
1710
1711    def _snapshots(
1712        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
1713    ) -> t.Dict[str, Snapshot]:
1714        prod = self.state_reader.get_environment(c.PROD)
1715        remote_snapshots = (
1716            {
1717                snapshot.name: snapshot
1718                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
1719            }
1720            if prod
1721            else {}
1722        )
1723
1724        local_nodes = {**(models_override or self._models), **self._standalone_audits}
1725        nodes = local_nodes.copy()
1726        audits = self._audits.copy()
1727        projects = {config.project for config in self.configs.values()}
1728
1729        for name, snapshot in remote_snapshots.items():
1730            if name not in nodes and snapshot.node.project not in projects:
1731                nodes[name] = snapshot.node
1732                if snapshot.is_model:
1733                    for audit in snapshot.audits:
1734                        if name not in audits:
1735                            audits[name] = audit
1736
1737        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
1738            snapshots: t.Dict[str, Snapshot] = {}
1739            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}
1740
1741            for node in nodes.values():
1742                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
1743                    ttl = remote_snapshots[node.fqn].ttl
1744                else:
1745                    config = self.config_for_node(node)
1746                    ttl = config.snapshot_ttl
1747
1748                snapshot = Snapshot.from_node(
1749                    node,
1750                    nodes=nodes,
1751                    audits=audits,
1752                    cache=fingerprint_cache,
1753                    ttl=ttl,
1754                )
1755                snapshots[snapshot.name] = snapshot
1756            return snapshots
1757
1758        snapshots = _nodes_to_snapshots(nodes)
1759        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1760
1761        unrestorable_snapshots = {
1762            snapshot
1763            for snapshot in stored_snapshots.values()
1764            if snapshot.name in local_nodes and snapshot.unrestorable
1765        }
1766        if unrestorable_snapshots:
1767            for snapshot in unrestorable_snapshots:
1768                logger.info(
1769                    "Found a unrestorable snapshot %s. Restamping the model...", snapshot.name
1770                )
1771                node = local_nodes[snapshot.name]
1772                nodes[snapshot.name] = node.copy(
1773                    update={"stamp": f"revert to {snapshot.identifier}"}
1774                )
1775            snapshots = _nodes_to_snapshots(nodes)
1776            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1777
1778        for snapshot in stored_snapshots.values():
1779            # Keep the original model instance to preserve the query cache.
1780            snapshot.node = snapshots[snapshot.name].node
1781
1782        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}
1783
1784    def _context_diff(
1785        self,
1786        environment: str,
1787        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1788        create_from: t.Optional[str] = None,
1789        force_no_diff: bool = False,
1790        ensure_finalized_snapshots: bool = False,
1791    ) -> ContextDiff:
1792        environment = Environment.normalize_name(environment)
1793        if force_no_diff:
1794            return ContextDiff.create_no_diff(environment)
1795        return ContextDiff.create(
1796            environment,
1797            snapshots=snapshots or self.snapshots,
1798            create_from=create_from or c.PROD,
1799            state_reader=self.state_reader,
1800            ensure_finalized_snapshots=ensure_finalized_snapshots,
1801        )
1802
1803    def _run_janitor(self) -> None:
1804        self._cleanup_environments()
1805        expired_snapshots = self.state_sync.delete_expired_snapshots()
1806        self.snapshot_evaluator.cleanup(
1807            expired_snapshots, on_complete=self.console.update_cleanup_progress
1808        )
1809
1810        self.state_sync.compact_intervals()
1811
1812    def _cleanup_environments(self) -> None:
1813        expired_environments = self.state_sync.delete_expired_environments()
1814        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
1815
1816    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
1817        connection_name = connection_name.capitalize()
1818        try:
1819            engine_adapter.fetchall("SELECT 1")
1820            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
1821        except Exception as ex:
1822            self.console.log_error(f"{connection_name} connection failed. {ex}")
1823
1824    def _new_state_sync(self) -> StateSync:
1825        return self._provided_state_sync or self._scheduler.create_state_sync(self)
1826
1827    def _new_selector(self) -> Selector:
1828        return Selector(
1829            self.state_reader,
1830            self._models,
1831            context_path=self.path,
1832            default_catalog=self.default_catalog,
1833            dialect=self.default_dialect,
1834        )
1835
1836    def _register_notification_targets(self) -> None:
1837        event_notifications = collections.defaultdict(set)
1838        for target in self.notification_targets:
1839            if target.is_configured:
1840                for event in target.notify_on:
1841                    event_notifications[event].add(target)
1842        user_notification_targets = {
1843            user.username: set(
1844                target for target in user.notification_targets if target.is_configured
1845            )
1846            for user in self.users
1847        }
1848        self.notification_target_manager = NotificationTargetManager(
1849            event_notifications, user_notification_targets, username=self.config.username
1850        )
1851
1852
class Context(GenericContext[Config]):
    """A `GenericContext` bound to the `Config` configuration type."""

    CONFIG_TYPE = Config
class BaseContext(abc.ABC):
class BaseContext(abc.ABC):
    """Abstract base for contexts that can execute a model."""

    @property
    @abc.abstractmethod
    def default_dialect(self) -> t.Optional[str]:
        """The default dialect."""

    @property
    @abc.abstractmethod
    def _model_tables(self) -> t.Dict[str, str]:
        """A mapping of model names to tables."""

    @property
    @abc.abstractmethod
    def engine_adapter(self) -> EngineAdapter:
        """The engine adapter."""

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        """The spark session exposed by the engine adapter, if it exists."""
        adapter = self.engine_adapter
        return adapter.spark

    @property
    def default_catalog(self) -> t.Optional[str]:
        """The default catalog; not implemented on the base context."""
        raise NotImplementedError

    def table(self, model_name: str) -> str:
        """Resolves the physical table name for a given model.

        Args:
            model_name: The model name.

        Returns:
            The physical table name.
        """
        normalized_name = normalize_model_name(
            model_name, self.default_catalog, self.default_dialect
        )
        physical_table = self._model_tables[normalized_name]

        # The SQL is generated for the default dialect because the table name may be used in
        # a fetchdf call, so the quotes need to be correct (eg. backticks for bigquery).
        return parse_one(physical_table).sql(dialect=self.default_dialect)

    def fetchdf(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Runs a query through the engine adapter and returns a dataframe.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
        """
        adapter = self.engine_adapter
        return adapter.fetchdf(query, quote_identifiers=quote_identifiers)

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Runs a query through the engine adapter and returns a PySpark dataframe.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            A PySpark dataframe.
        """
        adapter = self.engine_adapter
        return adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)

The base context which defines methods to execute a model.

default_dialect: Union[str, NoneType]

Returns the default dialect.

Returns an engine adapter.

spark: Union[PySparkSession, NoneType]

Returns the spark session if it exists.

def table(self, model_name: str) -> str:
158    def table(self, model_name: str) -> str:
159        """Gets the physical table name for a given model.
160
161        Args:
162            model_name: The model name.
163
164        Returns:
165            The physical table name.
166        """
167        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)
168
169        # We generate SQL for the default dialect because the table name may be used in a
170        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
171        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)

Gets the physical table name for a given model.

Arguments:
  • model_name: The model name.
Returns:

The physical table name.

def fetchdf( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> pandas.core.frame.DataFrame:
173    def fetchdf(
174        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
175    ) -> pd.DataFrame:
176        """Fetches a dataframe given a sql string or sqlglot expression.
177
178        Args:
179            query: SQL string or sqlglot expression.
180            quote_identifiers: Whether to quote all identifiers in the query.
181
182        Returns:
183            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
184        """
185        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)

Fetches a dataframe given a sql string or sqlglot expression.

Arguments:
  • query: SQL string or sqlglot expression.
  • quote_identifiers: Whether to quote all identifiers in the query.
Returns:

The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.

def fetch_pyspark_df( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> PySparkDataFrame:
187    def fetch_pyspark_df(
188        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
189    ) -> PySparkDataFrame:
190        """Fetches a PySpark dataframe given a sql string or sqlglot expression.
191
192        Args:
193            query: SQL string or sqlglot expression.
194            quote_identifiers: Whether to quote all identifiers in the query.
195
196        Returns:
197            A PySpark dataframe.
198        """
199        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)

Fetches a PySpark dataframe given a sql string or sqlglot expression.

Arguments:
  • query: SQL string or sqlglot expression.
  • quote_identifiers: Whether to quote all identifiers in the query.
Returns:

A PySpark dataframe.

class ExecutionContext(BaseContext):
class ExecutionContext(BaseContext):
    """The minimal context needed to execute a model.

    Args:
        engine_adapter: The engine adapter to execute queries against.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        default_dialect: The dialect exposed as the default dialect.
        default_catalog: The catalog exposed as the default catalog.
        variables: Variable values available through `var`. Defaults to no variables.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        snapshots: t.Dict[str, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        default_dialect: t.Optional[str] = None,
        default_catalog: t.Optional[str] = None,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ):
        self.snapshots = snapshots
        self.deployability_index = deployability_index
        self._engine_adapter = engine_adapter
        self._default_catalog = default_catalog
        self._default_dialect = default_dialect
        # A missing (or empty) mapping becomes a fresh empty dict.
        self._variables = variables if variables else {}

    @property
    def default_dialect(self) -> t.Optional[str]:
        """The default dialect given at construction."""
        return self._default_dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """The engine adapter given at construction."""
        return self._engine_adapter

    @cached_property
    def _model_tables(self) -> t.Dict[str, str]:
        """A mapping of model names to tables, computed once from the snapshots."""
        all_snapshots = self.snapshots.values()
        return to_table_mapping(all_snapshots, self.deployability_index)

    @property
    def default_catalog(self) -> t.Optional[str]:
        """The default catalog given at construction."""
        return self._default_catalog

    @property
    def gateway(self) -> t.Optional[str]:
        """The gateway name, read from the variables."""
        return self.var(c.GATEWAY)

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Looks up a variable by its lowercased name, returning the default when absent."""
        lookup_key = var_name.lower()
        return self._variables.get(lookup_key, default)

    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
        """Returns a new ExecutionContext that uses the given variables.

        Everything else is carried over from this context; the given mapping is used as
        the new context's variables as-is.
        """
        return ExecutionContext(
            engine_adapter=self._engine_adapter,
            snapshots=self.snapshots,
            deployability_index=self.deployability_index,
            default_dialect=self._default_dialect,
            default_catalog=self._default_catalog,
            variables=variables,
        )

The minimal context needed to execute a model.

Arguments:
  • engine_adapter: The engine adapter to execute queries against.
  • snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
ExecutionContext( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, snapshots: Dict[str, sqlmesh.core.snapshot.definition.Snapshot], deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, default_dialect: Union[str, NoneType] = None, default_catalog: Union[str, NoneType] = None, variables: Union[Dict[str, Any], NoneType] = None)
211    def __init__(
212        self,
213        engine_adapter: EngineAdapter,
214        snapshots: t.Dict[str, Snapshot],
215        deployability_index: t.Optional[DeployabilityIndex] = None,
216        default_dialect: t.Optional[str] = None,
217        default_catalog: t.Optional[str] = None,
218        variables: t.Optional[t.Dict[str, t.Any]] = None,
219    ):
220        self.snapshots = snapshots
221        self.deployability_index = deployability_index
222        self._engine_adapter = engine_adapter
223        self._default_catalog = default_catalog
224        self._default_dialect = default_dialect
225        self._variables = variables or {}
default_dialect: Union[str, NoneType]

Returns the default dialect.

Returns an engine adapter.

gateway: Union[str, NoneType]

Returns the gateway name.

def var( self, var_name: str, default: Union[Any, NoneType] = None) -> Union[Any, NoneType]:
250    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
251        """Returns a variable value."""
252        return self._variables.get(var_name.lower(), default)

Returns a variable value.

def with_variables(self, variables: Dict[str, Any]) -> sqlmesh.core.context.ExecutionContext:
254    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
255        """Returns a new ExecutionContext with additional variables."""
256        return ExecutionContext(
257            self._engine_adapter,
258            self.snapshots,
259            self.deployability_index,
260            self._default_dialect,
261            self._default_catalog,
262            variables=variables,
263        )

Returns a new ExecutionContext with additional variables.

class GenericContext(BaseContext, typing.Generic[~C]):
 266class GenericContext(BaseContext, t.Generic[C]):
 267    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
 268
 269    Args:
 270        engine_adapter: The default engine adapter to use.
 271        notification_targets: The notification target to use. Defaults to what is defined in config.
 272        paths: The directories containing SQLMesh files.
 273        config: A Config object or the name of a Config object in config.py.
 274        connection: The name of the connection. If not specified the first connection as it appears
 275            in configuration will be used.
 276        test_connection: The name of the connection to use for tests. If not specified the first
 277            connection as it appears in configuration will be used.
 278        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
 279        load: Whether or not to automatically load all models and macros (default True).
 280        console: The rich instance used for printing out CLI command results.
 281        users: A list of users to make known to SQLMesh.
 282        config_type: The type of config object to use (default Config).
 283    """
 284
 285    CONFIG_TYPE: t.Type[C]
 286
 287    def __init__(
 288        self,
 289        engine_adapter: t.Optional[EngineAdapter] = None,
 290        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
 291        state_sync: t.Optional[StateSync] = None,
 292        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
 293        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
 294        gateway: t.Optional[str] = None,
 295        concurrent_tasks: t.Optional[int] = None,
 296        loader: t.Optional[t.Type[Loader]] = None,
 297        load: bool = True,
 298        console: t.Optional[Console] = None,
 299        users: t.Optional[t.List[User]] = None,
 300    ):
 301        self.console = console or get_console()
 302        self.configs = (
 303            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
 304        )
 305        self.dag: DAG[str] = DAG()
 306        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
 307        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
 308        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
 309            "standaloneaudits"
 310        )
 311        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
 312        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
 313        self._jinja_macros = JinjaMacroRegistry()
 314        self._default_catalog: t.Optional[str] = None
 315
 316        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))
 317
 318        self.gateway = gateway
 319        self._scheduler = self.config.get_scheduler(self.gateway)
 320        self.environment_ttl = self.config.environment_ttl
 321        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
 322        self.auto_categorize_changes = self.config.plan.auto_categorize_changes
 323
 324        self._connection_config = self.config.get_connection(self.gateway)
 325        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
 326        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()
 327
 328        self._test_connection_config = self.config.get_test_connection(
 329            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
 330        )
 331
 332        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None
 333
 334        self._provided_state_sync: t.Optional[StateSync] = state_sync
 335        self._state_sync: t.Optional[StateSync] = None
 336
 337        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)
 338
 339        # Should we dedupe notification_targets? If so how?
 340        self.notification_targets = (notification_targets or []) + self.config.notification_targets
 341        self.users = (users or []) + self.config.users
 342        self.users = list({user.username: user for user in self.users}.values())
 343        self._register_notification_targets()
 344
 345        if (
 346            self.config.environment_catalog_mapping
 347            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
 348        ):
 349            raise SQLMeshError(
 350                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
 351            )
 352
 353        if load:
 354            self.load()
 355
 356    @property
 357    def default_dialect(self) -> t.Optional[str]:
 358        return self.config.dialect
 359
 360    @property
 361    def engine_adapter(self) -> EngineAdapter:
 362        """Returns an engine adapter."""
 363        return self._engine_adapter
 364
 365    @property
 366    def snapshot_evaluator(self) -> SnapshotEvaluator:
 367        if not self._snapshot_evaluator:
 368            self._snapshot_evaluator = SnapshotEvaluator(
 369                self.engine_adapter.with_log_level(logging.INFO),
 370                ddl_concurrent_tasks=self.concurrent_tasks,
 371            )
 372        return self._snapshot_evaluator
 373
 374    def execution_context(
 375        self, deployability_index: t.Optional[DeployabilityIndex] = None
 376    ) -> ExecutionContext:
 377        """Returns an execution context."""
 378        return ExecutionContext(
 379            engine_adapter=self._engine_adapter,
 380            snapshots=self.snapshots,
 381            deployability_index=deployability_index,
 382            default_dialect=self.default_dialect,
 383            default_catalog=self.default_catalog,
 384        )
 385
 386    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
 387        """Update or insert a model.
 388
 389        The context's models dictionary will be updated to include these changes.
 390
 391        Args:
 392            model: Model name or instance to update.
 393            kwargs: The kwargs to update the model with.
 394
 395        Returns:
 396            A new instance of the updated or inserted model.
 397        """
 398        model = self.get_model(model, raise_if_missing=True)
 399        path = model._path
 400
 401        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
 402        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
 403        model._path = path
 404
 405        self._models.update({model.fqn: model})
 406        self.dag.add(model.fqn, model.depends_on)
 407        update_model_schemas(
 408            self.dag,
 409            self._models,
 410            self.path,
 411        )
 412
 413        model.validate_definition()
 414
 415        return model
 416
 417    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
 418        """Returns the built-in scheduler.
 419
 420        Args:
 421            environment: The target environment to source model snapshots from, or None
 422                if snapshots should be sourced from the currently loaded local state.
 423
 424        Returns:
 425            The built-in scheduler instance.
 426        """
 427        snapshots: t.Iterable[Snapshot]
 428        if environment is not None:
 429            stored_environment = self.state_sync.get_environment(environment)
 430            if stored_environment is None:
 431                raise ConfigError(f"Environment '{environment}' was not found.")
 432            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
 433        else:
 434            snapshots = self.snapshots.values()
 435
 436        if not snapshots:
 437            raise ConfigError("No models were found")
 438
 439        return Scheduler(
 440            snapshots,
 441            self.snapshot_evaluator,
 442            self.state_sync,
 443            default_catalog=self.default_catalog,
 444            max_workers=self.concurrent_tasks,
 445            console=self.console,
 446            notification_target_manager=self.notification_target_manager,
 447        )
 448
 449    @property
 450    def state_sync(self) -> StateSync:
 451        if not self._state_sync:
 452            self._state_sync = self._new_state_sync()
 453
 454            if self._state_sync.get_versions(validate=False).schema_version == 0:
 455                self._state_sync.migrate(default_catalog=self.default_catalog)
 456            self._state_sync.get_versions()
 457            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
 458        return self._state_sync
 459
 460    @property
 461    def state_reader(self) -> StateReader:
 462        return self.state_sync
 463
 464    def refresh(self) -> None:
 465        """Refresh all models that have been updated."""
 466        if self._loader.reload_needed():
 467            self.load()
 468
 469    def load(self, update_schemas: bool = True) -> GenericContext[C]:
 470        """Load all files in the context's path."""
 471        with sys_path(*self.configs):
 472            gc.disable()
 473            project = self._loader.load(self, update_schemas)
 474            self._macros = project.macros
 475            self._jinja_macros = project.jinja_macros
 476            self._models = project.models
 477            self._metrics = project.metrics
 478            self._standalone_audits.clear()
 479            self._audits.clear()
 480            for name, audit in project.audits.items():
 481                if isinstance(audit, StandaloneAudit):
 482                    self._standalone_audits[name] = audit
 483                else:
 484                    self._audits[name] = audit
 485            self.dag = project.dag
 486            gc.enable()
 487
 488            duplicates = set(self._models) & set(self._standalone_audits)
 489            if duplicates:
 490                raise ConfigError(
 491                    f"Models and Standalone audits cannot have the same name: {duplicates}"
 492                )
 493
 494        return self
 495
 496    def run(
 497        self,
 498        environment: t.Optional[str] = None,
 499        *,
 500        start: t.Optional[TimeLike] = None,
 501        end: t.Optional[TimeLike] = None,
 502        execution_time: t.Optional[TimeLike] = None,
 503        skip_janitor: bool = False,
 504        ignore_cron: bool = False,
 505    ) -> bool:
 506        """Run the entire dag through the scheduler.
 507
 508        Args:
 509            environment: The target environment to source model snapshots from and virtually update. Default: prod.
 510            start: The start of the interval to render.
 511            end: The end of the interval to render.
 512            execution_time: The date/time time reference to use for execution time. Defaults to now.
 513            skip_janitor: Whether to skip the janitor task.
 514            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
 515
 516        Returns:
 517            True if the run was successful, False otherwise.
 518        """
 519        environment = environment or self.config.default_target_environment
 520
 521        self.notification_target_manager.notify(
 522            NotificationEvent.RUN_START, environment=environment
 523        )
 524        success = False
 525        try:
 526            success = self._run(
 527                environment=environment,
 528                start=start,
 529                end=end,
 530                execution_time=execution_time,
 531                skip_janitor=skip_janitor,
 532                ignore_cron=ignore_cron,
 533            )
 534        except Exception as e:
 535            self.notification_target_manager.notify(
 536                NotificationEvent.RUN_FAILURE, traceback.format_exc()
 537            )
 538            logger.error(f"Run Failure: {traceback.format_exc()}")
 539            raise e
 540
 541        if success:
 542            self.notification_target_manager.notify(
 543                NotificationEvent.RUN_END, environment=environment
 544            )
 545            self.console.log_success(f"Run finished for environment '{environment}'")
 546        else:
 547            self.notification_target_manager.notify(
 548                NotificationEvent.RUN_FAILURE, "See console logs for details."
 549            )
 550
 551        return success
 552
 553    @t.overload
 554    def get_model(
 555        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
 556    ) -> Model: ...
 557
 558    @t.overload
 559    def get_model(
 560        self,
 561        model_or_snapshot: ModelOrSnapshot,
 562        raise_if_missing: Literal[False] = False,
 563    ) -> t.Optional[Model]: ...
 564
 565    def get_model(
 566        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
 567    ) -> t.Optional[Model]:
 568        """Returns a model with the given name or None if a model with such name doesn't exist.
 569
 570        Args:
 571            model_or_snapshot: A model name, model, or snapshot.
 572            raise_if_missing: Raises an error if a model is not found.
 573
 574        Returns:
 575            The expected model.
 576        """
 577        if isinstance(model_or_snapshot, str):
 578            normalized_name = normalize_model_name(
 579                model_or_snapshot,
 580                dialect=self.default_dialect,
 581                default_catalog=self.default_catalog,
 582            )
 583            model = self._models.get(normalized_name)
 584        elif isinstance(model_or_snapshot, Snapshot):
 585            model = model_or_snapshot.model
 586        else:
 587            model = model_or_snapshot
 588
 589        if raise_if_missing and not model:
 590            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
 591
 592        return model
 593
 594    @t.overload
 595    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...
 596
 597    @t.overload
 598    def get_snapshot(
 599        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
 600    ) -> Snapshot: ...
 601
 602    @t.overload
 603    def get_snapshot(
 604        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
 605    ) -> t.Optional[Snapshot]: ...
 606
 607    def get_snapshot(
 608        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
 609    ) -> t.Optional[Snapshot]:
 610        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.
 611
 612        Args:
 613            node_or_snapshot: A node name, node, or snapshot.
 614            raise_if_missing: Raises an error if a snapshot is not found.
 615
 616        Returns:
 617            The expected snapshot.
 618        """
 619        if isinstance(node_or_snapshot, Snapshot):
 620            return node_or_snapshot
 621        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
 622            node_or_snapshot = normalize_model_name(
 623                node_or_snapshot,
 624                dialect=self.default_dialect,
 625                default_catalog=self.default_catalog,
 626            )
 627        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
 628        snapshot = self.snapshots.get(fqn)
 629
 630        if raise_if_missing and not snapshot:
 631            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
 632
 633        return snapshot
 634
 635    def config_for_path(self, path: Path) -> Config:
 636        for config_path, config in self.configs.items():
 637            try:
 638                path.relative_to(config_path)
 639                return config
 640            except ValueError:
 641                pass
 642        return self.config
 643
 644    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
 645        if isinstance(node, str):
 646            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
 647        return self.config_for_path(node._path)  # type: ignore
 648
 649    @property
 650    def models(self) -> MappingProxyType[str, Model]:
 651        """Returns all registered models in this context."""
 652        return MappingProxyType(self._models)
 653
 654    @property
 655    def metrics(self) -> MappingProxyType[str, Metric]:
 656        """Returns all registered metrics in this context."""
 657        return MappingProxyType(self._metrics)
 658
 659    @property
 660    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
 661        """Returns all registered standalone audits in this context."""
 662        return MappingProxyType(self._standalone_audits)
 663
 664    @property
 665    def snapshots(self) -> t.Dict[str, Snapshot]:
 666        """Generates and returns snapshots based on models registered in this context.
 667
 668        If one of the snapshots has been previously stored in the persisted state, the stored
 669        instance will be returned.
 670        """
 671        return self._snapshots()
 672
 673    @property
 674    def default_catalog(self) -> t.Optional[str]:
 675        if self._default_catalog is None:
 676            self._default_catalog = self._scheduler.get_default_catalog(self)
 677        return self._default_catalog
 678
 679    def render(
 680        self,
 681        model_or_snapshot: ModelOrSnapshot,
 682        *,
 683        start: t.Optional[TimeLike] = None,
 684        end: t.Optional[TimeLike] = None,
 685        execution_time: t.Optional[TimeLike] = None,
 686        expand: t.Union[bool, t.Iterable[str]] = False,
 687        **kwargs: t.Any,
 688    ) -> exp.Expression:
 689        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
 690
 691        Args:
 692            model_or_snapshot: The model, model name, or snapshot to render.
 693            start: The start of the interval to render.
 694            end: The end of the interval to render.
 695            execution_time: The date/time time reference to use for execution time. Defaults to now.
 696            expand: Whether or not to use expand materialized models, defaults to False.
 697                If True, all referenced models are expanded as raw queries.
 698                If a list, only referenced models are expanded as raw queries.
 699
 700        Returns:
 701            The rendered expression.
 702        """
 703        execution_time = execution_time or now_ds()
 704
 705        model = self.get_model(model_or_snapshot, raise_if_missing=True)
 706
 707        if expand and not isinstance(expand, bool):
 708            expand = {
 709                normalize_model_name(
 710                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
 711                )
 712                for x in expand
 713            }
 714
 715        expand = self.dag.upstream(model.fqn) if expand is True else expand or []
 716
 717        if model.is_seed:
 718            df = next(
 719                model.render(
 720                    context=self.execution_context(),
 721                    start=start,
 722                    end=end,
 723                    execution_time=execution_time,
 724                    **kwargs,
 725                )
 726            )
 727            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))
 728
 729        return model.render_query_or_raise(
 730            start=start,
 731            end=end,
 732            execution_time=execution_time,
 733            snapshots=self.snapshots,
 734            expand=expand,
 735            engine_adapter=self.engine_adapter,
 736            **kwargs,
 737        )
 738
 739    def evaluate(
 740        self,
 741        model_or_snapshot: ModelOrSnapshot,
 742        start: TimeLike,
 743        end: TimeLike,
 744        execution_time: TimeLike,
 745        limit: t.Optional[int] = None,
 746        **kwargs: t.Any,
 747    ) -> DF:
 748        """Evaluate a model or snapshot (running its query against a DB/Engine).
 749
 750        This method is used to test or iterate on models without side effects.
 751
 752        Args:
 753            model_or_snapshot: The model, model name, or snapshot to render.
 754            start: The start of the interval to evaluate.
 755            end: The end of the interval to evaluate.
 756            execution_time: The date/time time reference to use for execution time.
 757            limit: A limit applied to the model.
 758        """
 759        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
 760
 761        df = self.snapshot_evaluator.evaluate_and_fetch(
 762            snapshot,
 763            start=start,
 764            end=end,
 765            execution_time=execution_time,
 766            snapshots=self.snapshots,
 767            limit=limit or c.DEFAULT_MAX_LIMIT,
 768        )
 769
 770        if df is None:
 771            raise RuntimeError(f"Error evaluating {snapshot.name}")
 772
 773        return df
 774
 775    def format(
 776        self,
 777        transpile: t.Optional[str] = None,
 778        append_newline: t.Optional[bool] = None,
 779        **kwargs: t.Any,
 780    ) -> None:
 781        """Format all SQL models."""
 782        for model in self._models.values():
 783            if not model._path.suffix == ".sql":
 784                continue
 785            with open(model._path, "r+", encoding="utf-8") as file:
 786                expressions = parse(
 787                    file.read(), default_dialect=self.config_for_node(model).dialect
 788                )
 789                if transpile:
 790                    for prop in expressions[0].expressions:
 791                        if prop.name.lower() == "dialect":
 792                            prop.replace(
 793                                exp.Property(
 794                                    this="dialect",
 795                                    value=exp.Literal.string(transpile or model.dialect),
 796                                )
 797                            )
 798                format = self.config_for_node(model).format
 799                opts = {**format.generator_options, **kwargs}
 800                file.seek(0)
 801                file.write(
 802                    format_model_expressions(expressions, transpile or model.dialect, **opts)
 803                )
 804                if append_newline is None:
 805                    append_newline = format.append_newline
 806                if append_newline:
 807                    file.write("\n")
 808                file.truncate()
 809
 810    def plan(
 811        self,
 812        environment: t.Optional[str] = None,
 813        *,
 814        start: t.Optional[TimeLike] = None,
 815        end: t.Optional[TimeLike] = None,
 816        execution_time: t.Optional[TimeLike] = None,
 817        create_from: t.Optional[str] = None,
 818        skip_tests: bool = False,
 819        restate_models: t.Optional[t.Iterable[str]] = None,
 820        no_gaps: bool = False,
 821        skip_backfill: bool = False,
 822        forward_only: t.Optional[bool] = None,
 823        no_prompts: t.Optional[bool] = None,
 824        auto_apply: t.Optional[bool] = None,
 825        no_auto_categorization: t.Optional[bool] = None,
 826        effective_from: t.Optional[TimeLike] = None,
 827        include_unmodified: t.Optional[bool] = None,
 828        select_models: t.Optional[t.Collection[str]] = None,
 829        backfill_models: t.Optional[t.Collection[str]] = None,
 830        categorizer_config: t.Optional[CategorizerConfig] = None,
 831        enable_preview: t.Optional[bool] = None,
 832        no_diff: t.Optional[bool] = None,
 833        run: bool = False,
 834    ) -> Plan:
 835        """Interactively creates a plan.
 836
 837        This method compares the current context with the target environment. It then presents
 838        the differences and asks whether to backfill each modified model.
 839
 840        Args:
 841            environment: The environment to diff and plan against.
 842            start: The start date of the backfill if there is one.
 843            end: The end date of the backfill if there is one.
 844            execution_time: The date/time reference to use for execution time. Defaults to now.
 845            create_from: The environment to create the target environment from if it
 846                doesn't exist. If not specified, the "prod" environment will be used.
 847            skip_tests: Unit tests are run by default so this will skip them if enabled
 848            restate_models: A list of either internal or external models, or tags, that need to be restated
 849                for the given plan interval. If the target environment is a production environment,
 850                ALL snapshots that depended on these upstream tables will have their intervals deleted
 851                (even ones not in this current environment). Only the snapshots in this environment will
 852                be backfilled whereas others need to be recovered on a future plan application. For development
 853                environments only snapshots that are part of this plan will be affected.
 854            no_gaps:  Whether to ensure that new snapshots for models that are already a
 855                part of the target environment have no data gaps when compared against previous
 856                snapshots for same models.
 857            skip_backfill: Whether to skip the backfill step. Default: False.
 858            forward_only: Whether the purpose of the plan is to make forward only changes.
 859            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 860                if this flag is set to true and there are uncategorized changes the plan creation will
 861                fail. Default: False.
 862            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
 863            no_auto_categorization: Indicates whether to disable automatic categorization of model
 864                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 865                option determines the behavior.
 866            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 867                project config by default.
 868            effective_from: The effective date from which to apply forward-only changes on production.
 869            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 870            select_models: A list of model selection strings to filter the models that should be included into this plan.
 871            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 872            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 873            no_diff: Hide text differences for changed models.
 874            run: Whether to run latest intervals as part of the plan application.
 875
 876        Returns:
 877            The populated Plan object.
 878        """
 879        plan_builder = self.plan_builder(
 880            environment,
 881            start=start,
 882            end=end,
 883            execution_time=execution_time,
 884            create_from=create_from,
 885            skip_tests=skip_tests,
 886            restate_models=restate_models,
 887            no_gaps=no_gaps,
 888            skip_backfill=skip_backfill,
 889            forward_only=forward_only,
 890            no_auto_categorization=no_auto_categorization,
 891            effective_from=effective_from,
 892            include_unmodified=include_unmodified,
 893            select_models=select_models,
 894            backfill_models=backfill_models,
 895            categorizer_config=categorizer_config,
 896            enable_preview=enable_preview,
 897            run=run,
 898        )
 899
 900        self.console.plan(
 901            plan_builder,
 902            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
 903            self.default_catalog,
 904            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
 905            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
 906        )
 907
 908        return plan_builder.build()
 909
 910    def plan_builder(
 911        self,
 912        environment: t.Optional[str] = None,
 913        *,
 914        start: t.Optional[TimeLike] = None,
 915        end: t.Optional[TimeLike] = None,
 916        execution_time: t.Optional[TimeLike] = None,
 917        create_from: t.Optional[str] = None,
 918        skip_tests: bool = False,
 919        restate_models: t.Optional[t.Iterable[str]] = None,
 920        no_gaps: bool = False,
 921        skip_backfill: bool = False,
 922        forward_only: t.Optional[bool] = None,
 923        no_auto_categorization: t.Optional[bool] = None,
 924        effective_from: t.Optional[TimeLike] = None,
 925        include_unmodified: t.Optional[bool] = None,
 926        select_models: t.Optional[t.Collection[str]] = None,
 927        backfill_models: t.Optional[t.Collection[str]] = None,
 928        categorizer_config: t.Optional[CategorizerConfig] = None,
 929        enable_preview: t.Optional[bool] = None,
 930        run: bool = False,
 931    ) -> PlanBuilder:
 932        """Creates a plan builder.
 933
 934        Args:
 935            environment: The environment to diff and plan against.
 936            start: The start date of the backfill if there is one.
 937            end: The end date of the backfill if there is one.
 938            execution_time: The date/time reference to use for execution time. Defaults to now.
 939            create_from: The environment to create the target environment from if it
 940                doesn't exist. If not specified, the "prod" environment will be used.
 941            skip_tests: Unit tests are run by default so this will skip them if enabled
 942            restate_models: A list of either internal or external models, or tags, that need to be restated
 943                for the given plan interval. If the target environment is a production environment,
 944                ALL snapshots that depended on these upstream tables will have their intervals deleted
 945                (even ones not in this current environment). Only the snapshots in this environment will
 946                be backfilled whereas others need to be recovered on a future plan application. For development
 947                environments only snapshots that are part of this plan will be affected.
 948            no_gaps:  Whether to ensure that new snapshots for models that are already a
 949                part of the target environment have no data gaps when compared against previous
 950                snapshots for same models.
 951            skip_backfill: Whether to skip the backfill step. Default: False.
 952            forward_only: Whether the purpose of the plan is to make forward only changes.
 953            no_auto_categorization: Indicates whether to disable automatic categorization of model
 954                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 955                option determines the behavior.
 956            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 957                project config by default.
 958            effective_from: The effective date from which to apply forward-only changes on production.
 959            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 960            select_models: A list of model selection strings to filter the models that should be included into this plan.
 961            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 962            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 963            run: Whether to run latest intervals as part of the plan application.
 964
 965        Returns:
 966            The plan builder.
 967        """
 968        environment = environment or self.config.default_target_environment
 969        environment = Environment.normalize_name(environment)
 970        is_dev = environment != c.PROD
 971
 972        if skip_backfill and not no_gaps and not is_dev:
 973            raise ConfigError(
 974                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
 975            )
 976
 977        if run and is_dev:
 978            raise ConfigError("The '--run' flag is only supported for the production environment.")
 979
 980        self._run_plan_tests(skip_tests=skip_tests)
 981
 982        environment_ttl = (
 983            self.environment_ttl if environment not in self.pinned_environments else None
 984        )
 985
 986        model_selector = self._new_selector()
 987
 988        if backfill_models:
 989            backfill_models = model_selector.expand_model_selections(backfill_models)
 990        else:
 991            backfill_models = None
 992
 993        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
 994        if select_models:
 995            models_override = model_selector.select_models(
 996                select_models,
 997                environment,
 998                fallback_env_name=create_from or c.PROD,
 999                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1000            )
1001            if not backfill_models:
1002                # Only backfill selected models unless explicitly specified.
1003                backfill_models = model_selector.expand_model_selections(select_models)
1004
1005        expanded_restate_models = None
1006        if restate_models is not None:
1007            expanded_restate_models = model_selector.expand_model_selections(restate_models)
1008            if not expanded_restate_models:
1009                self.console.log_error(
1010                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
1011                )
1012
1013        snapshots = self._snapshots(models_override)
1014        context_diff = self._context_diff(
1015            environment or c.PROD,
1016            snapshots=snapshots,
1017            create_from=create_from,
1018            force_no_diff=(restate_models is not None and not expanded_restate_models)
1019            or (backfill_models is not None and not backfill_models),
1020            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1021        )
1022
1023        # If no end date is specified, use the max interval end from prod
1024        # to prevent unintended evaluation of the entire DAG.
1025        if not run:
1026            if backfill_models is not None:
1027                # Only consider selected models for the default end value.
1028                models_for_default_end = backfill_models.copy()
1029                for name in backfill_models:
1030                    if name not in snapshots:
1031                        continue
1032                    snapshot = snapshots[name]
1033                    snapshot_id = snapshot.snapshot_id
1034                    if (
1035                        snapshot_id in context_diff.added
1036                        and snapshot_id in context_diff.new_snapshots
1037                    ):
1038                        # If the selected model is a newly added model, then we should narrow down the intervals
1039                        # that should be considered for the default plan end value by including its parents.
1040                        models_for_default_end |= {s.name for s in snapshot.parents}
1041                default_end = self.state_sync.greatest_common_interval_end(
1042                    c.PROD,
1043                    models_for_default_end,
1044                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1045                )
1046            else:
1047                default_end = self.state_sync.max_interval_end_for_environment(
1048                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
1049                )
1050        else:
1051            default_end = None
1052
1053        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None
1054
1055        return PlanBuilder(
1056            context_diff=context_diff,
1057            start=start,
1058            end=end,
1059            execution_time=execution_time,
1060            apply=self.apply,
1061            restate_models=expanded_restate_models,
1062            backfill_models=backfill_models,
1063            no_gaps=no_gaps,
1064            skip_backfill=skip_backfill,
1065            is_dev=is_dev,
1066            forward_only=(
1067                forward_only if forward_only is not None else self.config.plan.forward_only
1068            ),
1069            environment_ttl=environment_ttl,
1070            environment_suffix_target=self.config.environment_suffix_target,
1071            environment_catalog_mapping=self.config.environment_catalog_mapping,
1072            categorizer_config=categorizer_config or self.auto_categorize_changes,
1073            auto_categorization_enabled=not no_auto_categorization,
1074            effective_from=effective_from,
1075            include_unmodified=(
1076                include_unmodified
1077                if include_unmodified is not None
1078                else self.config.plan.include_unmodified
1079            ),
1080            default_start=default_start,
1081            default_end=default_end,
1082            enable_preview=(
1083                enable_preview if enable_preview is not None else self.config.plan.enable_preview
1084            ),
1085            end_bounded=not run,
1086            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1087        )
1088
1089    def apply(
1090        self,
1091        plan: Plan,
1092        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1093    ) -> None:
1094        """Applies a plan by pushing snapshots and backfilling data.
1095
1096        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1097        to backfill all models.
1098
1099        Args:
1100            plan: The plan to apply.
1101            circuit_breaker: An optional handler which checks if the apply should be aborted.
1102        """
1103        if (
1104            not plan.context_diff.has_changes
1105            and not plan.requires_backfill
1106            and not plan.has_unmodified_unpromoted
1107        ):
1108            return
1109        if plan.uncategorized:
1110            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1111        self.notification_target_manager.notify(
1112            NotificationEvent.APPLY_START,
1113            environment=plan.environment_naming_info.name,
1114            plan_id=plan.plan_id,
1115        )
1116        try:
1117            self._apply(plan, circuit_breaker)
1118        except Exception as e:
1119            self.notification_target_manager.notify(
1120                NotificationEvent.APPLY_FAILURE,
1121                environment=plan.environment_naming_info.name,
1122                plan_id=plan.plan_id,
1123                exc=traceback.format_exc(),
1124            )
1125            logger.error(f"Apply Failure: {traceback.format_exc()}")
1126            raise e
1127        self.notification_target_manager.notify(
1128            NotificationEvent.APPLY_END,
1129            environment=plan.environment_naming_info.name,
1130            plan_id=plan.plan_id,
1131        )
1132
1133    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1134        """Invalidates the target environment by setting its expiration timestamp to now.
1135
1136        Args:
1137            name: The name of the environment to invalidate.
1138            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1139                be deleted asynchronously by the janitor process.
1140        """
1141        self.state_sync.invalidate_environment(name)
1142        if sync:
1143            self._cleanup_environments()
1144            self.console.log_success(f"Environment '{name}' has been deleted.")
1145        else:
1146            self.console.log_success(f"Environment '{name}' has been invalidated.")
1147
1148    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1149        """Show a diff of the current context with a given environment.
1150
1151        Args:
1152            environment: The environment to diff against.
1153            detailed: Show the actual SQL differences if True.
1154
1155        Returns:
1156            True if there are changes, False otherwise.
1157        """
1158        environment = environment or self.config.default_target_environment
1159        environment = Environment.normalize_name(environment)
1160        context_diff = self._context_diff(environment)
1161        self.console.show_model_difference_summary(
1162            context_diff,
1163            EnvironmentNamingInfo.from_environment_catalog_mapping(
1164                self.config.environment_catalog_mapping,
1165                name=environment,
1166                suffix_target=self.config.environment_suffix_target,
1167            ),
1168            self.default_catalog,
1169            no_diff=not detailed,
1170        )
1171        return context_diff.has_changes
1172
1173    def table_diff(
1174        self,
1175        source: str,
1176        target: str,
1177        on: t.List[str] | exp.Condition | None = None,
1178        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1179        where: t.Optional[str | exp.Condition] = None,
1180        limit: int = 20,
1181        show: bool = True,
1182        show_sample: bool = True,
1183    ) -> TableDiff:
1184        """Show a diff between two tables.
1185
1186        Args:
1187            source: The source environment or table.
1188            target: The target environment or table.
1189            on: The join condition, table aliases must be "s" and "t" for source and target.
1190                If omitted, the table's grain will be used.
1191            model_or_snapshot: The model or snapshot to use when environments are passed in.
1192            where: An optional where statement to filter results.
1193            limit: The limit of the sample dataframe.
1194            show: Show the table diff output in the console.
1195            show_sample: Show the sample dataframe in the console. Requires show=True.
1196
1197        Returns:
1198            The TableDiff object containing schema and summary differences.
1199        """
1200        source_alias, target_alias = source, target
1201        if model_or_snapshot:
1202            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1203            source_env = self.state_reader.get_environment(source)
1204            target_env = self.state_reader.get_environment(target)
1205
1206            if not source_env:
1207                raise SQLMeshError(f"Could not find environment '{source}'")
1208            if not target_env:
1209                raise SQLMeshError(f"Could not find environment '{target}')")
1210
1211            source = next(
1212                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1213            ).table_name()
1214            target = next(
1215                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1216            ).table_name()
1217            source_alias = source_env.name
1218            target_alias = target_env.name
1219
1220            if not on:
1221                for ref in model.all_references:
1222                    if ref.unique:
1223                        on = ref.columns
1224
1225        if not on:
1226            raise SQLMeshError(
1227                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1228            )
1229
1230        table_diff = TableDiff(
1231            adapter=self._engine_adapter,
1232            source=source,
1233            target=target,
1234            on=on,
1235            where=where,
1236            source_alias=source_alias,
1237            target_alias=target_alias,
1238            model_name=model.name if model_or_snapshot else None,
1239            limit=limit,
1240        )
1241        if show:
1242            self.console.show_schema_diff(table_diff.schema_diff())
1243            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1244        return table_diff
1245
1246    def get_dag(
1247        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1248    ) -> GraphHTML:
1249        """Gets an HTML object representation of the DAG.
1250
1251        Args:
1252            select_models: A list of model selection strings that should be included in the dag.
1253        Returns:
1254            An html object that renders the dag.
1255        """
1256        dag = (
1257            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1258            if select_models
1259            else self.dag
1260        )
1261
1262        nodes = {}
1263        edges: t.List[t.Dict] = []
1264
1265        for node, deps in dag.graph.items():
1266            nodes[node] = {
1267                "id": node,
1268                "label": node.split(".")[-1],
1269                "title": f"<span>{node}</span>",
1270            }
1271            edges.extend({"from": d, "to": node} for d in deps)
1272
1273        return GraphHTML(
1274            nodes,
1275            edges,
1276            options={
1277                "height": "100%",
1278                "width": "100%",
1279                "interaction": {},
1280                "layout": {
1281                    "hierarchical": {
1282                        "enabled": True,
1283                        "nodeSpacing": 200,
1284                        "sortMethod": "directed",
1285                    },
1286                },
1287                "nodes": {
1288                    "shape": "box",
1289                },
1290                **options,
1291            },
1292        )
1293
1294    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1295        """Render the dag as HTML and save it to a file.
1296
1297        Args:
1298            path: filename to save the dag html to
1299            select_models: A list of model selection strings that should be included in the dag.
1300        """
1301        file_path = Path(path)
1302        suffix = file_path.suffix
1303        if suffix != ".html":
1304            if suffix:
1305                logger.warning(
1306                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1307                )
1308            path = str(file_path.with_suffix(".html"))
1309
1310        with open(path, "w", encoding="utf-8") as file:
1311            file.write(str(self.get_dag(select_models)))
1312
1313    def create_test(
1314        self,
1315        model: str,
1316        input_queries: t.Dict[str, str],
1317        overwrite: bool = False,
1318        variables: t.Optional[t.Dict[str, str]] = None,
1319        path: t.Optional[str] = None,
1320        name: t.Optional[str] = None,
1321        include_ctes: bool = False,
1322    ) -> None:
1323        """Generate a unit test fixture for a given model.
1324
1325        Args:
1326            model: The model to test.
1327            input_queries: Mapping of model names to queries. Each model included in this mapping
1328                will be populated in the test based on the results of the corresponding query.
1329            overwrite: Whether to overwrite the existing test in case of a file path collision.
1330                When set to False, an error will be raised if there is such a collision.
1331            variables: Key-value pairs that will define variables needed by the model.
1332            path: The file path corresponding to the fixture, relative to the test directory.
1333                By default, the fixture will be created under the test directory and the file name
1334                will be inferred from the test's name.
1335            name: The name of the test. This is inferred from the model name by default.
1336            include_ctes: When true, CTE fixtures will also be generated.
1337        """
1338        input_queries = {
1339            # The get_model here has two purposes: return normalized names & check for missing deps
1340            self.get_model(dep, raise_if_missing=True).fqn: query
1341            for dep, query in input_queries.items()
1342        }
1343
1344        try:
1345            test_adapter = self._test_connection_config.create_engine_adapter(
1346                register_comments_override=False
1347            )
1348            generate_test(
1349                model=self.get_model(model, raise_if_missing=True),
1350                input_queries=input_queries,
1351                models=self._models,
1352                engine_adapter=self._engine_adapter,
1353                test_engine_adapter=test_adapter,
1354                project_path=self.path,
1355                overwrite=overwrite,
1356                variables=variables,
1357                path=path,
1358                name=name,
1359                include_ctes=include_ctes,
1360            )
1361        finally:
1362            test_adapter.close()
1363
1364    def test(
1365        self,
1366        match_patterns: t.Optional[t.List[str]] = None,
1367        tests: t.Optional[t.List[str]] = None,
1368        verbose: bool = False,
1369        preserve_fixtures: bool = False,
1370        stream: t.Optional[t.TextIO] = None,
1371    ) -> ModelTextTestResult:
1372        """Discover and run model tests"""
1373        if verbose:
1374            pd.set_option("display.max_columns", None)
1375            verbosity = 2
1376        else:
1377            verbosity = 1
1378
1379        if tests:
1380            result = run_model_tests(
1381                tests=tests,
1382                models=self._models,
1383                config=self.config,
1384                gateway=self.gateway,
1385                dialect=self.default_dialect,
1386                verbosity=verbosity,
1387                patterns=match_patterns,
1388                preserve_fixtures=preserve_fixtures,
1389                stream=stream,
1390                default_catalog=self.default_catalog,
1391                default_catalog_dialect=self.engine_adapter.DIALECT,
1392            )
1393        else:
1394            test_meta = []
1395
1396            for path, config in self.configs.items():
1397                test_meta.extend(
1398                    get_all_model_tests(
1399                        path / c.TESTS,
1400                        patterns=match_patterns,
1401                        ignore_patterns=config.ignore_patterns,
1402                    )
1403                )
1404
1405            result = run_tests(
1406                model_test_metadata=test_meta,
1407                models=self._models,
1408                config=self.config,
1409                gateway=self.gateway,
1410                dialect=self.default_dialect,
1411                verbosity=verbosity,
1412                preserve_fixtures=preserve_fixtures,
1413                stream=stream,
1414                default_catalog=self.default_catalog,
1415                default_catalog_dialect=self.engine_adapter.DIALECT,
1416            )
1417
1418        return result
1419
1420    def audit(
1421        self,
1422        start: TimeLike,
1423        end: TimeLike,
1424        *,
1425        models: t.Optional[t.Iterator[str]] = None,
1426        execution_time: t.Optional[TimeLike] = None,
1427    ) -> None:
1428        """Audit models.
1429
1430        Args:
1431            start: The start of the interval to audit.
1432            end: The end of the interval to audit.
1433            models: The models to audit. All models will be audited if not specified.
1434            execution_time: The date/time time reference to use for execution time. Defaults to now.
1435        """
1436
1437        snapshots = (
1438            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1439            if models
1440            else self.snapshots.values()
1441        )
1442
1443        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1444        self.console.log_status_update(f"Found {num_audits} audit(s).")
1445        errors = []
1446        skipped_count = 0
1447        for snapshot in snapshots:
1448            for audit_result in self.snapshot_evaluator.audit(
1449                snapshot=snapshot,
1450                start=start,
1451                end=end,
1452                snapshots=self.snapshots,
1453                raise_exception=False,
1454            ):
1455                audit_id = f"{audit_result.audit.name}"
1456                if audit_result.model:
1457                    audit_id += f" on model {audit_result.model.name}"
1458
1459                if audit_result.skipped:
1460                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1461                    skipped_count += 1
1462                elif audit_result.count:
1463                    errors.append(audit_result)
1464                    self.console.log_status_update(
1465                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1466                    )
1467                else:
1468                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1469
1470        self.console.log_status_update(
1471            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1472            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1473        )
1474        for error in errors:
1475            self.console.log_status_update(
1476                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1477            )
1478            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1479            if error.query:
1480                self.console.show_sql(
1481                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1482                )
1483
1484        self.console.log_status_update("Done.")
1485
1486    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1487        """Rewrite a sql expression with semantic references into an executable query.
1488
1489        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1490
1491        Args:
1492            sql: The sql string to rewrite.
1493            dialect: The dialect of the sql string, defaults to the project dialect.
1494
1495        Returns:
1496            A SQLGlot expression with semantic references expanded.
1497        """
1498        return rewrite(
1499            sql,
1500            graph=ReferenceGraph(self.models.values()),
1501            metrics=self._metrics,
1502            dialect=dialect or self.default_dialect,
1503        )
1504
1505    def migrate(self) -> None:
1506        """Migrates SQLMesh to the current running version.
1507
1508        Please contact your SQLMesh administrator before doing this.
1509        """
1510        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1511        try:
1512            self._new_state_sync().migrate(
1513                default_catalog=self.default_catalog,
1514                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1515            )
1516        except Exception as e:
1517            self.notification_target_manager.notify(
1518                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1519            )
1520            raise e
1521        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
1522
1523    def rollback(self) -> None:
1524        """Rolls back SQLMesh to the previous migration.
1525
1526        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1527        """
1528        self._new_state_sync().rollback()
1529
1530    def create_external_models(self) -> None:
1531        """Create a schema file with all external models.
1532
1533        The schema file contains all columns and types of external models, allowing for more robust
1534        lineage, validation, and optimizations.
1535        """
1536        if not self._models:
1537            self.load(update_schemas=False)
1538
1539        for path, config in self.configs.items():
1540            create_schema_file(
1541                path=path / c.SCHEMA_YAML,
1542                models=UniqueKeyDict(
1543                    "models",
1544                    {
1545                        fqn: model
1546                        for fqn, model in self._models.items()
1547                        if self.config_for_node(model) is config
1548                    },
1549                ),
1550                adapter=self._engine_adapter,
1551                state_reader=self.state_reader,
1552                dialect=config.model_defaults.dialect,
1553                max_workers=self.concurrent_tasks,
1554            )
1555
1556    def print_info(self) -> None:
1557        """Prints information about connections, models, macros, etc. to the console."""
1558        self.console.log_status_update(f"Models: {len(self.models)}")
1559        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1560
1561        self._try_connection("data warehouse", self._engine_adapter)
1562
1563        state_connection = self.config.get_state_connection(self.gateway)
1564        if state_connection:
1565            self._try_connection("state backend", state_connection.create_engine_adapter())
1566
1567    def close(self) -> None:
1568        """Releases all resources allocated by this context."""
1569        self.snapshot_evaluator.close()
1570        self.state_sync.close()
1571
1572    def _run(
1573        self,
1574        environment: str,
1575        *,
1576        start: t.Optional[TimeLike],
1577        end: t.Optional[TimeLike],
1578        execution_time: t.Optional[TimeLike],
1579        skip_janitor: bool,
1580        ignore_cron: bool,
1581    ) -> bool:
1582        if not skip_janitor and environment.lower() == c.PROD:
1583            self._run_janitor()
1584
1585        env_check_attempts_num = max(
1586            1,
1587            self.config.run.environment_check_max_wait
1588            // self.config.run.environment_check_interval,
1589        )
1590
1591        def _block_until_finalized() -> str:
1592            for _ in range(env_check_attempts_num):
1593                assert environment is not None  # mypy
1594                environment_state = self.state_sync.get_environment(environment)
1595                if not environment_state:
1596                    raise SQLMeshError(f"Environment '{environment}' was not found.")
1597                if environment_state.finalized_ts:
1598                    return environment_state.plan_id
1599                logger.warning(
1600                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
1601                    environment,
1602                    environment_state.plan_id,
1603                    self.config.run.environment_check_interval,
1604                )
1605                time.sleep(self.config.run.environment_check_interval)
1606            raise SQLMeshError(
1607                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
1608                "This means that the environment either failed to update or the update is taking longer than expected. "
1609                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
1610            )
1611
1612        done = False
1613        while not done:
1614            plan_id_at_start = _block_until_finalized()
1615
1616            def _has_environment_changed() -> bool:
1617                assert environment is not None  # mypy
1618                current_environment_state = self.state_sync.get_environment(environment)
1619                return (
1620                    not current_environment_state
1621                    or current_environment_state.plan_id != plan_id_at_start
1622                    or not current_environment_state.finalized_ts
1623                )
1624
1625            try:
1626                success = self.scheduler(environment=environment).run(
1627                    environment,
1628                    start=start,
1629                    end=end,
1630                    execution_time=execution_time,
1631                    ignore_cron=ignore_cron,
1632                    circuit_breaker=_has_environment_changed,
1633                )
1634                done = True
1635            except CircuitBreakerError:
1636                logger.warning(
1637                    "Environment '%s' has been modified while running. Restarting the run...",
1638                    environment,
1639                )
1640
1641        return success
1642
1643    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
1644        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)
1645
1646    def table_name(self, model_name: str, dev: bool) -> str:
1647        """Returns the name of the pysical table for the given model name.
1648
1649        Args:
1650            model_name: The name of the model.
1651            dev: Whether to use the deployability index for the table name.
1652
1653        Returns:
1654            The name of the physical table.
1655        """
1656        deployability_index = (
1657            DeployabilityIndex.create(self.snapshots.values())
1658            if dev
1659            else DeployabilityIndex.all_deployable()
1660        )
1661        snapshot = self.get_snapshot(model_name)
1662        if not snapshot:
1663            raise SQLMeshError(f"Model '{model_name}' was not found.")
1664        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
1665
1666    def clear_caches(self) -> None:
1667        for path in self.configs:
1668            rmtree(path / c.CACHE)
1669
1670    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
1671        test_output_io = StringIO()
1672        result = self.test(stream=test_output_io, verbose=verbose)
1673        return result, test_output_io.getvalue()
1674
1675    def _run_plan_tests(
1676        self, skip_tests: bool = False
1677    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
1678        if not skip_tests:
1679            result, test_output = self._run_tests()
1680            if result.testsRun > 0:
1681                self.console.log_test_results(
1682                    result, test_output, self._test_connection_config._engine_adapter.DIALECT
1683                )
1684            if not result.wasSuccessful():
1685                raise PlanError(
1686                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
1687                )
1688            return result, test_output
1689        return None, None
1690
1691    @property
1692    def _model_tables(self) -> t.Dict[str, str]:
1693        """Mapping of model name to physical table name.
1694
1695        If a snapshot has not been versioned yet, its view name will be returned.
1696        """
1697        return {
1698            fqn: (
1699                snapshot.table_name()
1700                if snapshot.version
1701                else snapshot.qualified_view_name.for_environment(
1702                    EnvironmentNamingInfo.from_environment_catalog_mapping(
1703                        self.config.environment_catalog_mapping,
1704                        name=c.PROD,
1705                        suffix_target=self.config.environment_suffix_target,
1706                    )
1707                )
1708            )
1709            for fqn, snapshot in self.snapshots.items()
1710        }
1711
1712    def _snapshots(
1713        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
1714    ) -> t.Dict[str, Snapshot]:
1715        prod = self.state_reader.get_environment(c.PROD)
1716        remote_snapshots = (
1717            {
1718                snapshot.name: snapshot
1719                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
1720            }
1721            if prod
1722            else {}
1723        )
1724
1725        local_nodes = {**(models_override or self._models), **self._standalone_audits}
1726        nodes = local_nodes.copy()
1727        audits = self._audits.copy()
1728        projects = {config.project for config in self.configs.values()}
1729
1730        for name, snapshot in remote_snapshots.items():
1731            if name not in nodes and snapshot.node.project not in projects:
1732                nodes[name] = snapshot.node
1733                if snapshot.is_model:
1734                    for audit in snapshot.audits:
1735                        if name not in audits:
1736                            audits[name] = audit
1737
1738        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
1739            snapshots: t.Dict[str, Snapshot] = {}
1740            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}
1741
1742            for node in nodes.values():
1743                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
1744                    ttl = remote_snapshots[node.fqn].ttl
1745                else:
1746                    config = self.config_for_node(node)
1747                    ttl = config.snapshot_ttl
1748
1749                snapshot = Snapshot.from_node(
1750                    node,
1751                    nodes=nodes,
1752                    audits=audits,
1753                    cache=fingerprint_cache,
1754                    ttl=ttl,
1755                )
1756                snapshots[snapshot.name] = snapshot
1757            return snapshots
1758
1759        snapshots = _nodes_to_snapshots(nodes)
1760        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1761
1762        unrestorable_snapshots = {
1763            snapshot
1764            for snapshot in stored_snapshots.values()
1765            if snapshot.name in local_nodes and snapshot.unrestorable
1766        }
1767        if unrestorable_snapshots:
1768            for snapshot in unrestorable_snapshots:
1769                logger.info(
1770                    "Found a unrestorable snapshot %s. Restamping the model...", snapshot.name
1771                )
1772                node = local_nodes[snapshot.name]
1773                nodes[snapshot.name] = node.copy(
1774                    update={"stamp": f"revert to {snapshot.identifier}"}
1775                )
1776            snapshots = _nodes_to_snapshots(nodes)
1777            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1778
1779        for snapshot in stored_snapshots.values():
1780            # Keep the original model instance to preserve the query cache.
1781            snapshot.node = snapshots[snapshot.name].node
1782
1783        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}
1784
1785    def _context_diff(
1786        self,
1787        environment: str,
1788        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1789        create_from: t.Optional[str] = None,
1790        force_no_diff: bool = False,
1791        ensure_finalized_snapshots: bool = False,
1792    ) -> ContextDiff:
1793        environment = Environment.normalize_name(environment)
1794        if force_no_diff:
1795            return ContextDiff.create_no_diff(environment)
1796        return ContextDiff.create(
1797            environment,
1798            snapshots=snapshots or self.snapshots,
1799            create_from=create_from or c.PROD,
1800            state_reader=self.state_reader,
1801            ensure_finalized_snapshots=ensure_finalized_snapshots,
1802        )
1803
1804    def _run_janitor(self) -> None:
1805        self._cleanup_environments()
1806        expired_snapshots = self.state_sync.delete_expired_snapshots()
1807        self.snapshot_evaluator.cleanup(
1808            expired_snapshots, on_complete=self.console.update_cleanup_progress
1809        )
1810
1811        self.state_sync.compact_intervals()
1812
1813    def _cleanup_environments(self) -> None:
1814        expired_environments = self.state_sync.delete_expired_environments()
1815        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
1816
1817    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
1818        connection_name = connection_name.capitalize()
1819        try:
1820            engine_adapter.fetchall("SELECT 1")
1821            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
1822        except Exception as ex:
1823            self.console.log_error(f"{connection_name} connection failed. {ex}")
1824
1825    def _new_state_sync(self) -> StateSync:
1826        return self._provided_state_sync or self._scheduler.create_state_sync(self)
1827
1828    def _new_selector(self) -> Selector:
1829        return Selector(
1830            self.state_reader,
1831            self._models,
1832            context_path=self.path,
1833            default_catalog=self.default_catalog,
1834            dialect=self.default_dialect,
1835        )
1836
1837    def _register_notification_targets(self) -> None:
1838        event_notifications = collections.defaultdict(set)
1839        for target in self.notification_targets:
1840            if target.is_configured:
1841                for event in target.notify_on:
1842                    event_notifications[event].add(target)
1843        user_notification_targets = {
1844            user.username: set(
1845                target for target in user.notification_targets if target.is_configured
1846            )
1847            for user in self.users
1848        }
1849        self.notification_target_manager = NotificationTargetManager(
1850            event_notifications, user_notification_targets, username=self.config.username
1851        )

Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

Arguments:
  • engine_adapter: The default engine adapter to use.
  • notification_targets: The notification target to use. Defaults to what is defined in config.
  • paths: The directories containing SQLMesh files.
  • config: A Config object or the name of a Config object in config.py.
  • connection: The name of the connection. If not specified the first connection as it appears in configuration will be used.
  • test_connection: The name of the connection to use for tests. If not specified the first connection as it appears in configuration will be used.
  • concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
  • load: Whether or not to automatically load all models and macros (default True).
  • console: The rich instance used for printing out CLI command results.
  • users: A list of users to make known to SQLMesh.
  • config_type: The type of config object to use (default Config).
GenericContext( engine_adapter: Union[sqlmesh.core.engine_adapter.base.EngineAdapter, NoneType] = None, notification_targets: Union[List[typing_extensions.Annotated[Union[sqlmesh.core.notification_target.BasicSMTPNotificationTarget, sqlmesh.core.notification_target.ConsoleNotificationTarget, sqlmesh.core.notification_target.SlackApiNotificationTarget, sqlmesh.core.notification_target.SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]], NoneType] = None, state_sync: Union[sqlmesh.core.state_sync.base.StateSync, NoneType] = None, paths: 't.Union[str | Path, t.Iterable[str | Path]]' = '', config: Union[~C, str, Dict[pathlib.Path, ~C], NoneType] = None, gateway: Union[str, NoneType] = None, concurrent_tasks: Union[int, NoneType] = None, loader: Union[Type[sqlmesh.core.loader.Loader], NoneType] = None, load: bool = True, console: Union[sqlmesh.core.console.Console, NoneType] = None, users: Union[List[sqlmesh.core.user.User], NoneType] = None)
    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        """Initializes the context and, unless disabled, loads the project.

        Args:
            engine_adapter: The engine adapter to use. When omitted, one is created from the
                connection config of the gateway.
            notification_targets: Notification targets that are combined with the ones from the config.
            state_sync: A state sync instance to keep as the provided state sync.
            paths: Passed to `load_configs` when `config` is not a dictionary.
            config: A dictionary of path to config which is used as is, or any other value
                which is passed to `load_configs`.
            gateway: The gateway name passed to the config when resolving the scheduler
                and the connections.
            concurrent_tasks: Falls back to the value of the connection config when not set.
            loader: The loader class to instantiate. Falls back to the loader from the config.
            load: Whether to call `load()` at the end of the initialization.
            console: The console to use. Falls back to `get_console()`.
            users: Users that are combined with the ones from the config.

        Raises:
            SQLMeshError: If an environment catalog mapping is configured while the engine
                adapter doesn't support multiple catalogs.
        """
        self.console = console or get_console()
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        self._default_catalog: t.Optional[str] = None

        # The first entry of the configs mapping provides the context's own path and config.
        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        # Reads `default_catalog` and `engine_adapter`, so it has to come after the attributes above.
        self._test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )

        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        # Keep one user per username; for duplicates the last occurrence wins.
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()
default_dialect: Union[str, NoneType]

Returns the default dialect.

Returns an engine adapter.

def execution_context( self, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None) -> sqlmesh.core.context.ExecutionContext:
374    def execution_context(
375        self, deployability_index: t.Optional[DeployabilityIndex] = None
376    ) -> ExecutionContext:
377        """Returns an execution context."""
378        return ExecutionContext(
379            engine_adapter=self._engine_adapter,
380            snapshots=self.snapshots,
381            deployability_index=deployability_index,
382            default_dialect=self.default_dialect,
383            default_catalog=self.default_catalog,
384        )

Returns an execution context.

386    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
387        """Update or insert a model.
388
389        The context's models dictionary will be updated to include these changes.
390
391        Args:
392            model: Model name or instance to update.
393            kwargs: The kwargs to update the model with.
394
395        Returns:
396            A new instance of the updated or inserted model.
397        """
398        model = self.get_model(model, raise_if_missing=True)
399        path = model._path
400
401        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
402        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
403        model._path = path
404
405        self._models.update({model.fqn: model})
406        self.dag.add(model.fqn, model.depends_on)
407        update_model_schemas(
408            self.dag,
409            self._models,
410            self.path,
411        )
412
413        model.validate_definition()
414
415        return model

Update or insert a model.

The context's models dictionary will be updated to include these changes.

Arguments:
  • model: Model name or instance to update.
  • kwargs: The kwargs to update the model with.
Returns:

A new instance of the updated or inserted model.

def scheduler( self, environment: Union[str, NoneType] = None) -> sqlmesh.core.scheduler.Scheduler:
417    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
418        """Returns the built-in scheduler.
419
420        Args:
421            environment: The target environment to source model snapshots from, or None
422                if snapshots should be sourced from the currently loaded local state.
423
424        Returns:
425            The built-in scheduler instance.
426        """
427        snapshots: t.Iterable[Snapshot]
428        if environment is not None:
429            stored_environment = self.state_sync.get_environment(environment)
430            if stored_environment is None:
431                raise ConfigError(f"Environment '{environment}' was not found.")
432            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
433        else:
434            snapshots = self.snapshots.values()
435
436        if not snapshots:
437            raise ConfigError("No models were found")
438
439        return Scheduler(
440            snapshots,
441            self.snapshot_evaluator,
442            self.state_sync,
443            default_catalog=self.default_catalog,
444            max_workers=self.concurrent_tasks,
445            console=self.console,
446            notification_target_manager=self.notification_target_manager,
447        )

Returns the built-in scheduler.

Arguments:
  • environment: The target environment to source model snapshots from, or None if snapshots should be sourced from the currently loaded local state.
Returns:

The built-in scheduler instance.

def refresh(self) -> None:
464    def refresh(self) -> None:
465        """Refresh all models that have been updated."""
466        if self._loader.reload_needed():
467            self.load()

Refresh all models that have been updated.

def load( self, update_schemas: bool = True) -> sqlmesh.core.context.GenericContext[~C]:
469    def load(self, update_schemas: bool = True) -> GenericContext[C]:
470        """Load all files in the context's path."""
471        with sys_path(*self.configs):
472            gc.disable()
473            project = self._loader.load(self, update_schemas)
474            self._macros = project.macros
475            self._jinja_macros = project.jinja_macros
476            self._models = project.models
477            self._metrics = project.metrics
478            self._standalone_audits.clear()
479            self._audits.clear()
480            for name, audit in project.audits.items():
481                if isinstance(audit, StandaloneAudit):
482                    self._standalone_audits[name] = audit
483                else:
484                    self._audits[name] = audit
485            self.dag = project.dag
486            gc.enable()
487
488            duplicates = set(self._models) & set(self._standalone_audits)
489            if duplicates:
490                raise ConfigError(
491                    f"Models and Standalone audits cannot have the same name: {duplicates}"
492                )
493
494        return self

Load all files in the context's path.

def run( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, skip_janitor: bool = False, ignore_cron: bool = False) -> bool:
496    def run(
497        self,
498        environment: t.Optional[str] = None,
499        *,
500        start: t.Optional[TimeLike] = None,
501        end: t.Optional[TimeLike] = None,
502        execution_time: t.Optional[TimeLike] = None,
503        skip_janitor: bool = False,
504        ignore_cron: bool = False,
505    ) -> bool:
506        """Run the entire dag through the scheduler.
507
508        Args:
509            environment: The target environment to source model snapshots from and virtually update. Default: prod.
510            start: The start of the interval to render.
511            end: The end of the interval to render.
512            execution_time: The date/time time reference to use for execution time. Defaults to now.
513            skip_janitor: Whether to skip the janitor task.
514            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
515
516        Returns:
517            True if the run was successful, False otherwise.
518        """
519        environment = environment or self.config.default_target_environment
520
521        self.notification_target_manager.notify(
522            NotificationEvent.RUN_START, environment=environment
523        )
524        success = False
525        try:
526            success = self._run(
527                environment=environment,
528                start=start,
529                end=end,
530                execution_time=execution_time,
531                skip_janitor=skip_janitor,
532                ignore_cron=ignore_cron,
533            )
534        except Exception as e:
535            self.notification_target_manager.notify(
536                NotificationEvent.RUN_FAILURE, traceback.format_exc()
537            )
538            logger.error(f"Run Failure: {traceback.format_exc()}")
539            raise e
540
541        if success:
542            self.notification_target_manager.notify(
543                NotificationEvent.RUN_END, environment=environment
544            )
545            self.console.log_success(f"Run finished for environment '{environment}'")
546        else:
547            self.notification_target_manager.notify(
548                NotificationEvent.RUN_FAILURE, "See console logs for details."
549            )
550
551        return success

Run the entire dag through the scheduler.

Arguments:
  • environment: The target environment to source model snapshots from and virtually update. Default: prod.
  • start: The start of the interval to render.
  • end: The end of the interval to render.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • skip_janitor: Whether to skip the janitor task.
  • ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
Returns:

True if the run was successful, False otherwise.

def get_model( self, model_or_snapshot: <MagicMock id='140338274404528'>, raise_if_missing: bool = False) -> Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, NoneType]:
565    def get_model(
566        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
567    ) -> t.Optional[Model]:
568        """Returns a model with the given name or None if a model with such name doesn't exist.
569
570        Args:
571            model_or_snapshot: A model name, model, or snapshot.
572            raise_if_missing: Raises an error if a model is not found.
573
574        Returns:
575            The expected model.
576        """
577        if isinstance(model_or_snapshot, str):
578            normalized_name = normalize_model_name(
579                model_or_snapshot,
580                dialect=self.default_dialect,
581                default_catalog=self.default_catalog,
582            )
583            model = self._models.get(normalized_name)
584        elif isinstance(model_or_snapshot, Snapshot):
585            model = model_or_snapshot.model
586        else:
587            model = model_or_snapshot
588
589        if raise_if_missing and not model:
590            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
591
592        return model

Returns a model with the given name or None if a model with such name doesn't exist.

Arguments:
  • model_or_snapshot: A model name, model, or snapshot.
  • raise_if_missing: Raises an error if a model is not found.
Returns:

The expected model.

def get_snapshot( self, node_or_snapshot: <MagicMock id='140338274433296'>, raise_if_missing: bool = False) -> Union[sqlmesh.core.snapshot.definition.Snapshot, NoneType]:
607    def get_snapshot(
608        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
609    ) -> t.Optional[Snapshot]:
610        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.
611
612        Args:
613            node_or_snapshot: A node name, node, or snapshot.
614            raise_if_missing: Raises an error if a snapshot is not found.
615
616        Returns:
617            The expected snapshot.
618        """
619        if isinstance(node_or_snapshot, Snapshot):
620            return node_or_snapshot
621        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
622            node_or_snapshot = normalize_model_name(
623                node_or_snapshot,
624                dialect=self.default_dialect,
625                default_catalog=self.default_catalog,
626            )
627        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
628        snapshot = self.snapshots.get(fqn)
629
630        if raise_if_missing and not snapshot:
631            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
632
633        return snapshot

Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.

Arguments:
  • node_or_snapshot: A node name, node, or snapshot.
  • raise_if_missing: Raises an error if a snapshot is not found.
Returns:

The expected snapshot.

def config_for_path(self, path: pathlib.Path) -> sqlmesh.core.config.root.Config:
635    def config_for_path(self, path: Path) -> Config:
636        for config_path, config in self.configs.items():
637            try:
638                path.relative_to(config_path)
639                return config
640            except ValueError:
641                pass
642        return self.config
def config_for_node( self, node: 'str | Model | StandaloneAudit') -> sqlmesh.core.config.root.Config:
644    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
645        if isinstance(node, str):
646            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
647        return self.config_for_path(node._path)  # type: ignore
models: 'MappingProxyType[str, Model]'

Returns all registered models in this context.

metrics: 'MappingProxyType[str, Metric]'

Returns all registered metrics in this context.

standalone_audits: 'MappingProxyType[str, StandaloneAudit]'

Returns all registered standalone audits in this context.

Generates and returns snapshots based on models registered in this context.

If one of the snapshots has been previously stored in the persisted state, the stored instance will be returned.

def render( self, model_or_snapshot: <MagicMock id='140338273995024'>, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, expand: Union[bool, Iterable[str]] = False, **kwargs: Any) -> sqlglot.expressions.Expression:
679    def render(
680        self,
681        model_or_snapshot: ModelOrSnapshot,
682        *,
683        start: t.Optional[TimeLike] = None,
684        end: t.Optional[TimeLike] = None,
685        execution_time: t.Optional[TimeLike] = None,
686        expand: t.Union[bool, t.Iterable[str]] = False,
687        **kwargs: t.Any,
688    ) -> exp.Expression:
689        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
690
691        Args:
692            model_or_snapshot: The model, model name, or snapshot to render.
693            start: The start of the interval to render.
694            end: The end of the interval to render.
695            execution_time: The date/time time reference to use for execution time. Defaults to now.
696            expand: Whether or not to use expand materialized models, defaults to False.
697                If True, all referenced models are expanded as raw queries.
698                If a list, only referenced models are expanded as raw queries.
699
700        Returns:
701            The rendered expression.
702        """
703        execution_time = execution_time or now_ds()
704
705        model = self.get_model(model_or_snapshot, raise_if_missing=True)
706
707        if expand and not isinstance(expand, bool):
708            expand = {
709                normalize_model_name(
710                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
711                )
712                for x in expand
713            }
714
715        expand = self.dag.upstream(model.fqn) if expand is True else expand or []
716
717        if model.is_seed:
718            df = next(
719                model.render(
720                    context=self.execution_context(),
721                    start=start,
722                    end=end,
723                    execution_time=execution_time,
724                    **kwargs,
725                )
726            )
727            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))
728
729        return model.render_query_or_raise(
730            start=start,
731            end=end,
732            execution_time=execution_time,
733            snapshots=self.snapshots,
734            expand=expand,
735            engine_adapter=self.engine_adapter,
736            **kwargs,
737        )

Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

Arguments:
  • model_or_snapshot: The model, model name, or snapshot to render.
  • start: The start of the interval to render.
  • end: The end of the interval to render.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • expand: Whether or not to expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only the listed referenced models are expanded as raw queries.
Returns:

The rendered expression.

def evaluate( self, model_or_snapshot: <MagicMock id='140338274051984'>, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float], limit: Union[int, NoneType] = None, **kwargs: Any) -> <MagicMock id='140338274031840'>:
739    def evaluate(
740        self,
741        model_or_snapshot: ModelOrSnapshot,
742        start: TimeLike,
743        end: TimeLike,
744        execution_time: TimeLike,
745        limit: t.Optional[int] = None,
746        **kwargs: t.Any,
747    ) -> DF:
748        """Evaluate a model or snapshot (running its query against a DB/Engine).
749
750        This method is used to test or iterate on models without side effects.
751
752        Args:
753            model_or_snapshot: The model, model name, or snapshot to render.
754            start: The start of the interval to evaluate.
755            end: The end of the interval to evaluate.
756            execution_time: The date/time time reference to use for execution time.
757            limit: A limit applied to the model.
758        """
759        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
760
761        df = self.snapshot_evaluator.evaluate_and_fetch(
762            snapshot,
763            start=start,
764            end=end,
765            execution_time=execution_time,
766            snapshots=self.snapshots,
767            limit=limit or c.DEFAULT_MAX_LIMIT,
768        )
769
770        if df is None:
771            raise RuntimeError(f"Error evaluating {snapshot.name}")
772
773        return df

Evaluate a model or snapshot (running its query against a DB/Engine).

This method is used to test or iterate on models without side effects.

Arguments:
  • model_or_snapshot: The model, model name, or snapshot to render.
  • start: The start of the interval to evaluate.
  • end: The end of the interval to evaluate.
  • execution_time: The date/time reference to use for execution time.
  • limit: A limit applied to the model.
def format( self, transpile: Union[str, NoneType] = None, append_newline: Union[bool, NoneType] = None, **kwargs: Any) -> None:
775    def format(
776        self,
777        transpile: t.Optional[str] = None,
778        append_newline: t.Optional[bool] = None,
779        **kwargs: t.Any,
780    ) -> None:
781        """Format all SQL models."""
782        for model in self._models.values():
783            if not model._path.suffix == ".sql":
784                continue
785            with open(model._path, "r+", encoding="utf-8") as file:
786                expressions = parse(
787                    file.read(), default_dialect=self.config_for_node(model).dialect
788                )
789                if transpile:
790                    for prop in expressions[0].expressions:
791                        if prop.name.lower() == "dialect":
792                            prop.replace(
793                                exp.Property(
794                                    this="dialect",
795                                    value=exp.Literal.string(transpile or model.dialect),
796                                )
797                            )
798                format = self.config_for_node(model).format
799                opts = {**format.generator_options, **kwargs}
800                file.seek(0)
801                file.write(
802                    format_model_expressions(expressions, transpile or model.dialect, **opts)
803                )
804                if append_newline is None:
805                    append_newline = format.append_newline
806                if append_newline:
807                    file.write("\n")
808                file.truncate()

Format all SQL models.

def plan( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, create_from: Union[str, NoneType] = None, skip_tests: bool = False, restate_models: Union[Iterable[str], NoneType] = None, no_gaps: bool = False, skip_backfill: bool = False, forward_only: Union[bool, NoneType] = None, no_prompts: Union[bool, NoneType] = None, auto_apply: Union[bool, NoneType] = None, no_auto_categorization: Union[bool, NoneType] = None, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: Union[bool, NoneType] = None, select_models: Union[Collection[str], NoneType] = None, backfill_models: Union[Collection[str], NoneType] = None, categorizer_config: Union[sqlmesh.core.config.categorizer.CategorizerConfig, NoneType] = None, enable_preview: Union[bool, NoneType] = None, no_diff: Union[bool, NoneType] = None, run: bool = False) -> sqlmesh.core.plan.definition.Plan:
810    def plan(
811        self,
812        environment: t.Optional[str] = None,
813        *,
814        start: t.Optional[TimeLike] = None,
815        end: t.Optional[TimeLike] = None,
816        execution_time: t.Optional[TimeLike] = None,
817        create_from: t.Optional[str] = None,
818        skip_tests: bool = False,
819        restate_models: t.Optional[t.Iterable[str]] = None,
820        no_gaps: bool = False,
821        skip_backfill: bool = False,
822        forward_only: t.Optional[bool] = None,
823        no_prompts: t.Optional[bool] = None,
824        auto_apply: t.Optional[bool] = None,
825        no_auto_categorization: t.Optional[bool] = None,
826        effective_from: t.Optional[TimeLike] = None,
827        include_unmodified: t.Optional[bool] = None,
828        select_models: t.Optional[t.Collection[str]] = None,
829        backfill_models: t.Optional[t.Collection[str]] = None,
830        categorizer_config: t.Optional[CategorizerConfig] = None,
831        enable_preview: t.Optional[bool] = None,
832        no_diff: t.Optional[bool] = None,
833        run: bool = False,
834    ) -> Plan:
835        """Interactively creates a plan.
836
837        This method compares the current context with the target environment. It then presents
838        the differences and asks whether to backfill each modified model.
839
840        Args:
841            environment: The environment to diff and plan against.
842            start: The start date of the backfill if there is one.
843            end: The end date of the backfill if there is one.
844            execution_time: The date/time reference to use for execution time. Defaults to now.
845            create_from: The environment to create the target environment from if it
846                doesn't exist. If not specified, the "prod" environment will be used.
847            skip_tests: Unit tests are run by default so this will skip them if enabled
848            restate_models: A list of either internal or external models, or tags, that need to be restated
849                for the given plan interval. If the target environment is a production environment,
850                ALL snapshots that depended on these upstream tables will have their intervals deleted
851                (even ones not in this current environment). Only the snapshots in this environment will
852                be backfilled whereas others need to be recovered on a future plan application. For development
853                environments only snapshots that are part of this plan will be affected.
854            no_gaps:  Whether to ensure that new snapshots for models that are already a
855                part of the target environment have no data gaps when compared against previous
856                snapshots for same models.
857            skip_backfill: Whether to skip the backfill step. Default: False.
858            forward_only: Whether the purpose of the plan is to make forward only changes.
859            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
860                if this flag is set to true and there are uncategorized changes the plan creation will
861                fail. Default: False.
862            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
863            no_auto_categorization: Indicates whether to disable automatic categorization of model
864                changes (breaking / non-breaking). If not provided, then the corresponding configuration
865                option determines the behavior.
866            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
867                project config by default.
868            effective_from: The effective date from which to apply forward-only changes on production.
869            include_unmodified: Indicates whether to include unmodified models in the target development environment.
870            select_models: A list of model selection strings to filter the models that should be included into this plan.
871            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
872            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
873            no_diff: Hide text differences for changed models.
874            run: Whether to run latest intervals as part of the plan application.
875
876        Returns:
877            The populated Plan object.
878        """
879        plan_builder = self.plan_builder(
880            environment,
881            start=start,
882            end=end,
883            execution_time=execution_time,
884            create_from=create_from,
885            skip_tests=skip_tests,
886            restate_models=restate_models,
887            no_gaps=no_gaps,
888            skip_backfill=skip_backfill,
889            forward_only=forward_only,
890            no_auto_categorization=no_auto_categorization,
891            effective_from=effective_from,
892            include_unmodified=include_unmodified,
893            select_models=select_models,
894            backfill_models=backfill_models,
895            categorizer_config=categorizer_config,
896            enable_preview=enable_preview,
897            run=run,
898        )
899
900        self.console.plan(
901            plan_builder,
902            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
903            self.default_catalog,
904            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
905            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
906        )
907
908        return plan_builder.build()

Interactively creates a plan.

This method compares the current context with the target environment. It then presents the differences and asks whether to backfill each modified model.

Arguments:
  • environment: The environment to diff and plan against.
  • start: The start date of the backfill if there is one.
  • end: The end date of the backfill if there is one.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
  • skip_tests: Unit tests are run by default; enabling this flag skips them.
  • restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depended on these upstream tables will have their intervals deleted (even ones not in this current environment). Only the snapshots in this environment will be backfilled whereas others need to be recovered on a future plan application. For development environments only snapshots that are part of this plan will be affected.
  • no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
  • skip_backfill: Whether to skip the backfill step. Default: False.
  • forward_only: Whether the purpose of the plan is to make forward only changes.
  • no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that if this flag is set to true and there are uncategorized changes the plan creation will fail. Default: False.
  • auto_apply: Whether to automatically apply the new plan after creation. Default: False.
  • no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
  • categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
  • effective_from: The effective date from which to apply forward-only changes on production.
  • include_unmodified: Indicates whether to include unmodified models in the target development environment.
  • select_models: A list of model selection strings to filter the models that should be included into this plan.
  • backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
  • enable_preview: Indicates whether to enable preview for forward-only models in development environments.
  • no_diff: Hide text differences for changed models.
  • run: Whether to run latest intervals as part of the plan application.
Returns:

The populated Plan object.

def plan_builder( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, create_from: Union[str, NoneType] = None, skip_tests: bool = False, restate_models: Union[Iterable[str], NoneType] = None, no_gaps: bool = False, skip_backfill: bool = False, forward_only: Union[bool, NoneType] = None, no_auto_categorization: Union[bool, NoneType] = None, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: Union[bool, NoneType] = None, select_models: Union[Collection[str], NoneType] = None, backfill_models: Union[Collection[str], NoneType] = None, categorizer_config: Union[sqlmesh.core.config.categorizer.CategorizerConfig, NoneType] = None, enable_preview: Union[bool, NoneType] = None, run: bool = False) -> sqlmesh.core.plan.builder.PlanBuilder:
    def plan_builder(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        run: bool = False,
    ) -> PlanBuilder:
        """Creates a plan builder.

        Args:
            environment: The environment to diff and plan against. Falls back to the configured
                default target environment when not provided.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default; enabling this flag skips them.
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for the same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The plan builder.

        Raises:
            ConfigError: If the backfill is skipped for the production environment without
                enforcing the absence of data gaps, or if ``run`` is requested for a
                non-production environment.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        # Every environment other than prod is treated as a development environment.
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        # Test handling is delegated to the helper, which receives the skip flag.
        self._run_plan_tests(skip_tests=skip_tests)

        # Pinned environments get no TTL.
        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        # An empty or missing backfill selection is normalized to None ("not specified").
        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                # The selection was given but matched nothing; report it instead of failing.
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            # Set when a restate or backfill selection was provided but expanded to nothing.
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            # With run=True no default end is computed and the builder is not end-bounded.
            default_end = None

        # For development environments the default start is one day before the default end.
        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

Creates a plan builder.

Arguments:
  • environment: The environment to diff and plan against.
  • start: The start date of the backfill if there is one.
  • end: The end date of the backfill if there is one.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
  • skip_tests: Unit tests are run by default; enabling this flag skips them.
  • restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depended on these upstream tables will have their intervals deleted (even ones not in this current environment). Only the snapshots in this environment will be backfilled whereas others need to be recovered on a future plan application. For development environments only snapshots that are part of this plan will be affected.
  • no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
  • skip_backfill: Whether to skip the backfill step. Default: False.
  • forward_only: Whether the purpose of the plan is to make forward only changes.
  • no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
  • categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
  • effective_from: The effective date from which to apply forward-only changes on production.
  • include_unmodified: Indicates whether to include unmodified models in the target development environment.
  • select_models: A list of model selection strings to filter the models that should be included into this plan.
  • backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
  • enable_preview: Indicates whether to enable preview for forward-only models in development environments.
  • run: Whether to run latest intervals as part of the plan application.
Returns:

The plan builder.

def apply( self, plan: sqlmesh.core.plan.definition.Plan, circuit_breaker: Union[Callable[[], bool], NoneType] = None) -> None:
1089    def apply(
1090        self,
1091        plan: Plan,
1092        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1093    ) -> None:
1094        """Applies a plan by pushing snapshots and backfilling data.
1095
1096        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1097        to backfill all models.
1098
1099        Args:
1100            plan: The plan to apply.
1101            circuit_breaker: An optional handler which checks if the apply should be aborted.
1102        """
1103        if (
1104            not plan.context_diff.has_changes
1105            and not plan.requires_backfill
1106            and not plan.has_unmodified_unpromoted
1107        ):
1108            return
1109        if plan.uncategorized:
1110            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1111        self.notification_target_manager.notify(
1112            NotificationEvent.APPLY_START,
1113            environment=plan.environment_naming_info.name,
1114            plan_id=plan.plan_id,
1115        )
1116        try:
1117            self._apply(plan, circuit_breaker)
1118        except Exception as e:
1119            self.notification_target_manager.notify(
1120                NotificationEvent.APPLY_FAILURE,
1121                environment=plan.environment_naming_info.name,
1122                plan_id=plan.plan_id,
1123                exc=traceback.format_exc(),
1124            )
1125            logger.error(f"Apply Failure: {traceback.format_exc()}")
1126            raise e
1127        self.notification_target_manager.notify(
1128            NotificationEvent.APPLY_END,
1129            environment=plan.environment_naming_info.name,
1130            plan_id=plan.plan_id,
1131        )

Applies a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state sync and then uses the scheduler to backfill all models.

Arguments:
  • plan: The plan to apply.
  • circuit_breaker: An optional handler which checks if the apply should be aborted.
def invalidate_environment(self, name: str, sync: bool = False) -> None:
1133    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1134        """Invalidates the target environment by setting its expiration timestamp to now.
1135
1136        Args:
1137            name: The name of the environment to invalidate.
1138            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1139                be deleted asynchronously by the janitor process.
1140        """
1141        self.state_sync.invalidate_environment(name)
1142        if sync:
1143            self._cleanup_environments()
1144            self.console.log_success(f"Environment '{name}' has been deleted.")
1145        else:
1146            self.console.log_success(f"Environment '{name}' has been invalidated.")

Invalidates the target environment by setting its expiration timestamp to now.

Arguments:
  • name: The name of the environment to invalidate.
  • sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will be deleted asynchronously by the janitor process.
def diff( self, environment: Union[str, NoneType] = None, detailed: bool = False) -> bool:
1148    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1149        """Show a diff of the current context with a given environment.
1150
1151        Args:
1152            environment: The environment to diff against.
1153            detailed: Show the actual SQL differences if True.
1154
1155        Returns:
1156            True if there are changes, False otherwise.
1157        """
1158        environment = environment or self.config.default_target_environment
1159        environment = Environment.normalize_name(environment)
1160        context_diff = self._context_diff(environment)
1161        self.console.show_model_difference_summary(
1162            context_diff,
1163            EnvironmentNamingInfo.from_environment_catalog_mapping(
1164                self.config.environment_catalog_mapping,
1165                name=environment,
1166                suffix_target=self.config.environment_suffix_target,
1167            ),
1168            self.default_catalog,
1169            no_diff=not detailed,
1170        )
1171        return context_diff.has_changes

Show a diff of the current context with a given environment.

Arguments:
  • environment: The environment to diff against.
  • detailed: Show the actual SQL differences if True.
Returns:

True if there are changes, False otherwise.

def table_diff( self, source: str, target: str, on: 't.List[str] | exp.Condition | None' = None, model_or_snapshot: Union[ModelOrSnapshot, NoneType] = None, where: 't.Optional[str | exp.Condition]' = None, limit: int = 20, show: bool = True, show_sample: bool = True) -> sqlmesh.core.table_diff.TableDiff:
1173    def table_diff(
1174        self,
1175        source: str,
1176        target: str,
1177        on: t.List[str] | exp.Condition | None = None,
1178        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1179        where: t.Optional[str | exp.Condition] = None,
1180        limit: int = 20,
1181        show: bool = True,
1182        show_sample: bool = True,
1183    ) -> TableDiff:
1184        """Show a diff between two tables.
1185
1186        Args:
1187            source: The source environment or table.
1188            target: The target environment or table.
1189            on: The join condition, table aliases must be "s" and "t" for source and target.
1190                If omitted, the table's grain will be used.
1191            model_or_snapshot: The model or snapshot to use when environments are passed in.
1192            where: An optional where statement to filter results.
1193            limit: The limit of the sample dataframe.
1194            show: Show the table diff output in the console.
1195            show_sample: Show the sample dataframe in the console. Requires show=True.
1196
1197        Returns:
1198            The TableDiff object containing schema and summary differences.
1199        """
1200        source_alias, target_alias = source, target
1201        if model_or_snapshot:
1202            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1203            source_env = self.state_reader.get_environment(source)
1204            target_env = self.state_reader.get_environment(target)
1205
1206            if not source_env:
1207                raise SQLMeshError(f"Could not find environment '{source}'")
1208            if not target_env:
1209                raise SQLMeshError(f"Could not find environment '{target}')")
1210
1211            source = next(
1212                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1213            ).table_name()
1214            target = next(
1215                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1216            ).table_name()
1217            source_alias = source_env.name
1218            target_alias = target_env.name
1219
1220            if not on:
1221                for ref in model.all_references:
1222                    if ref.unique:
1223                        on = ref.columns
1224
1225        if not on:
1226            raise SQLMeshError(
1227                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1228            )
1229
1230        table_diff = TableDiff(
1231            adapter=self._engine_adapter,
1232            source=source,
1233            target=target,
1234            on=on,
1235            where=where,
1236            source_alias=source_alias,
1237            target_alias=target_alias,
1238            model_name=model.name if model_or_snapshot else None,
1239            limit=limit,
1240        )
1241        if show:
1242            self.console.show_schema_diff(table_diff.schema_diff())
1243            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1244        return table_diff

Show a diff between two tables.

Arguments:
  • source: The source environment or table.
  • target: The target environment or table.
  • on: The join condition, table aliases must be "s" and "t" for source and target. If omitted, the table's grain will be used.
  • model_or_snapshot: The model or snapshot to use when environments are passed in.
  • where: An optional where statement to filter results.
  • limit: The limit of the sample dataframe.
  • show: Show the table diff output in the console.
  • show_sample: Show the sample dataframe in the console. Requires show=True.
Returns:

The TableDiff object containing schema and summary differences.

def get_dag( self, select_models: Union[Collection[str], NoneType] = None, **options: Any) -> sqlglot.lineage.GraphHTML:
1246    def get_dag(
1247        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1248    ) -> GraphHTML:
1249        """Gets an HTML object representation of the DAG.
1250
1251        Args:
1252            select_models: A list of model selection strings that should be included in the dag.
1253        Returns:
1254            An html object that renders the dag.
1255        """
1256        dag = (
1257            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1258            if select_models
1259            else self.dag
1260        )
1261
1262        nodes = {}
1263        edges: t.List[t.Dict] = []
1264
1265        for node, deps in dag.graph.items():
1266            nodes[node] = {
1267                "id": node,
1268                "label": node.split(".")[-1],
1269                "title": f"<span>{node}</span>",
1270            }
1271            edges.extend({"from": d, "to": node} for d in deps)
1272
1273        return GraphHTML(
1274            nodes,
1275            edges,
1276            options={
1277                "height": "100%",
1278                "width": "100%",
1279                "interaction": {},
1280                "layout": {
1281                    "hierarchical": {
1282                        "enabled": True,
1283                        "nodeSpacing": 200,
1284                        "sortMethod": "directed",
1285                    },
1286                },
1287                "nodes": {
1288                    "shape": "box",
1289                },
1290                **options,
1291            },
1292        )

Gets an HTML object representation of the DAG.

Arguments:
  • select_models: A list of model selection strings that should be included in the dag.
Returns:

An html object that renders the dag.

def render_dag( self, path: str, select_models: Union[Collection[str], NoneType] = None) -> None:
1294    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1295        """Render the dag as HTML and save it to a file.
1296
1297        Args:
1298            path: filename to save the dag html to
1299            select_models: A list of model selection strings that should be included in the dag.
1300        """
1301        file_path = Path(path)
1302        suffix = file_path.suffix
1303        if suffix != ".html":
1304            if suffix:
1305                logger.warning(
1306                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1307                )
1308            path = str(file_path.with_suffix(".html"))
1309
1310        with open(path, "w", encoding="utf-8") as file:
1311            file.write(str(self.get_dag(select_models)))

Render the dag as HTML and save it to a file.

Arguments:
  • path: filename to save the dag html to
  • select_models: A list of model selection strings that should be included in the dag.
def create_test( self, model: str, input_queries: Dict[str, str], overwrite: bool = False, variables: Union[Dict[str, str], NoneType] = None, path: Union[str, NoneType] = None, name: Union[str, NoneType] = None, include_ctes: bool = False) -> None:
1313    def create_test(
1314        self,
1315        model: str,
1316        input_queries: t.Dict[str, str],
1317        overwrite: bool = False,
1318        variables: t.Optional[t.Dict[str, str]] = None,
1319        path: t.Optional[str] = None,
1320        name: t.Optional[str] = None,
1321        include_ctes: bool = False,
1322    ) -> None:
1323        """Generate a unit test fixture for a given model.
1324
1325        Args:
1326            model: The model to test.
1327            input_queries: Mapping of model names to queries. Each model included in this mapping
1328                will be populated in the test based on the results of the corresponding query.
1329            overwrite: Whether to overwrite the existing test in case of a file path collision.
1330                When set to False, an error will be raised if there is such a collision.
1331            variables: Key-value pairs that will define variables needed by the model.
1332            path: The file path corresponding to the fixture, relative to the test directory.
1333                By default, the fixture will be created under the test directory and the file name
1334                will be inferred from the test's name.
1335            name: The name of the test. This is inferred from the model name by default.
1336            include_ctes: When true, CTE fixtures will also be generated.
1337        """
1338        input_queries = {
1339            # The get_model here has two purposes: return normalized names & check for missing deps
1340            self.get_model(dep, raise_if_missing=True).fqn: query
1341            for dep, query in input_queries.items()
1342        }
1343
1344        try:
1345            test_adapter = self._test_connection_config.create_engine_adapter(
1346                register_comments_override=False
1347            )
1348            generate_test(
1349                model=self.get_model(model, raise_if_missing=True),
1350                input_queries=input_queries,
1351                models=self._models,
1352                engine_adapter=self._engine_adapter,
1353                test_engine_adapter=test_adapter,
1354                project_path=self.path,
1355                overwrite=overwrite,
1356                variables=variables,
1357                path=path,
1358                name=name,
1359                include_ctes=include_ctes,
1360            )
1361        finally:
1362            test_adapter.close()

Generate a unit test fixture for a given model.

Arguments:
  • model: The model to test.
  • input_queries: Mapping of model names to queries. Each model included in this mapping will be populated in the test based on the results of the corresponding query.
  • overwrite: Whether to overwrite the existing test in case of a file path collision. When set to False, an error will be raised if there is such a collision.
  • variables: Key-value pairs that will define variables needed by the model.
  • path: The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred from the test's name.
  • name: The name of the test. This is inferred from the model name by default.
  • include_ctes: When true, CTE fixtures will also be generated.
def test( self, match_patterns: Union[List[str], NoneType] = None, tests: Union[List[str], NoneType] = None, verbose: bool = False, preserve_fixtures: bool = False, stream: Union[TextIO, NoneType] = None) -> sqlmesh.core.test.result.ModelTextTestResult:
1364    def test(
1365        self,
1366        match_patterns: t.Optional[t.List[str]] = None,
1367        tests: t.Optional[t.List[str]] = None,
1368        verbose: bool = False,
1369        preserve_fixtures: bool = False,
1370        stream: t.Optional[t.TextIO] = None,
1371    ) -> ModelTextTestResult:
1372        """Discover and run model tests"""
1373        if verbose:
1374            pd.set_option("display.max_columns", None)
1375            verbosity = 2
1376        else:
1377            verbosity = 1
1378
1379        if tests:
1380            result = run_model_tests(
1381                tests=tests,
1382                models=self._models,
1383                config=self.config,
1384                gateway=self.gateway,
1385                dialect=self.default_dialect,
1386                verbosity=verbosity,
1387                patterns=match_patterns,
1388                preserve_fixtures=preserve_fixtures,
1389                stream=stream,
1390                default_catalog=self.default_catalog,
1391                default_catalog_dialect=self.engine_adapter.DIALECT,
1392            )
1393        else:
1394            test_meta = []
1395
1396            for path, config in self.configs.items():
1397                test_meta.extend(
1398                    get_all_model_tests(
1399                        path / c.TESTS,
1400                        patterns=match_patterns,
1401                        ignore_patterns=config.ignore_patterns,
1402                    )
1403                )
1404
1405            result = run_tests(
1406                model_test_metadata=test_meta,
1407                models=self._models,
1408                config=self.config,
1409                gateway=self.gateway,
1410                dialect=self.default_dialect,
1411                verbosity=verbosity,
1412                preserve_fixtures=preserve_fixtures,
1413                stream=stream,
1414                default_catalog=self.default_catalog,
1415                default_catalog_dialect=self.engine_adapter.DIALECT,
1416            )
1417
1418        return result

Discover and run model tests

def audit( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], *, models: Union[Iterator[str], NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> None:
1420    def audit(
1421        self,
1422        start: TimeLike,
1423        end: TimeLike,
1424        *,
1425        models: t.Optional[t.Iterator[str]] = None,
1426        execution_time: t.Optional[TimeLike] = None,
1427    ) -> None:
1428        """Audit models.
1429
1430        Args:
1431            start: The start of the interval to audit.
1432            end: The end of the interval to audit.
1433            models: The models to audit. All models will be audited if not specified.
1434            execution_time: The date/time time reference to use for execution time. Defaults to now.
1435        """
1436
1437        snapshots = (
1438            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1439            if models
1440            else self.snapshots.values()
1441        )
1442
1443        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1444        self.console.log_status_update(f"Found {num_audits} audit(s).")
1445        errors = []
1446        skipped_count = 0
1447        for snapshot in snapshots:
1448            for audit_result in self.snapshot_evaluator.audit(
1449                snapshot=snapshot,
1450                start=start,
1451                end=end,
1452                snapshots=self.snapshots,
1453                raise_exception=False,
1454            ):
1455                audit_id = f"{audit_result.audit.name}"
1456                if audit_result.model:
1457                    audit_id += f" on model {audit_result.model.name}"
1458
1459                if audit_result.skipped:
1460                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1461                    skipped_count += 1
1462                elif audit_result.count:
1463                    errors.append(audit_result)
1464                    self.console.log_status_update(
1465                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1466                    )
1467                else:
1468                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1469
1470        self.console.log_status_update(
1471            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1472            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1473        )
1474        for error in errors:
1475            self.console.log_status_update(
1476                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1477            )
1478            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1479            if error.query:
1480                self.console.show_sql(
1481                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1482                )
1483
1484        self.console.log_status_update("Done.")

Audit models.

Arguments:
  • start: The start of the interval to audit.
  • end: The end of the interval to audit.
  • models: The models to audit. All models will be audited if not specified.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
def rewrite(self, sql: str, dialect: str = '') -> sqlglot.expressions.Expression:
1486    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1487        """Rewrite a sql expression with semantic references into an executable query.
1488
1489        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1490
1491        Args:
1492            sql: The sql string to rewrite.
1493            dialect: The dialect of the sql string, defaults to the project dialect.
1494
1495        Returns:
1496            A SQLGlot expression with semantic references expanded.
1497        """
1498        return rewrite(
1499            sql,
1500            graph=ReferenceGraph(self.models.values()),
1501            metrics=self._metrics,
1502            dialect=dialect or self.default_dialect,
1503        )

Rewrite a sql expression with semantic references into an executable query.

https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

Arguments:
  • sql: The sql string to rewrite.
  • dialect: The dialect of the sql string, defaults to the project dialect.
Returns:

A SQLGlot expression with semantic references expanded.

def migrate(self) -> None:
1505    def migrate(self) -> None:
1506        """Migrates SQLMesh to the current running version.
1507
1508        Please contact your SQLMesh administrator before doing this.
1509        """
1510        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1511        try:
1512            self._new_state_sync().migrate(
1513                default_catalog=self.default_catalog,
1514                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1515            )
1516        except Exception as e:
1517            self.notification_target_manager.notify(
1518                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1519            )
1520            raise e
1521        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

Migrates SQLMesh to the current running version.

Please contact your SQLMesh administrator before doing this.

def rollback(self) -> None:
1523    def rollback(self) -> None:
1524        """Rolls back SQLMesh to the previous migration.
1525
1526        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1527        """
1528        self._new_state_sync().rollback()

Rolls back SQLMesh to the previous migration.

Please contact your SQLMesh administrator before doing this. This action cannot be undone.

def create_external_models(self) -> None:
1530    def create_external_models(self) -> None:
1531        """Create a schema file with all external models.
1532
1533        The schema file contains all columns and types of external models, allowing for more robust
1534        lineage, validation, and optimizations.
1535        """
1536        if not self._models:
1537            self.load(update_schemas=False)
1538
1539        for path, config in self.configs.items():
1540            create_schema_file(
1541                path=path / c.SCHEMA_YAML,
1542                models=UniqueKeyDict(
1543                    "models",
1544                    {
1545                        fqn: model
1546                        for fqn, model in self._models.items()
1547                        if self.config_for_node(model) is config
1548                    },
1549                ),
1550                adapter=self._engine_adapter,
1551                state_reader=self.state_reader,
1552                dialect=config.model_defaults.dialect,
1553                max_workers=self.concurrent_tasks,
1554            )

Create a schema file with all external models.

The schema file contains all columns and types of external models, allowing for more robust lineage, validation, and optimizations.

def print_info(self) -> None:
1556    def print_info(self) -> None:
1557        """Prints information about connections, models, macros, etc. to the console."""
1558        self.console.log_status_update(f"Models: {len(self.models)}")
1559        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1560
1561        self._try_connection("data warehouse", self._engine_adapter)
1562
1563        state_connection = self.config.get_state_connection(self.gateway)
1564        if state_connection:
1565            self._try_connection("state backend", state_connection.create_engine_adapter())

Prints information about connections, models, macros, etc. to the console.

def close(self) -> None:
1567    def close(self) -> None:
1568        """Releases all resources allocated by this context."""
1569        self.snapshot_evaluator.close()
1570        self.state_sync.close()

Releases all resources allocated by this context.

def table_name(self, model_name: str, dev: bool) -> str:
1646    def table_name(self, model_name: str, dev: bool) -> str:
1647        """Returns the name of the pysical table for the given model name.
1648
1649        Args:
1650            model_name: The name of the model.
1651            dev: Whether to use the deployability index for the table name.
1652
1653        Returns:
1654            The name of the physical table.
1655        """
1656        deployability_index = (
1657            DeployabilityIndex.create(self.snapshots.values())
1658            if dev
1659            else DeployabilityIndex.all_deployable()
1660        )
1661        snapshot = self.get_snapshot(model_name)
1662        if not snapshot:
1663            raise SQLMeshError(f"Model '{model_name}' was not found.")
1664        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

Returns the name of the physical table for the given model name.

Arguments:
  • model_name: The name of the model.
  • dev: Whether to use the deployability index for the table name.
Returns:

The name of the physical table.

def clear_caches(self) -> None:
1666    def clear_caches(self) -> None:
1667        for path in self.configs:
1668            rmtree(path / c.CACHE)
1854class Context(GenericContext[Config]):  # GenericContext specialised to the `Config` configuration type
1855    CONFIG_TYPE = Config  # config class this context type is bound to

Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

Arguments:
  • engine_adapter: The default engine adapter to use.
  • notification_targets: The notification target to use. Defaults to what is defined in config.
  • paths: The directories containing SQLMesh files.
  • config: A Config object or the name of a Config object in config.py.
  • connection: The name of the connection. If not specified the first connection as it appears in configuration will be used.
  • test_connection: The name of the connection to use for tests. If not specified the first connection as it appears in configuration will be used.
  • concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
  • load: Whether or not to automatically load all models and macros (default True).
  • console: The rich instance used for printing out CLI command results.
  • users: A list of users to make known to SQLMesh.
  • config_type: The type of config object to use (default Config).