Edit on GitHub

Context

A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and load your project's models, macros, and audits. Afterwards, you can use the context to create and apply plans, visualize your models' lineage, run your audits and model tests, and perform various other tasks. For more information on what a context can do, see sqlmesh.core.context.Context.

Examples:

Creating and applying a plan against the staging environment.

from sqlmesh.core.context import Context
context = Context(path="example", config="local_config")
plan = context.plan("staging")
context.apply(plan)

Running audits on your data.

from sqlmesh.core.context import Context
context = Context(path="example", config="local_config")
context.audit("yesterday", "now")

Running tests on your models.

from sqlmesh.core.context import Context
context = Context(path="example")
context.test()
   1"""
   2# Context
   3
   4A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and
   5load your project's models, macros, and audits. Afterwards, you can use the context to create and apply
   6plans, visualize your model's lineage, run your audits and model tests, and perform various other tasks.
   7For more information regarding what a context can do, see `sqlmesh.core.context.Context`.
   8
   9# Examples:
  10
  11Creating and applying a plan against the staging environment.
  12```python
  13from sqlmesh.core.context import Context
  14context = Context(path="example", config="local_config")
  15plan = context.plan("staging")
  16context.apply(plan)
  17```
  18
  19Running audits on your data.
  20```python
  21from sqlmesh.core.context import Context
  22context = Context(path="example", config="local_config")
  23context.audit("yesterday", "now")
  24```
  25
  26Running tests on your models.
  27```python
  28from sqlmesh.core.context import Context
  29context = Context(path="example")
  30context.test()
  31```
  32"""
  33
  34from __future__ import annotations
  35
  36import abc
  37import collections
  38import gc
  39import logging
  40import time
  41import traceback
  42import typing as t
  43import unittest.result
  44from datetime import timedelta
  45from functools import cached_property
  46from io import StringIO
  47from pathlib import Path
  48from shutil import rmtree
  49from types import MappingProxyType
  50
  51import pandas as pd
  52from sqlglot import exp
  53from sqlglot.lineage import GraphHTML
  54
  55from sqlmesh.core import constants as c
  56from sqlmesh.core.audit import Audit, StandaloneAudit
  57from sqlmesh.core.config import CategorizerConfig, Config, load_configs
  58from sqlmesh.core.config.loader import C
  59from sqlmesh.core.console import Console, get_console
  60from sqlmesh.core.context_diff import ContextDiff
  61from sqlmesh.core.dialect import (
  62    format_model_expressions,
  63    normalize_model_name,
  64    pandas_to_sql,
  65    parse,
  66    parse_one,
  67)
  68from sqlmesh.core.engine_adapter import EngineAdapter
  69from sqlmesh.core.environment import Environment, EnvironmentNamingInfo
  70from sqlmesh.core.loader import Loader, update_model_schemas
  71from sqlmesh.core.macros import ExecutableOrMacro, macro
  72from sqlmesh.core.metric import Metric, rewrite
  73from sqlmesh.core.model import Model
  74from sqlmesh.core.notification_target import (
  75    NotificationEvent,
  76    NotificationTarget,
  77    NotificationTargetManager,
  78)
  79from sqlmesh.core.plan import Plan, PlanBuilder
  80from sqlmesh.core.reference import ReferenceGraph
  81from sqlmesh.core.scheduler import Scheduler
  82from sqlmesh.core.schema_loader import create_schema_file
  83from sqlmesh.core.selector import Selector
  84from sqlmesh.core.snapshot import (
  85    DeployabilityIndex,
  86    Snapshot,
  87    SnapshotEvaluator,
  88    SnapshotFingerprint,
  89    to_table_mapping,
  90)
  91from sqlmesh.core.state_sync import (
  92    CachingStateSync,
  93    StateReader,
  94    StateSync,
  95    cleanup_expired_views,
  96)
  97from sqlmesh.core.table_diff import TableDiff
  98from sqlmesh.core.test import (
  99    ModelTextTestResult,
 100    generate_test,
 101    get_all_model_tests,
 102    run_model_tests,
 103    run_tests,
 104)
 105from sqlmesh.core.user import User
 106from sqlmesh.utils import UniqueKeyDict, sys_path
 107from sqlmesh.utils.dag import DAG
 108from sqlmesh.utils.date import TimeLike, now_ds, to_date
 109from sqlmesh.utils.errors import (
 110    CircuitBreakerError,
 111    ConfigError,
 112    PlanError,
 113    SQLMeshError,
 114    UncategorizedPlanError,
 115)
 116from sqlmesh.utils.jinja import JinjaMacroRegistry
 117
 118if t.TYPE_CHECKING:
 119    from typing_extensions import Literal
 120
 121    from sqlmesh.core.engine_adapter._typing import DF, PySparkDataFrame, PySparkSession
 122    from sqlmesh.core.snapshot import Node
 123
 124    ModelOrSnapshot = t.Union[str, Model, Snapshot]
 125    NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot]
 126
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
 128
 129
 130class BaseContext(abc.ABC):
 131    """The base context which defines methods to execute a model."""
 132
 133    @property
 134    @abc.abstractmethod
 135    def default_dialect(self) -> t.Optional[str]:
 136        """Returns the default dialect."""
 137
 138    @property
 139    @abc.abstractmethod
 140    def _model_tables(self) -> t.Dict[str, str]:
 141        """Returns a mapping of model names to tables."""
 142
 143    @property
 144    @abc.abstractmethod
 145    def engine_adapter(self) -> EngineAdapter:
 146        """Returns an engine adapter."""
 147
 148    @property
 149    def spark(self) -> t.Optional[PySparkSession]:
 150        """Returns the spark session if it exists."""
 151        return self.engine_adapter.spark
 152
 153    @property
 154    def default_catalog(self) -> t.Optional[str]:
 155        raise NotImplementedError
 156
 157    def table(self, model_name: str) -> str:
 158        """Gets the physical table name for a given model.
 159
 160        Args:
 161            model_name: The model name.
 162
 163        Returns:
 164            The physical table name.
 165        """
 166        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)
 167
 168        # We generate SQL for the default dialect because the table name may be used in a
 169        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
 170        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)
 171
 172    def fetchdf(
 173        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 174    ) -> pd.DataFrame:
 175        """Fetches a dataframe given a sql string or sqlglot expression.
 176
 177        Args:
 178            query: SQL string or sqlglot expression.
 179            quote_identifiers: Whether to quote all identifiers in the query.
 180
 181        Returns:
 182            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
 183        """
 184        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)
 185
 186    def fetch_pyspark_df(
 187        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
 188    ) -> PySparkDataFrame:
 189        """Fetches a PySpark dataframe given a sql string or sqlglot expression.
 190
 191        Args:
 192            query: SQL string or sqlglot expression.
 193            quote_identifiers: Whether to quote all identifiers in the query.
 194
 195        Returns:
 196            A PySpark dataframe.
 197        """
 198        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)
 199
 200
 201class ExecutionContext(BaseContext):
 202    """The minimal context needed to execute a model.
 203
 204    Args:
 205        engine_adapter: The engine adapter to execute queries against.
 206        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 207        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 208    """
 209
 210    def __init__(
 211        self,
 212        engine_adapter: EngineAdapter,
 213        snapshots: t.Dict[str, Snapshot],
 214        deployability_index: t.Optional[DeployabilityIndex] = None,
 215        default_dialect: t.Optional[str] = None,
 216        default_catalog: t.Optional[str] = None,
 217        variables: t.Optional[t.Dict[str, t.Any]] = None,
 218    ):
 219        self.snapshots = snapshots
 220        self.deployability_index = deployability_index
 221        self._engine_adapter = engine_adapter
 222        self._default_catalog = default_catalog
 223        self._default_dialect = default_dialect
 224        self._variables = variables or {}
 225
 226    @property
 227    def default_dialect(self) -> t.Optional[str]:
 228        return self._default_dialect
 229
 230    @property
 231    def engine_adapter(self) -> EngineAdapter:
 232        """Returns an engine adapter."""
 233        return self._engine_adapter
 234
 235    @cached_property
 236    def _model_tables(self) -> t.Dict[str, str]:
 237        """Returns a mapping of model names to tables."""
 238        return to_table_mapping(self.snapshots.values(), self.deployability_index)
 239
 240    @property
 241    def default_catalog(self) -> t.Optional[str]:
 242        return self._default_catalog
 243
 244    @property
 245    def gateway(self) -> t.Optional[str]:
 246        """Returns the gateway name."""
 247        return self.var(c.GATEWAY)
 248
 249    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
 250        """Returns a variable value."""
 251        return self._variables.get(var_name.lower(), default)
 252
 253    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
 254        """Returns a new ExecutionContext with additional variables."""
 255        return ExecutionContext(
 256            self._engine_adapter,
 257            self.snapshots,
 258            self.deployability_index,
 259            self._default_dialect,
 260            self._default_catalog,
 261            variables=variables,
 262        )
 263
 264
 265class GenericContext(BaseContext, t.Generic[C]):
 266    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
 267
 268    Args:
 269        engine_adapter: The default engine adapter to use.
 270        notification_targets: The notification target to use. Defaults to what is defined in config.
 271        paths: The directories containing SQLMesh files.
 272        config: A Config object or the name of a Config object in config.py.
 273        connection: The name of the connection. If not specified the first connection as it appears
 274            in configuration will be used.
 275        test_connection: The name of the connection to use for tests. If not specified the first
 276            connection as it appears in configuration will be used.
 277        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
 278        load: Whether or not to automatically load all models and macros (default True).
 279        console: The rich instance used for printing out CLI command results.
 280        users: A list of users to make known to SQLMesh.
 281        config_type: The type of config object to use (default Config).
 282    """
 283
 284    CONFIG_TYPE: t.Type[C]
 285
    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        """Create the context from its configuration(s).

        Args:
            engine_adapter: Engine adapter to use. If omitted, one is created from the
                gateway's connection config.
            notification_targets: Notification targets used in addition to those defined in
                the config (they are placed before the config's targets).
            state_sync: A state sync to use. It is only stored here (as
                `_provided_state_sync`); NOTE(review): presumably consumed by
                `_new_state_sync`, which is not shown here.
            paths: Project directories, passed to `load_configs` when `config` is not a dict.
            config: A config object, the name of one, or a mapping of project path to config.
                A dict is used as-is; anything else is resolved with `load_configs`.
            gateway: Gateway name used to select the scheduler, connection and test
                connection configs.
            concurrent_tasks: Overrides the connection config's `concurrent_tasks` when truthy.
            loader: Loader class to instantiate with `config.loader_kwargs`. Defaults to
                `config.loader`.
            load: Whether to call `load()` at the end of initialization.
            console: Console used for output. Defaults to `get_console()`.
            users: Users registered in addition to the config's users. Users are
                de-duplicated by username with the last occurrence winning, so a config user
                replaces a passed-in user with the same username.

        Raises:
            SQLMeshError: If `environment_catalog_mapping` is configured but the engine
                adapter does not support multiple catalogs.
        """
        self.console = console or get_console()
        # A dict of configs is used directly; otherwise configs are resolved from `config`/`paths`.
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        # Project contents start empty; `load()` replaces them.
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        # Resolved lazily by the `default_catalog` property.
        self._default_catalog: t.Optional[str] = None

        # The first (path, config) pair is treated as the primary project.
        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        # Accessing `self.default_catalog` here triggers its lazy lookup through
        # `self._scheduler`, so `_scheduler` and `_default_catalog` must already be set above.
        test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )
        # NOTE(review): presumably stops the test engine from registering table/column
        # comments; confirm in `create_engine_adapter`.
        self._test_engine_adapter = test_connection_config.create_engine_adapter(
            register_comments_override=False
        )

        # Created lazily by the `snapshot_evaluator` property.
        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        # `_state_sync` is created lazily by the `state_sync` property.
        self._provided_state_sync: t.Optional[StateSync] = state_sync
        self._state_sync: t.Optional[StateSync] = None

        # Instantiate the loader class (explicit argument or config default) with config kwargs.
        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        # De-duplicate by username; later entries (the config's) replace earlier ones.
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()
 357
 358    @property
 359    def default_dialect(self) -> t.Optional[str]:
 360        return self.config.dialect
 361
 362    @property
 363    def engine_adapter(self) -> EngineAdapter:
 364        """Returns an engine adapter."""
 365        return self._engine_adapter
 366
 367    @property
 368    def snapshot_evaluator(self) -> SnapshotEvaluator:
 369        if not self._snapshot_evaluator:
 370            self._snapshot_evaluator = SnapshotEvaluator(
 371                self.engine_adapter.with_log_level(logging.INFO),
 372                ddl_concurrent_tasks=self.concurrent_tasks,
 373            )
 374        return self._snapshot_evaluator
 375
 376    def execution_context(
 377        self, deployability_index: t.Optional[DeployabilityIndex] = None
 378    ) -> ExecutionContext:
 379        """Returns an execution context."""
 380        return ExecutionContext(
 381            engine_adapter=self._engine_adapter,
 382            snapshots=self.snapshots,
 383            deployability_index=deployability_index,
 384            default_dialect=self.default_dialect,
 385            default_catalog=self.default_catalog,
 386        )
 387
 388    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
 389        """Update or insert a model.
 390
 391        The context's models dictionary will be updated to include these changes.
 392
 393        Args:
 394            model: Model name or instance to update.
 395            kwargs: The kwargs to update the model with.
 396
 397        Returns:
 398            A new instance of the updated or inserted model.
 399        """
 400        model = self.get_model(model, raise_if_missing=True)
 401        path = model._path
 402
 403        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
 404        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
 405        model._path = path
 406
 407        self._models.update({model.fqn: model})
 408        self.dag.add(model.fqn, model.depends_on)
 409        update_model_schemas(
 410            self.dag,
 411            self._models,
 412            self.path,
 413        )
 414
 415        model.validate_definition()
 416
 417        return model
 418
 419    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
 420        """Returns the built-in scheduler.
 421
 422        Args:
 423            environment: The target environment to source model snapshots from, or None
 424                if snapshots should be sourced from the currently loaded local state.
 425
 426        Returns:
 427            The built-in scheduler instance.
 428        """
 429        snapshots: t.Iterable[Snapshot]
 430        if environment is not None:
 431            stored_environment = self.state_sync.get_environment(environment)
 432            if stored_environment is None:
 433                raise ConfigError(f"Environment '{environment}' was not found.")
 434            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
 435        else:
 436            snapshots = self.snapshots.values()
 437
 438        if not snapshots:
 439            raise ConfigError("No models were found")
 440
 441        return Scheduler(
 442            snapshots,
 443            self.snapshot_evaluator,
 444            self.state_sync,
 445            default_catalog=self.default_catalog,
 446            max_workers=self.concurrent_tasks,
 447            console=self.console,
 448            notification_target_manager=self.notification_target_manager,
 449        )
 450
 451    @property
 452    def state_sync(self) -> StateSync:
 453        if not self._state_sync:
 454            self._state_sync = self._new_state_sync()
 455
 456            if self._state_sync.get_versions(validate=False).schema_version == 0:
 457                self._state_sync.migrate(default_catalog=self.default_catalog)
 458            self._state_sync.get_versions()
 459            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
 460        return self._state_sync
 461
 462    @property
 463    def state_reader(self) -> StateReader:
 464        return self.state_sync
 465
 466    def refresh(self) -> None:
 467        """Refresh all models that have been updated."""
 468        if self._loader.reload_needed():
 469            self.load()
 470
 471    def load(self, update_schemas: bool = True) -> GenericContext[C]:
 472        """Load all files in the context's path."""
 473        with sys_path(*self.configs):
 474            gc.disable()
 475            project = self._loader.load(self, update_schemas)
 476            self._macros = project.macros
 477            self._jinja_macros = project.jinja_macros
 478            self._models = project.models
 479            self._metrics = project.metrics
 480            self._standalone_audits.clear()
 481            self._audits.clear()
 482            for name, audit in project.audits.items():
 483                if isinstance(audit, StandaloneAudit):
 484                    self._standalone_audits[name] = audit
 485                else:
 486                    self._audits[name] = audit
 487            self.dag = project.dag
 488            gc.enable()
 489
 490            duplicates = set(self._models) & set(self._standalone_audits)
 491            if duplicates:
 492                raise ConfigError(
 493                    f"Models and Standalone audits cannot have the same name: {duplicates}"
 494                )
 495
 496        return self
 497
 498    def run(
 499        self,
 500        environment: t.Optional[str] = None,
 501        *,
 502        start: t.Optional[TimeLike] = None,
 503        end: t.Optional[TimeLike] = None,
 504        execution_time: t.Optional[TimeLike] = None,
 505        skip_janitor: bool = False,
 506        ignore_cron: bool = False,
 507    ) -> bool:
 508        """Run the entire dag through the scheduler.
 509
 510        Args:
 511            environment: The target environment to source model snapshots from and virtually update. Default: prod.
 512            start: The start of the interval to render.
 513            end: The end of the interval to render.
 514            execution_time: The date/time time reference to use for execution time. Defaults to now.
 515            skip_janitor: Whether to skip the janitor task.
 516            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
 517
 518        Returns:
 519            True if the run was successful, False otherwise.
 520        """
 521        environment = environment or self.config.default_target_environment
 522
 523        self.notification_target_manager.notify(
 524            NotificationEvent.RUN_START, environment=environment
 525        )
 526        success = False
 527        try:
 528            success = self._run(
 529                environment=environment,
 530                start=start,
 531                end=end,
 532                execution_time=execution_time,
 533                skip_janitor=skip_janitor,
 534                ignore_cron=ignore_cron,
 535            )
 536        except Exception as e:
 537            self.notification_target_manager.notify(
 538                NotificationEvent.RUN_FAILURE, traceback.format_exc()
 539            )
 540            logger.error(f"Run Failure: {traceback.format_exc()}")
 541            raise e
 542
 543        if success:
 544            self.notification_target_manager.notify(
 545                NotificationEvent.RUN_END, environment=environment
 546            )
 547            self.console.log_success(f"Run finished for environment '{environment}'")
 548        else:
 549            self.notification_target_manager.notify(
 550                NotificationEvent.RUN_FAILURE, "See console logs for details."
 551            )
 552
 553        return success
 554
 555    @t.overload
 556    def get_model(
 557        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
 558    ) -> Model: ...
 559
 560    @t.overload
 561    def get_model(
 562        self,
 563        model_or_snapshot: ModelOrSnapshot,
 564        raise_if_missing: Literal[False] = False,
 565    ) -> t.Optional[Model]: ...
 566
 567    def get_model(
 568        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
 569    ) -> t.Optional[Model]:
 570        """Returns a model with the given name or None if a model with such name doesn't exist.
 571
 572        Args:
 573            model_or_snapshot: A model name, model, or snapshot.
 574            raise_if_missing: Raises an error if a model is not found.
 575
 576        Returns:
 577            The expected model.
 578        """
 579        if isinstance(model_or_snapshot, str):
 580            normalized_name = normalize_model_name(
 581                model_or_snapshot,
 582                dialect=self.default_dialect,
 583                default_catalog=self.default_catalog,
 584            )
 585            model = self._models.get(normalized_name)
 586        elif isinstance(model_or_snapshot, Snapshot):
 587            model = model_or_snapshot.model
 588        else:
 589            model = model_or_snapshot
 590
 591        if raise_if_missing and not model:
 592            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
 593
 594        return model
 595
 596    @t.overload
 597    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...
 598
 599    @t.overload
 600    def get_snapshot(
 601        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
 602    ) -> Snapshot: ...
 603
 604    @t.overload
 605    def get_snapshot(
 606        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
 607    ) -> t.Optional[Snapshot]: ...
 608
 609    def get_snapshot(
 610        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
 611    ) -> t.Optional[Snapshot]:
 612        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.
 613
 614        Args:
 615            node_or_snapshot: A node name, node, or snapshot.
 616            raise_if_missing: Raises an error if a snapshot is not found.
 617
 618        Returns:
 619            The expected snapshot.
 620        """
 621        if isinstance(node_or_snapshot, Snapshot):
 622            return node_or_snapshot
 623        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
 624            node_or_snapshot = normalize_model_name(
 625                node_or_snapshot,
 626                dialect=self.default_dialect,
 627                default_catalog=self.default_catalog,
 628            )
 629        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
 630        snapshot = self.snapshots.get(fqn)
 631
 632        if raise_if_missing and not snapshot:
 633            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
 634
 635        return snapshot
 636
 637    def config_for_path(self, path: Path) -> Config:
 638        for config_path, config in self.configs.items():
 639            try:
 640                path.relative_to(config_path)
 641                return config
 642            except ValueError:
 643                pass
 644        return self.config
 645
 646    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
 647        if isinstance(node, str):
 648            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
 649        return self.config_for_path(node._path)  # type: ignore
 650
 651    @property
 652    def models(self) -> MappingProxyType[str, Model]:
 653        """Returns all registered models in this context."""
 654        return MappingProxyType(self._models)
 655
 656    @property
 657    def metrics(self) -> MappingProxyType[str, Metric]:
 658        """Returns all registered metrics in this context."""
 659        return MappingProxyType(self._metrics)
 660
 661    @property
 662    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
 663        """Returns all registered standalone audits in this context."""
 664        return MappingProxyType(self._standalone_audits)
 665
 666    @property
 667    def snapshots(self) -> t.Dict[str, Snapshot]:
 668        """Generates and returns snapshots based on models registered in this context.
 669
 670        If one of the snapshots has been previously stored in the persisted state, the stored
 671        instance will be returned.
 672        """
 673        return self._snapshots()
 674
 675    @property
 676    def default_catalog(self) -> t.Optional[str]:
 677        if self._default_catalog is None:
 678            self._default_catalog = self._scheduler.get_default_catalog(self)
 679        return self._default_catalog
 680
 681    def render(
 682        self,
 683        model_or_snapshot: ModelOrSnapshot,
 684        *,
 685        start: t.Optional[TimeLike] = None,
 686        end: t.Optional[TimeLike] = None,
 687        execution_time: t.Optional[TimeLike] = None,
 688        expand: t.Union[bool, t.Iterable[str]] = False,
 689        **kwargs: t.Any,
 690    ) -> exp.Expression:
 691        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
 692
 693        Args:
 694            model_or_snapshot: The model, model name, or snapshot to render.
 695            start: The start of the interval to render.
 696            end: The end of the interval to render.
 697            execution_time: The date/time time reference to use for execution time. Defaults to now.
 698            expand: Whether or not to use expand materialized models, defaults to False.
 699                If True, all referenced models are expanded as raw queries.
 700                If a list, only referenced models are expanded as raw queries.
 701
 702        Returns:
 703            The rendered expression.
 704        """
 705        execution_time = execution_time or now_ds()
 706
 707        model = self.get_model(model_or_snapshot, raise_if_missing=True)
 708
 709        if expand and not isinstance(expand, bool):
 710            expand = {
 711                normalize_model_name(
 712                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
 713                )
 714                for x in expand
 715            }
 716
 717        expand = self.dag.upstream(model.fqn) if expand is True else expand or []
 718
 719        if model.is_seed:
 720            df = next(
 721                model.render(
 722                    context=self.execution_context(),
 723                    start=start,
 724                    end=end,
 725                    execution_time=execution_time,
 726                    **kwargs,
 727                )
 728            )
 729            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))
 730
 731        return model.render_query_or_raise(
 732            start=start,
 733            end=end,
 734            execution_time=execution_time,
 735            snapshots=self.snapshots,
 736            expand=expand,
 737            engine_adapter=self.engine_adapter,
 738            **kwargs,
 739        )
 740
 741    def evaluate(
 742        self,
 743        model_or_snapshot: ModelOrSnapshot,
 744        start: TimeLike,
 745        end: TimeLike,
 746        execution_time: TimeLike,
 747        limit: t.Optional[int] = None,
 748        **kwargs: t.Any,
 749    ) -> DF:
 750        """Evaluate a model or snapshot (running its query against a DB/Engine).
 751
 752        This method is used to test or iterate on models without side effects.
 753
 754        Args:
 755            model_or_snapshot: The model, model name, or snapshot to render.
 756            start: The start of the interval to evaluate.
 757            end: The end of the interval to evaluate.
 758            execution_time: The date/time time reference to use for execution time.
 759            limit: A limit applied to the model.
 760        """
 761        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
 762
 763        df = self.snapshot_evaluator.evaluate_and_fetch(
 764            snapshot,
 765            start=start,
 766            end=end,
 767            execution_time=execution_time,
 768            snapshots=self.snapshots,
 769            limit=limit or c.DEFAULT_MAX_LIMIT,
 770        )
 771
 772        if df is None:
 773            raise RuntimeError(f"Error evaluating {snapshot.name}")
 774
 775        return df
 776
 777    def format(
 778        self,
 779        transpile: t.Optional[str] = None,
 780        append_newline: t.Optional[bool] = None,
 781        **kwargs: t.Any,
 782    ) -> None:
 783        """Format all SQL models."""
 784        for model in self._models.values():
 785            if not model._path.suffix == ".sql":
 786                continue
 787            with open(model._path, "r+", encoding="utf-8") as file:
 788                expressions = parse(
 789                    file.read(), default_dialect=self.config_for_node(model).dialect
 790                )
 791                if transpile:
 792                    for prop in expressions[0].expressions:
 793                        if prop.name.lower() == "dialect":
 794                            prop.replace(
 795                                exp.Property(
 796                                    this="dialect",
 797                                    value=exp.Literal.string(transpile or model.dialect),
 798                                )
 799                            )
 800                format = self.config_for_node(model).format
 801                opts = {**format.generator_options, **kwargs}
 802                file.seek(0)
 803                file.write(
 804                    format_model_expressions(expressions, transpile or model.dialect, **opts)
 805                )
 806                if append_newline is None:
 807                    append_newline = format.append_newline
 808                if append_newline:
 809                    file.write("\n")
 810                file.truncate()
 811
 812    def plan(
 813        self,
 814        environment: t.Optional[str] = None,
 815        *,
 816        start: t.Optional[TimeLike] = None,
 817        end: t.Optional[TimeLike] = None,
 818        execution_time: t.Optional[TimeLike] = None,
 819        create_from: t.Optional[str] = None,
 820        skip_tests: bool = False,
 821        restate_models: t.Optional[t.Iterable[str]] = None,
 822        no_gaps: bool = False,
 823        skip_backfill: bool = False,
 824        forward_only: t.Optional[bool] = None,
 825        no_prompts: t.Optional[bool] = None,
 826        auto_apply: t.Optional[bool] = None,
 827        no_auto_categorization: t.Optional[bool] = None,
 828        effective_from: t.Optional[TimeLike] = None,
 829        include_unmodified: t.Optional[bool] = None,
 830        select_models: t.Optional[t.Collection[str]] = None,
 831        backfill_models: t.Optional[t.Collection[str]] = None,
 832        categorizer_config: t.Optional[CategorizerConfig] = None,
 833        enable_preview: t.Optional[bool] = None,
 834        no_diff: t.Optional[bool] = None,
 835        run: bool = False,
 836    ) -> Plan:
 837        """Interactively creates a plan.
 838
 839        This method compares the current context with the target environment. It then presents
 840        the differences and asks whether to backfill each modified model.
 841
 842        Args:
 843            environment: The environment to diff and plan against.
 844            start: The start date of the backfill if there is one.
 845            end: The end date of the backfill if there is one.
 846            execution_time: The date/time reference to use for execution time. Defaults to now.
 847            create_from: The environment to create the target environment from if it
 848                doesn't exist. If not specified, the "prod" environment will be used.
 849            skip_tests: Unit tests are run by default so this will skip them if enabled
 850            restate_models: A list of either internal or external models, or tags, that need to be restated
 851                for the given plan interval. If the target environment is a production environment,
 852                ALL snapshots that depended on these upstream tables will have their intervals deleted
 853                (even ones not in this current environment). Only the snapshots in this environment will
 854                be backfilled whereas others need to be recovered on a future plan application. For development
 855                environments only snapshots that are part of this plan will be affected.
 856            no_gaps:  Whether to ensure that new snapshots for models that are already a
 857                part of the target environment have no data gaps when compared against previous
 858                snapshots for same models.
 859            skip_backfill: Whether to skip the backfill step. Default: False.
 860            forward_only: Whether the purpose of the plan is to make forward only changes.
 861            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 862                if this flag is set to true and there are uncategorized changes the plan creation will
 863                fail. Default: False.
 864            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
 865            no_auto_categorization: Indicates whether to disable automatic categorization of model
 866                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 867                option determines the behavior.
 868            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 869                project config by default.
 870            effective_from: The effective date from which to apply forward-only changes on production.
 871            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 872            select_models: A list of model selection strings to filter the models that should be included into this plan.
 873            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 874            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 875            no_diff: Hide text differences for changed models.
 876            run: Whether to run latest intervals as part of the plan application.
 877
 878        Returns:
 879            The populated Plan object.
 880        """
 881        plan_builder = self.plan_builder(
 882            environment,
 883            start=start,
 884            end=end,
 885            execution_time=execution_time,
 886            create_from=create_from,
 887            skip_tests=skip_tests,
 888            restate_models=restate_models,
 889            no_gaps=no_gaps,
 890            skip_backfill=skip_backfill,
 891            forward_only=forward_only,
 892            no_auto_categorization=no_auto_categorization,
 893            effective_from=effective_from,
 894            include_unmodified=include_unmodified,
 895            select_models=select_models,
 896            backfill_models=backfill_models,
 897            categorizer_config=categorizer_config,
 898            enable_preview=enable_preview,
 899            run=run,
 900        )
 901
 902        self.console.plan(
 903            plan_builder,
 904            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
 905            self.default_catalog,
 906            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
 907            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
 908        )
 909
 910        return plan_builder.build()
 911
 912    def plan_builder(
 913        self,
 914        environment: t.Optional[str] = None,
 915        *,
 916        start: t.Optional[TimeLike] = None,
 917        end: t.Optional[TimeLike] = None,
 918        execution_time: t.Optional[TimeLike] = None,
 919        create_from: t.Optional[str] = None,
 920        skip_tests: bool = False,
 921        restate_models: t.Optional[t.Iterable[str]] = None,
 922        no_gaps: bool = False,
 923        skip_backfill: bool = False,
 924        forward_only: t.Optional[bool] = None,
 925        no_auto_categorization: t.Optional[bool] = None,
 926        effective_from: t.Optional[TimeLike] = None,
 927        include_unmodified: t.Optional[bool] = None,
 928        select_models: t.Optional[t.Collection[str]] = None,
 929        backfill_models: t.Optional[t.Collection[str]] = None,
 930        categorizer_config: t.Optional[CategorizerConfig] = None,
 931        enable_preview: t.Optional[bool] = None,
 932        run: bool = False,
 933    ) -> PlanBuilder:
 934        """Creates a plan builder.
 935
 936        Args:
 937            environment: The environment to diff and plan against.
 938            start: The start date of the backfill if there is one.
 939            end: The end date of the backfill if there is one.
 940            execution_time: The date/time reference to use for execution time. Defaults to now.
 941            create_from: The environment to create the target environment from if it
 942                doesn't exist. If not specified, the "prod" environment will be used.
 943            skip_tests: Unit tests are run by default so this will skip them if enabled
 944            restate_models: A list of either internal or external models, or tags, that need to be restated
 945                for the given plan interval. If the target environment is a production environment,
 946                ALL snapshots that depended on these upstream tables will have their intervals deleted
 947                (even ones not in this current environment). Only the snapshots in this environment will
 948                be backfilled whereas others need to be recovered on a future plan application. For development
 949                environments only snapshots that are part of this plan will be affected.
 950            no_gaps:  Whether to ensure that new snapshots for models that are already a
 951                part of the target environment have no data gaps when compared against previous
 952                snapshots for same models.
 953            skip_backfill: Whether to skip the backfill step. Default: False.
 954            forward_only: Whether the purpose of the plan is to make forward only changes.
 955            no_auto_categorization: Indicates whether to disable automatic categorization of model
 956                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 957                option determines the behavior.
 958            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 959                project config by default.
 960            effective_from: The effective date from which to apply forward-only changes on production.
 961            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 962            select_models: A list of model selection strings to filter the models that should be included into this plan.
 963            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 964            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 965            run: Whether to run latest intervals as part of the plan application.
 966
 967        Returns:
 968            The plan builder.
 969        """
 970        environment = environment or self.config.default_target_environment
 971        environment = Environment.normalize_name(environment)
 972        is_dev = environment != c.PROD
 973
 974        if skip_backfill and not no_gaps and not is_dev:
 975            raise ConfigError(
 976                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
 977            )
 978
 979        if run and is_dev:
 980            raise ConfigError("The '--run' flag is only supported for the production environment.")
 981
 982        self._run_plan_tests(skip_tests=skip_tests)
 983
 984        environment_ttl = (
 985            self.environment_ttl if environment not in self.pinned_environments else None
 986        )
 987
 988        model_selector = self._new_selector()
 989
 990        if backfill_models:
 991            backfill_models = model_selector.expand_model_selections(backfill_models)
 992        else:
 993            backfill_models = None
 994
 995        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
 996        if select_models:
 997            models_override = model_selector.select_models(
 998                select_models,
 999                environment,
1000                fallback_env_name=create_from or c.PROD,
1001                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1002            )
1003            if not backfill_models:
1004                # Only backfill selected models unless explicitly specified.
1005                backfill_models = model_selector.expand_model_selections(select_models)
1006
1007        expanded_restate_models = None
1008        if restate_models is not None:
1009            expanded_restate_models = model_selector.expand_model_selections(restate_models)
1010            if not expanded_restate_models:
1011                self.console.log_error(
1012                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
1013                )
1014
1015        snapshots = self._snapshots(models_override)
1016        context_diff = self._context_diff(
1017            environment or c.PROD,
1018            snapshots=snapshots,
1019            create_from=create_from,
1020            force_no_diff=(restate_models is not None and not expanded_restate_models)
1021            or (backfill_models is not None and not backfill_models),
1022            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1023        )
1024
1025        # If no end date is specified, use the max interval end from prod
1026        # to prevent unintended evaluation of the entire DAG.
1027        if not run:
1028            if backfill_models is not None:
1029                # Only consider selected models for the default end value.
1030                models_for_default_end = backfill_models.copy()
1031                for name in backfill_models:
1032                    if name not in snapshots:
1033                        continue
1034                    snapshot = snapshots[name]
1035                    snapshot_id = snapshot.snapshot_id
1036                    if (
1037                        snapshot_id in context_diff.added
1038                        and snapshot_id in context_diff.new_snapshots
1039                    ):
1040                        # If the selected model is a newly added model, then we should narrow down the intervals
1041                        # that should be considered for the default plan end value by including its parents.
1042                        models_for_default_end |= {s.name for s in snapshot.parents}
1043                default_end = self.state_sync.greatest_common_interval_end(
1044                    c.PROD,
1045                    models_for_default_end,
1046                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1047                )
1048            else:
1049                default_end = self.state_sync.max_interval_end_for_environment(
1050                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
1051                )
1052        else:
1053            default_end = None
1054
1055        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None
1056
1057        return PlanBuilder(
1058            context_diff=context_diff,
1059            start=start,
1060            end=end,
1061            execution_time=execution_time,
1062            apply=self.apply,
1063            restate_models=expanded_restate_models,
1064            backfill_models=backfill_models,
1065            no_gaps=no_gaps,
1066            skip_backfill=skip_backfill,
1067            is_dev=is_dev,
1068            forward_only=(
1069                forward_only if forward_only is not None else self.config.plan.forward_only
1070            ),
1071            environment_ttl=environment_ttl,
1072            environment_suffix_target=self.config.environment_suffix_target,
1073            environment_catalog_mapping=self.config.environment_catalog_mapping,
1074            categorizer_config=categorizer_config or self.auto_categorize_changes,
1075            auto_categorization_enabled=not no_auto_categorization,
1076            effective_from=effective_from,
1077            include_unmodified=(
1078                include_unmodified
1079                if include_unmodified is not None
1080                else self.config.plan.include_unmodified
1081            ),
1082            default_start=default_start,
1083            default_end=default_end,
1084            enable_preview=(
1085                enable_preview if enable_preview is not None else self.config.plan.enable_preview
1086            ),
1087            end_bounded=not run,
1088            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1089        )
1090
1091    def apply(
1092        self,
1093        plan: Plan,
1094        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1095    ) -> None:
1096        """Applies a plan by pushing snapshots and backfilling data.
1097
1098        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1099        to backfill all models.
1100
1101        Args:
1102            plan: The plan to apply.
1103            circuit_breaker: An optional handler which checks if the apply should be aborted.
1104        """
1105        if (
1106            not plan.context_diff.has_changes
1107            and not plan.requires_backfill
1108            and not plan.has_unmodified_unpromoted
1109        ):
1110            return
1111        if plan.uncategorized:
1112            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1113        self.notification_target_manager.notify(
1114            NotificationEvent.APPLY_START,
1115            environment=plan.environment_naming_info.name,
1116            plan_id=plan.plan_id,
1117        )
1118        try:
1119            self._apply(plan, circuit_breaker)
1120        except Exception as e:
1121            self.notification_target_manager.notify(
1122                NotificationEvent.APPLY_FAILURE,
1123                environment=plan.environment_naming_info.name,
1124                plan_id=plan.plan_id,
1125                exc=traceback.format_exc(),
1126            )
1127            logger.error(f"Apply Failure: {traceback.format_exc()}")
1128            raise e
1129        self.notification_target_manager.notify(
1130            NotificationEvent.APPLY_END,
1131            environment=plan.environment_naming_info.name,
1132            plan_id=plan.plan_id,
1133        )
1134
1135    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1136        """Invalidates the target environment by setting its expiration timestamp to now.
1137
1138        Args:
1139            name: The name of the environment to invalidate.
1140            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1141                be deleted asynchronously by the janitor process.
1142        """
1143        self.state_sync.invalidate_environment(name)
1144        if sync:
1145            self._cleanup_environments()
1146            self.console.log_success(f"Environment '{name}' has been deleted.")
1147        else:
1148            self.console.log_success(f"Environment '{name}' has been invalidated.")
1149
1150    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1151        """Show a diff of the current context with a given environment.
1152
1153        Args:
1154            environment: The environment to diff against.
1155            detailed: Show the actual SQL differences if True.
1156
1157        Returns:
1158            True if there are changes, False otherwise.
1159        """
1160        environment = environment or self.config.default_target_environment
1161        environment = Environment.normalize_name(environment)
1162        context_diff = self._context_diff(environment)
1163        self.console.show_model_difference_summary(
1164            context_diff,
1165            EnvironmentNamingInfo.from_environment_catalog_mapping(
1166                self.config.environment_catalog_mapping,
1167                name=environment,
1168                suffix_target=self.config.environment_suffix_target,
1169            ),
1170            self.default_catalog,
1171            no_diff=not detailed,
1172        )
1173        return context_diff.has_changes
1174
1175    def table_diff(
1176        self,
1177        source: str,
1178        target: str,
1179        on: t.List[str] | exp.Condition | None = None,
1180        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1181        where: t.Optional[str | exp.Condition] = None,
1182        limit: int = 20,
1183        show: bool = True,
1184        show_sample: bool = True,
1185    ) -> TableDiff:
1186        """Show a diff between two tables.
1187
1188        Args:
1189            source: The source environment or table.
1190            target: The target environment or table.
1191            on: The join condition, table aliases must be "s" and "t" for source and target.
1192                If omitted, the table's grain will be used.
1193            model_or_snapshot: The model or snapshot to use when environments are passed in.
1194            where: An optional where statement to filter results.
1195            limit: The limit of the sample dataframe.
1196            show: Show the table diff output in the console.
1197            show_sample: Show the sample dataframe in the console. Requires show=True.
1198
1199        Returns:
1200            The TableDiff object containing schema and summary differences.
1201        """
1202        source_alias, target_alias = source, target
1203        if model_or_snapshot:
1204            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1205            source_env = self.state_reader.get_environment(source)
1206            target_env = self.state_reader.get_environment(target)
1207
1208            if not source_env:
1209                raise SQLMeshError(f"Could not find environment '{source}'")
1210            if not target_env:
1211                raise SQLMeshError(f"Could not find environment '{target}')")
1212
1213            source = next(
1214                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1215            ).table_name()
1216            target = next(
1217                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1218            ).table_name()
1219            source_alias = source_env.name
1220            target_alias = target_env.name
1221
1222            if not on:
1223                for ref in model.all_references:
1224                    if ref.unique:
1225                        on = ref.columns
1226
1227        if not on:
1228            raise SQLMeshError(
1229                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1230            )
1231
1232        table_diff = TableDiff(
1233            adapter=self._engine_adapter,
1234            source=source,
1235            target=target,
1236            on=on,
1237            where=where,
1238            source_alias=source_alias,
1239            target_alias=target_alias,
1240            model_name=model.name if model_or_snapshot else None,
1241            limit=limit,
1242        )
1243        if show:
1244            self.console.show_schema_diff(table_diff.schema_diff())
1245            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1246        return table_diff
1247
1248    def get_dag(
1249        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1250    ) -> GraphHTML:
1251        """Gets an HTML object representation of the DAG.
1252
1253        Args:
1254            select_models: A list of model selection strings that should be included in the dag.
1255        Returns:
1256            An html object that renders the dag.
1257        """
1258        dag = (
1259            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1260            if select_models
1261            else self.dag
1262        )
1263
1264        nodes = {}
1265        edges: t.List[t.Dict] = []
1266
1267        for node, deps in dag.graph.items():
1268            nodes[node] = {
1269                "id": node,
1270                "label": node.split(".")[-1],
1271                "title": f"<span>{node}</span>",
1272            }
1273            edges.extend({"from": d, "to": node} for d in deps)
1274
1275        return GraphHTML(
1276            nodes,
1277            edges,
1278            options={
1279                "height": "100%",
1280                "width": "100%",
1281                "interaction": {},
1282                "layout": {
1283                    "hierarchical": {
1284                        "enabled": True,
1285                        "nodeSpacing": 200,
1286                        "sortMethod": "directed",
1287                    },
1288                },
1289                "nodes": {
1290                    "shape": "box",
1291                },
1292                **options,
1293            },
1294        )
1295
1296    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1297        """Render the dag as HTML and save it to a file.
1298
1299        Args:
1300            path: filename to save the dag html to
1301            select_models: A list of model selection strings that should be included in the dag.
1302        """
1303        file_path = Path(path)
1304        suffix = file_path.suffix
1305        if suffix != ".html":
1306            if suffix:
1307                logger.warning(
1308                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1309                )
1310            path = str(file_path.with_suffix(".html"))
1311
1312        with open(path, "w", encoding="utf-8") as file:
1313            file.write(str(self.get_dag(select_models)))
1314
1315    def create_test(
1316        self,
1317        model: str,
1318        input_queries: t.Dict[str, str],
1319        overwrite: bool = False,
1320        variables: t.Optional[t.Dict[str, str]] = None,
1321        path: t.Optional[str] = None,
1322        name: t.Optional[str] = None,
1323        include_ctes: bool = False,
1324    ) -> None:
1325        """Generate a unit test fixture for a given model.
1326
1327        Args:
1328            model: The model to test.
1329            input_queries: Mapping of model names to queries. Each model included in this mapping
1330                will be populated in the test based on the results of the corresponding query.
1331            overwrite: Whether to overwrite the existing test in case of a file path collision.
1332                When set to False, an error will be raised if there is such a collision.
1333            variables: Key-value pairs that will define variables needed by the model.
1334            path: The file path corresponding to the fixture, relative to the test directory.
1335                By default, the fixture will be created under the test directory and the file name
1336                will be inferred from the test's name.
1337            name: The name of the test. This is inferred from the model name by default.
1338            include_ctes: When true, CTE fixtures will also be generated.
1339        """
1340        input_queries = {
1341            # The get_model here has two purposes: return normalized names & check for missing deps
1342            self.get_model(dep, raise_if_missing=True).fqn: query
1343            for dep, query in input_queries.items()
1344        }
1345
1346        generate_test(
1347            model=self.get_model(model, raise_if_missing=True),
1348            input_queries=input_queries,
1349            models=self._models,
1350            engine_adapter=self._engine_adapter,
1351            test_engine_adapter=self._test_engine_adapter,
1352            project_path=self.path,
1353            overwrite=overwrite,
1354            variables=variables,
1355            path=path,
1356            name=name,
1357            include_ctes=include_ctes,
1358        )
1359
1360    def test(
1361        self,
1362        match_patterns: t.Optional[t.List[str]] = None,
1363        tests: t.Optional[t.List[str]] = None,
1364        verbose: bool = False,
1365        preserve_fixtures: bool = False,
1366        stream: t.Optional[t.TextIO] = None,
1367    ) -> ModelTextTestResult:
1368        """Discover and run model tests"""
1369        if verbose:
1370            pd.set_option("display.max_columns", None)
1371            verbosity = 2
1372        else:
1373            verbosity = 1
1374
1375        try:
1376            if tests:
1377                result = run_model_tests(
1378                    tests=tests,
1379                    models=self._models,
1380                    engine_adapter=self._test_engine_adapter,
1381                    dialect=self.default_dialect,
1382                    verbosity=verbosity,
1383                    patterns=match_patterns,
1384                    preserve_fixtures=preserve_fixtures,
1385                    default_catalog=self.default_catalog,
1386                )
1387            else:
1388                test_meta = []
1389
1390                for path, config in self.configs.items():
1391                    test_meta.extend(
1392                        get_all_model_tests(
1393                            path / c.TESTS,
1394                            patterns=match_patterns,
1395                            ignore_patterns=config.ignore_patterns,
1396                        )
1397                    )
1398
1399                result = run_tests(
1400                    test_meta,
1401                    models=self._models,
1402                    engine_adapter=self._test_engine_adapter,
1403                    dialect=self.default_dialect,
1404                    verbosity=verbosity,
1405                    preserve_fixtures=preserve_fixtures,
1406                    stream=stream,
1407                    default_catalog=self.default_catalog,
1408                )
1409        finally:
1410            self._test_engine_adapter.close()
1411
1412        return result
1413
1414    def audit(
1415        self,
1416        start: TimeLike,
1417        end: TimeLike,
1418        *,
1419        models: t.Optional[t.Iterator[str]] = None,
1420        execution_time: t.Optional[TimeLike] = None,
1421    ) -> None:
1422        """Audit models.
1423
1424        Args:
1425            start: The start of the interval to audit.
1426            end: The end of the interval to audit.
1427            models: The models to audit. All models will be audited if not specified.
1428            execution_time: The date/time time reference to use for execution time. Defaults to now.
1429        """
1430
1431        snapshots = (
1432            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1433            if models
1434            else self.snapshots.values()
1435        )
1436
1437        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1438        self.console.log_status_update(f"Found {num_audits} audit(s).")
1439        errors = []
1440        skipped_count = 0
1441        for snapshot in snapshots:
1442            for audit_result in self.snapshot_evaluator.audit(
1443                snapshot=snapshot,
1444                start=start,
1445                end=end,
1446                snapshots=self.snapshots,
1447                raise_exception=False,
1448            ):
1449                audit_id = f"{audit_result.audit.name}"
1450                if audit_result.model:
1451                    audit_id += f" on model {audit_result.model.name}"
1452
1453                if audit_result.skipped:
1454                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1455                    skipped_count += 1
1456                elif audit_result.count:
1457                    errors.append(audit_result)
1458                    self.console.log_status_update(
1459                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1460                    )
1461                else:
1462                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1463
1464        self.console.log_status_update(
1465            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1466            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1467        )
1468        for error in errors:
1469            self.console.log_status_update(
1470                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1471            )
1472            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1473            if error.query:
1474                self.console.show_sql(
1475                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1476                )
1477
1478        self.console.log_status_update("Done.")
1479
1480    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1481        """Rewrite a sql expression with semantic references into an executable query.
1482
1483        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1484
1485        Args:
1486            sql: The sql string to rewrite.
1487            dialect: The dialect of the sql string, defaults to the project dialect.
1488
1489        Returns:
1490            A SQLGlot expression with semantic references expanded.
1491        """
1492        return rewrite(
1493            sql,
1494            graph=ReferenceGraph(self.models.values()),
1495            metrics=self._metrics,
1496            dialect=dialect or self.default_dialect,
1497        )
1498
1499    def migrate(self) -> None:
1500        """Migrates SQLMesh to the current running version.
1501
1502        Please contact your SQLMesh administrator before doing this.
1503        """
1504        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1505        try:
1506            self._new_state_sync().migrate(
1507                default_catalog=self.default_catalog,
1508                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1509            )
1510        except Exception as e:
1511            self.notification_target_manager.notify(
1512                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1513            )
1514            raise e
1515        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
1516
1517    def rollback(self) -> None:
1518        """Rolls back SQLMesh to the previous migration.
1519
1520        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1521        """
1522        self._new_state_sync().rollback()
1523
1524    def create_external_models(self) -> None:
1525        """Create a schema file with all external models.
1526
1527        The schema file contains all columns and types of external models, allowing for more robust
1528        lineage, validation, and optimizations.
1529        """
1530        if not self._models:
1531            self.load(update_schemas=False)
1532
1533        for path, config in self.configs.items():
1534            create_schema_file(
1535                path=path / c.SCHEMA_YAML,
1536                models=UniqueKeyDict(
1537                    "models",
1538                    {
1539                        fqn: model
1540                        for fqn, model in self._models.items()
1541                        if self.config_for_node(model) is config
1542                    },
1543                ),
1544                adapter=self._engine_adapter,
1545                state_reader=self.state_reader,
1546                dialect=config.model_defaults.dialect,
1547                max_workers=self.concurrent_tasks,
1548            )
1549
1550    def print_info(self) -> None:
1551        """Prints information about connections, models, macros, etc. to the console."""
1552        self.console.log_status_update(f"Models: {len(self.models)}")
1553        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1554
1555        self._try_connection("data warehouse", self._engine_adapter)
1556
1557        state_connection = self.config.get_state_connection(self.gateway)
1558        if state_connection:
1559            self._try_connection("state backend", state_connection.create_engine_adapter())
1560
1561        self._try_connection("test", self._test_engine_adapter)
1562
1563    def close(self) -> None:
1564        """Releases all resources allocated by this context."""
1565        self.snapshot_evaluator.close()
1566        self.state_sync.close()
1567
1568    def _run(
1569        self,
1570        environment: str,
1571        *,
1572        start: t.Optional[TimeLike],
1573        end: t.Optional[TimeLike],
1574        execution_time: t.Optional[TimeLike],
1575        skip_janitor: bool,
1576        ignore_cron: bool,
1577    ) -> bool:
1578        if not skip_janitor and environment.lower() == c.PROD:
1579            self._run_janitor()
1580
1581        env_check_attempts_num = max(
1582            1,
1583            self.config.run.environment_check_max_wait
1584            // self.config.run.environment_check_interval,
1585        )
1586
1587        def _block_until_finalized() -> str:
1588            for _ in range(env_check_attempts_num):
1589                assert environment is not None  # mypy
1590                environment_state = self.state_sync.get_environment(environment)
1591                if not environment_state:
1592                    raise SQLMeshError(f"Environment '{environment}' was not found.")
1593                if environment_state.finalized_ts:
1594                    return environment_state.plan_id
1595                logger.warning(
1596                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
1597                    environment,
1598                    environment_state.plan_id,
1599                    self.config.run.environment_check_interval,
1600                )
1601                time.sleep(self.config.run.environment_check_interval)
1602            raise SQLMeshError(
1603                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
1604                "This means that the environment either failed to update or the update is taking longer than expected. "
1605                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
1606            )
1607
1608        done = False
1609        while not done:
1610            plan_id_at_start = _block_until_finalized()
1611
1612            def _has_environment_changed() -> bool:
1613                assert environment is not None  # mypy
1614                current_environment_state = self.state_sync.get_environment(environment)
1615                return (
1616                    not current_environment_state
1617                    or current_environment_state.plan_id != plan_id_at_start
1618                    or not current_environment_state.finalized_ts
1619                )
1620
1621            try:
1622                success = self.scheduler(environment=environment).run(
1623                    environment,
1624                    start=start,
1625                    end=end,
1626                    execution_time=execution_time,
1627                    ignore_cron=ignore_cron,
1628                    circuit_breaker=_has_environment_changed,
1629                )
1630                done = True
1631            except CircuitBreakerError:
1632                logger.warning(
1633                    "Environment '%s' has been modified while running. Restarting the run...",
1634                    environment,
1635                )
1636
1637        return success
1638
1639    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
1640        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)
1641
1642    def table_name(self, model_name: str, dev: bool) -> str:
1643        """Returns the name of the pysical table for the given model name.
1644
1645        Args:
1646            model_name: The name of the model.
1647            dev: Whether to use the deployability index for the table name.
1648
1649        Returns:
1650            The name of the physical table.
1651        """
1652        deployability_index = (
1653            DeployabilityIndex.create(self.snapshots.values())
1654            if dev
1655            else DeployabilityIndex.all_deployable()
1656        )
1657        snapshot = self.get_snapshot(model_name)
1658        if not snapshot:
1659            raise SQLMeshError(f"Model '{model_name}' was not found.")
1660        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
1661
1662    def clear_caches(self) -> None:
1663        for path in self.configs:
1664            rmtree(path / c.CACHE)
1665
1666    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
1667        test_output_io = StringIO()
1668        result = self.test(stream=test_output_io, verbose=verbose)
1669        return result, test_output_io.getvalue()
1670
1671    def _run_plan_tests(
1672        self, skip_tests: bool = False
1673    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
1674        if self._test_engine_adapter and not skip_tests:
1675            result, test_output = self._run_tests()
1676            if result.testsRun > 0:
1677                self.console.log_test_results(
1678                    result, test_output, self._test_engine_adapter.dialect
1679                )
1680            if not result.wasSuccessful():
1681                raise PlanError(
1682                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
1683                )
1684            return result, test_output
1685        return None, None
1686
1687    @property
1688    def _model_tables(self) -> t.Dict[str, str]:
1689        """Mapping of model name to physical table name.
1690
1691        If a snapshot has not been versioned yet, its view name will be returned.
1692        """
1693        return {
1694            fqn: (
1695                snapshot.table_name()
1696                if snapshot.version
1697                else snapshot.qualified_view_name.for_environment(
1698                    EnvironmentNamingInfo.from_environment_catalog_mapping(
1699                        self.config.environment_catalog_mapping,
1700                        name=c.PROD,
1701                        suffix_target=self.config.environment_suffix_target,
1702                    )
1703                )
1704            )
1705            for fqn, snapshot in self.snapshots.items()
1706        }
1707
1708    def _snapshots(
1709        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
1710    ) -> t.Dict[str, Snapshot]:
1711        prod = self.state_reader.get_environment(c.PROD)
1712        remote_snapshots = (
1713            {
1714                snapshot.name: snapshot
1715                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
1716            }
1717            if prod
1718            else {}
1719        )
1720
1721        local_nodes = {**(models_override or self._models), **self._standalone_audits}
1722        nodes = local_nodes.copy()
1723        audits = self._audits.copy()
1724        projects = {config.project for config in self.configs.values()}
1725
1726        for name, snapshot in remote_snapshots.items():
1727            if name not in nodes and snapshot.node.project not in projects:
1728                nodes[name] = snapshot.node
1729                if snapshot.is_model:
1730                    for audit in snapshot.audits:
1731                        if name not in audits:
1732                            audits[name] = audit
1733
1734        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
1735            snapshots: t.Dict[str, Snapshot] = {}
1736            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}
1737
1738            for node in nodes.values():
1739                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
1740                    ttl = remote_snapshots[node.fqn].ttl
1741                else:
1742                    config = self.config_for_node(node)
1743                    ttl = config.snapshot_ttl
1744
1745                snapshot = Snapshot.from_node(
1746                    node,
1747                    nodes=nodes,
1748                    audits=audits,
1749                    cache=fingerprint_cache,
1750                    ttl=ttl,
1751                )
1752                snapshots[snapshot.name] = snapshot
1753            return snapshots
1754
1755        snapshots = _nodes_to_snapshots(nodes)
1756        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1757
1758        unrestorable_snapshots = {
1759            snapshot
1760            for snapshot in stored_snapshots.values()
1761            if snapshot.name in local_nodes and snapshot.unrestorable
1762        }
1763        if unrestorable_snapshots:
1764            for snapshot in unrestorable_snapshots:
1765                logger.info(
1766                    "Found a unrestorable snapshot %s. Restamping the model...", snapshot.name
1767                )
1768                node = local_nodes[snapshot.name]
1769                nodes[snapshot.name] = node.copy(
1770                    update={"stamp": f"revert to {snapshot.identifier}"}
1771                )
1772            snapshots = _nodes_to_snapshots(nodes)
1773            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())
1774
1775        for snapshot in stored_snapshots.values():
1776            # Keep the original model instance to preserve the query cache.
1777            snapshot.node = snapshots[snapshot.name].node
1778
1779        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}
1780
1781    def _context_diff(
1782        self,
1783        environment: str,
1784        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1785        create_from: t.Optional[str] = None,
1786        force_no_diff: bool = False,
1787        ensure_finalized_snapshots: bool = False,
1788    ) -> ContextDiff:
1789        environment = Environment.normalize_name(environment)
1790        if force_no_diff:
1791            return ContextDiff.create_no_diff(environment)
1792        return ContextDiff.create(
1793            environment,
1794            snapshots=snapshots or self.snapshots,
1795            create_from=create_from or c.PROD,
1796            state_reader=self.state_reader,
1797            ensure_finalized_snapshots=ensure_finalized_snapshots,
1798        )
1799
1800    def _run_janitor(self) -> None:
1801        self._cleanup_environments()
1802        expired_snapshots = self.state_sync.delete_expired_snapshots()
1803        self.snapshot_evaluator.cleanup(
1804            expired_snapshots, on_complete=self.console.update_cleanup_progress
1805        )
1806
1807        self.state_sync.compact_intervals()
1808
1809    def _cleanup_environments(self) -> None:
1810        expired_environments = self.state_sync.delete_expired_environments()
1811        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)
1812
1813    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
1814        connection_name = connection_name.capitalize()
1815        try:
1816            engine_adapter.fetchall("SELECT 1")
1817            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
1818        except Exception as ex:
1819            self.console.log_error(f"{connection_name} connection failed. {ex}")
1820
1821    def _new_state_sync(self) -> StateSync:
1822        return self._provided_state_sync or self._scheduler.create_state_sync(self)
1823
1824    def _new_selector(self) -> Selector:
1825        return Selector(
1826            self.state_reader,
1827            self._models,
1828            context_path=self.path,
1829            default_catalog=self.default_catalog,
1830            dialect=self.default_dialect,
1831        )
1832
1833    def _register_notification_targets(self) -> None:
1834        event_notifications = collections.defaultdict(set)
1835        for target in self.notification_targets:
1836            if target.is_configured:
1837                for event in target.notify_on:
1838                    event_notifications[event].add(target)
1839        user_notification_targets = {
1840            user.username: set(
1841                target for target in user.notification_targets if target.is_configured
1842            )
1843            for user in self.users
1844        }
1845        self.notification_target_manager = NotificationTargetManager(
1846            event_notifications, user_notification_targets, username=self.config.username
1847        )
1848
1849
class Context(GenericContext[Config]):
    """The standard SQLMesh context: a ``GenericContext`` specialized to the ``Config`` type."""

    CONFIG_TYPE = Config
class BaseContext(abc.ABC):
class BaseContext(abc.ABC):
    """Abstract base for contexts that can execute models.

    Subclasses supply the default dialect, the model-to-table mapping and an engine adapter.
    This base class builds table lookup and query helpers on top of them.
    """

    @property
    @abc.abstractmethod
    def default_dialect(self) -> t.Optional[str]:
        """The SQL dialect used when none is given explicitly."""

    @property
    @abc.abstractmethod
    def _model_tables(self) -> t.Dict[str, str]:
        """Mapping from model name to physical table name."""

    @property
    @abc.abstractmethod
    def engine_adapter(self) -> EngineAdapter:
        """The engine adapter that queries are executed through."""

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        """The engine adapter's Spark session, if it has one."""
        adapter = self.engine_adapter
        return adapter.spark

    @property
    def default_catalog(self) -> t.Optional[str]:
        """The default catalog. Subclasses must override this; the base raises NotImplementedError."""
        raise NotImplementedError

    def table(self, model_name: str) -> str:
        """Resolve a model name to its physical table name.

        Args:
            model_name: The model name; it is normalized with the default catalog and dialect.

        Returns:
            The physical table name, rendered as SQL in the default dialect.
        """
        normalized_name = normalize_model_name(
            model_name, self.default_catalog, self.default_dialect
        )
        physical_table = self._model_tables[normalized_name]
        # Render in the default dialect: callers may splice the name into a fetchdf query,
        # so quoting must match the engine (e.g. backticks for BigQuery).
        return parse_one(physical_table).sql(dialect=self.default_dialect)

    def fetchdf(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Execute ``query`` and return the result as a dataframe.

        Args:
            query: A SQL string or SQLGlot expression.
            quote_identifiers: Whether every identifier in the query should be quoted.

        Returns:
            A Pandas dataframe by default; for Spark, a PySpark dataframe.
        """
        adapter = self.engine_adapter
        return adapter.fetchdf(query, quote_identifiers=quote_identifiers)

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Execute ``query`` and return the result as a PySpark dataframe.

        Args:
            query: A SQL string or SQLGlot expression.
            quote_identifiers: Whether every identifier in the query should be quoted.

        Returns:
            A PySpark dataframe.
        """
        adapter = self.engine_adapter
        return adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)

The base context which defines methods to execute a model.

default_dialect: Union[str, NoneType]

Returns the default dialect.

engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter

Returns an engine adapter.

spark: Union[PySparkSession, NoneType]

Returns the spark session if it exists.

def table(self, model_name: str) -> str:
158    def table(self, model_name: str) -> str:
159        """Gets the physical table name for a given model.
160
161        Args:
162            model_name: The model name.
163
164        Returns:
165            The physical table name.
166        """
167        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)
168
169        # We generate SQL for the default dialect because the table name may be used in a
170        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
171        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)

Gets the physical table name for a given model.

Arguments:
  • model_name: The model name.
Returns:

The physical table name.

def fetchdf( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> pandas.core.frame.DataFrame:
173    def fetchdf(
174        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
175    ) -> pd.DataFrame:
176        """Fetches a dataframe given a sql string or sqlglot expression.
177
178        Args:
179            query: SQL string or sqlglot expression.
180            quote_identifiers: Whether to quote all identifiers in the query.
181
182        Returns:
183            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
184        """
185        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)

Fetches a dataframe given a sql string or sqlglot expression.

Arguments:
  • query: SQL string or sqlglot expression.
  • quote_identifiers: Whether to quote all identifiers in the query.
Returns:

The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.

def fetch_pyspark_df( self, query: Union[sqlglot.expressions.Expression, str], quote_identifiers: bool = False) -> PySparkDataFrame:
187    def fetch_pyspark_df(
188        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
189    ) -> PySparkDataFrame:
190        """Fetches a PySpark dataframe given a sql string or sqlglot expression.
191
192        Args:
193            query: SQL string or sqlglot expression.
194            quote_identifiers: Whether to quote all identifiers in the query.
195
196        Returns:
197            A PySpark dataframe.
198        """
199        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)

Fetches a PySpark dataframe given a sql string or sqlglot expression.

Arguments:
  • query: SQL string or sqlglot expression.
  • quote_identifiers: Whether to quote all identifiers in the query.
Returns:

A PySpark dataframe.

class ExecutionContext(BaseContext):
class ExecutionContext(BaseContext):
    """The minimal context needed to execute a model.

    Args:
        engine_adapter: The engine adapter to execute queries against.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        default_dialect: The value returned by the ``default_dialect`` property.
        default_catalog: The value returned by the ``default_catalog`` property.
        variables: Values served by ``var``. ``var`` lower-cases the requested name before the
            lookup, so keys are expected to be lower-case. Defaults to no variables.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        snapshots: t.Dict[str, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        default_dialect: t.Optional[str] = None,
        default_catalog: t.Optional[str] = None,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ):
        self.snapshots = snapshots
        self.deployability_index = deployability_index
        self._engine_adapter = engine_adapter
        self._default_catalog = default_catalog
        self._default_dialect = default_dialect
        self._variables = variables or {}

    @property
    def default_dialect(self) -> t.Optional[str]:
        """Returns the default dialect passed at construction."""
        return self._default_dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""
        return self._engine_adapter

    @cached_property
    def _model_tables(self) -> t.Dict[str, str]:
        """Returns a mapping of model names to tables.

        Computed once from ``snapshots`` and ``deployability_index`` and then cached on the
        instance, so later mutations of ``snapshots`` are not reflected.
        """
        return to_table_mapping(self.snapshots.values(), self.deployability_index)

    @property
    def default_catalog(self) -> t.Optional[str]:
        """Returns the default catalog passed at construction."""
        return self._default_catalog

    @property
    def gateway(self) -> t.Optional[str]:
        """Returns the gateway name, read from the ``c.GATEWAY`` variable (None if unset)."""
        return self.var(c.GATEWAY)

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns the value of ``var_name`` (lower-cased before lookup), or ``default`` if absent."""
        return self._variables.get(var_name.lower(), default)

    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
        """Returns a new ExecutionContext whose variables are exactly ``variables``.

        The engine adapter, snapshots, deployability index, dialect and catalog are shared with
        this context. This context's current variables are *replaced*, not merged.
        """
        # Keyword arguments keep this call correct even if the constructor's optional
        # parameters are ever reordered.
        return ExecutionContext(
            engine_adapter=self._engine_adapter,
            snapshots=self.snapshots,
            deployability_index=self.deployability_index,
            default_dialect=self._default_dialect,
            default_catalog=self._default_catalog,
            variables=variables,
        )

The minimal context needed to execute a model.

Arguments:
  • engine_adapter: The engine adapter to execute queries against.
  • snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
ExecutionContext( engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter, snapshots: Dict[str, sqlmesh.core.snapshot.definition.Snapshot], deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, default_dialect: Union[str, NoneType] = None, default_catalog: Union[str, NoneType] = None, variables: Union[Dict[str, Any], NoneType] = None)
211    def __init__(
212        self,
213        engine_adapter: EngineAdapter,
214        snapshots: t.Dict[str, Snapshot],
215        deployability_index: t.Optional[DeployabilityIndex] = None,
216        default_dialect: t.Optional[str] = None,
217        default_catalog: t.Optional[str] = None,
218        variables: t.Optional[t.Dict[str, t.Any]] = None,
219    ):
220        self.snapshots = snapshots
221        self.deployability_index = deployability_index
222        self._engine_adapter = engine_adapter
223        self._default_catalog = default_catalog
224        self._default_dialect = default_dialect
225        self._variables = variables or {}
default_dialect: Union[str, NoneType]

Returns the default dialect.

engine_adapter: sqlmesh.core.engine_adapter.base.EngineAdapter

Returns an engine adapter.

gateway: Union[str, NoneType]

Returns the gateway name.

def var( self, var_name: str, default: Union[Any, NoneType] = None) -> Union[Any, NoneType]:
250    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
251        """Returns a variable value."""
252        return self._variables.get(var_name.lower(), default)

Returns a variable value.

def with_variables(self, variables: Dict[str, Any]) -> sqlmesh.core.context.ExecutionContext:
254    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
255        """Returns a new ExecutionContext with additional variables."""
256        return ExecutionContext(
257            self._engine_adapter,
258            self.snapshots,
259            self.deployability_index,
260            self._default_dialect,
261            self._default_catalog,
262            variables=variables,
263        )

Returns a new ExecutionContext with additional variables.

class GenericContext(BaseContext, typing.Generic[~C]):
 266class GenericContext(BaseContext, t.Generic[C]):
 267    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
 268
 269    Args:
 270        engine_adapter: The default engine adapter to use.
 271        notification_targets: The notification target to use. Defaults to what is defined in config.
 272        paths: The directories containing SQLMesh files.
 273        config: A Config object or the name of a Config object in config.py.
 274        connection: The name of the connection. If not specified the first connection as it appears
 275            in configuration will be used.
 276        test_connection: The name of the connection to use for tests. If not specified the first
 277            connection as it appears in configuration will be used.
 278        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
 279        load: Whether or not to automatically load all models and macros (default True).
 280        console: The rich instance used for printing out CLI command results.
 281        users: A list of users to make known to SQLMesh.
 282        config_type: The type of config object to use (default Config).
 283    """
 284
 285    CONFIG_TYPE: t.Type[C]
 286
    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        """Set up the context: configs, engine adapters, loader, users and notifications.

        Args:
            engine_adapter: Engine adapter to use. When omitted, one is created from the
                gateway's connection config.
            notification_targets: Extra notification targets, placed before the targets
                defined in the config.
            state_sync: A state sync supplied by the caller. It is stored as
                ``_provided_state_sync``; presumably used by ``_new_state_sync`` (not shown
                here), so TODO confirm.
            paths: Project directories passed to ``load_configs`` when ``config`` is not
                already a dict of configs.
            config: A config object, the name of a config, or a dict mapping project paths
                to configs (a dict is used as-is). The first entry becomes ``self.path`` /
                ``self.config``.
            gateway: Gateway name used to pick scheduler, connection and test connection.
            concurrent_tasks: Maximum concurrent tasks. A falsy value falls back to the
                connection config's ``concurrent_tasks``.
            loader: Loader class. Defaults to ``config.loader`` and is instantiated with
                ``config.loader_kwargs``.
            load: Whether to call ``self.load()`` at the end of construction.
            console: Console for output. Defaults to ``get_console()``.
            users: Extra users, merged with the config's users and deduplicated by username.

        Raises:
            SQLMeshError: If an environment catalog mapping is configured but the engine
                adapter does not support multiple catalogs.
        """
        self.console = console or get_console()
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        # Empty registries; load() replaces or refills them with the loaded project.
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        # None means "not resolved yet"; the default_catalog property fills it lazily.
        self._default_catalog: t.Optional[str] = None

        # The first (path, config) pair is the primary project configuration.
        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        # self.default_catalog triggers the lazy catalog lookup through the scheduler, so
        # this must run after _scheduler and _engine_adapter have been set above.
        test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )
        self._test_engine_adapter = test_connection_config.create_engine_adapter(
            register_comments_override=False
        )

        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        # Created lazily by the state_sync property.
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        # Deduplicate by username: a later entry (config users come after the passed ones)
        # replaces an earlier one with the same username; order follows first occurrence.
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()
 358
 359    @property
 360    def default_dialect(self) -> t.Optional[str]:
 361        return self.config.dialect
 362
 363    @property
 364    def engine_adapter(self) -> EngineAdapter:
 365        """Returns an engine adapter."""
 366        return self._engine_adapter
 367
 368    @property
 369    def snapshot_evaluator(self) -> SnapshotEvaluator:
 370        if not self._snapshot_evaluator:
 371            self._snapshot_evaluator = SnapshotEvaluator(
 372                self.engine_adapter.with_log_level(logging.INFO),
 373                ddl_concurrent_tasks=self.concurrent_tasks,
 374            )
 375        return self._snapshot_evaluator
 376
 377    def execution_context(
 378        self, deployability_index: t.Optional[DeployabilityIndex] = None
 379    ) -> ExecutionContext:
 380        """Returns an execution context."""
 381        return ExecutionContext(
 382            engine_adapter=self._engine_adapter,
 383            snapshots=self.snapshots,
 384            deployability_index=deployability_index,
 385            default_dialect=self.default_dialect,
 386            default_catalog=self.default_catalog,
 387        )
 388
 389    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
 390        """Update or insert a model.
 391
 392        The context's models dictionary will be updated to include these changes.
 393
 394        Args:
 395            model: Model name or instance to update.
 396            kwargs: The kwargs to update the model with.
 397
 398        Returns:
 399            A new instance of the updated or inserted model.
 400        """
 401        model = self.get_model(model, raise_if_missing=True)
 402        path = model._path
 403
 404        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
 405        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
 406        model._path = path
 407
 408        self._models.update({model.fqn: model})
 409        self.dag.add(model.fqn, model.depends_on)
 410        update_model_schemas(
 411            self.dag,
 412            self._models,
 413            self.path,
 414        )
 415
 416        model.validate_definition()
 417
 418        return model
 419
 420    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
 421        """Returns the built-in scheduler.
 422
 423        Args:
 424            environment: The target environment to source model snapshots from, or None
 425                if snapshots should be sourced from the currently loaded local state.
 426
 427        Returns:
 428            The built-in scheduler instance.
 429        """
 430        snapshots: t.Iterable[Snapshot]
 431        if environment is not None:
 432            stored_environment = self.state_sync.get_environment(environment)
 433            if stored_environment is None:
 434                raise ConfigError(f"Environment '{environment}' was not found.")
 435            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
 436        else:
 437            snapshots = self.snapshots.values()
 438
 439        if not snapshots:
 440            raise ConfigError("No models were found")
 441
 442        return Scheduler(
 443            snapshots,
 444            self.snapshot_evaluator,
 445            self.state_sync,
 446            default_catalog=self.default_catalog,
 447            max_workers=self.concurrent_tasks,
 448            console=self.console,
 449            notification_target_manager=self.notification_target_manager,
 450        )
 451
    @property
    def state_sync(self) -> StateSync:
        """Return the state sync, creating and preparing it on first access.

        On first access a new state sync is created via ``_new_state_sync()``. If its stored
        schema version is 0 it is migrated; its versions are then checked with
        ``get_versions()``; finally it is wrapped in a ``CachingStateSync``. Later accesses
        return the cached wrapper.
        """
        if not self._state_sync:
            # Assigned before migrating/validating, presumably so that re-entrant access
            # (e.g. through self.default_catalog below) finds an instance instead of
            # recursing -- TODO confirm.
            # NOTE(review): if migrate() or get_versions() raises, the unwrapped instance
            # stays cached here, and later accesses return it without validation or the
            # CachingStateSync wrapper. Confirm this is intended.
            self._state_sync = self._new_state_sync()

            # Schema version 0 presumably means the state store was never initialized;
            # this probe skips validation so such a store can be migrated first.
            if self._state_sync.get_versions(validate=False).schema_version == 0:
                self._state_sync.migrate(default_catalog=self.default_catalog)
            # Validating call (default arguments).
            self._state_sync.get_versions()
            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
        return self._state_sync
 462
 463    @property
 464    def state_reader(self) -> StateReader:
 465        return self.state_sync
 466
 467    def refresh(self) -> None:
 468        """Refresh all models that have been updated."""
 469        if self._loader.reload_needed():
 470            self.load()
 471
 472    def load(self, update_schemas: bool = True) -> GenericContext[C]:
 473        """Load all files in the context's path."""
 474        with sys_path(*self.configs):
 475            gc.disable()
 476            project = self._loader.load(self, update_schemas)
 477            self._macros = project.macros
 478            self._jinja_macros = project.jinja_macros
 479            self._models = project.models
 480            self._metrics = project.metrics
 481            self._standalone_audits.clear()
 482            self._audits.clear()
 483            for name, audit in project.audits.items():
 484                if isinstance(audit, StandaloneAudit):
 485                    self._standalone_audits[name] = audit
 486                else:
 487                    self._audits[name] = audit
 488            self.dag = project.dag
 489            gc.enable()
 490
 491            duplicates = set(self._models) & set(self._standalone_audits)
 492            if duplicates:
 493                raise ConfigError(
 494                    f"Models and Standalone audits cannot have the same name: {duplicates}"
 495                )
 496
 497        return self
 498
 499    def run(
 500        self,
 501        environment: t.Optional[str] = None,
 502        *,
 503        start: t.Optional[TimeLike] = None,
 504        end: t.Optional[TimeLike] = None,
 505        execution_time: t.Optional[TimeLike] = None,
 506        skip_janitor: bool = False,
 507        ignore_cron: bool = False,
 508    ) -> bool:
 509        """Run the entire dag through the scheduler.
 510
 511        Args:
 512            environment: The target environment to source model snapshots from and virtually update. Default: prod.
 513            start: The start of the interval to render.
 514            end: The end of the interval to render.
 515            execution_time: The date/time time reference to use for execution time. Defaults to now.
 516            skip_janitor: Whether to skip the janitor task.
 517            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
 518
 519        Returns:
 520            True if the run was successful, False otherwise.
 521        """
 522        environment = environment or self.config.default_target_environment
 523
 524        self.notification_target_manager.notify(
 525            NotificationEvent.RUN_START, environment=environment
 526        )
 527        success = False
 528        try:
 529            success = self._run(
 530                environment=environment,
 531                start=start,
 532                end=end,
 533                execution_time=execution_time,
 534                skip_janitor=skip_janitor,
 535                ignore_cron=ignore_cron,
 536            )
 537        except Exception as e:
 538            self.notification_target_manager.notify(
 539                NotificationEvent.RUN_FAILURE, traceback.format_exc()
 540            )
 541            logger.error(f"Run Failure: {traceback.format_exc()}")
 542            raise e
 543
 544        if success:
 545            self.notification_target_manager.notify(
 546                NotificationEvent.RUN_END, environment=environment
 547            )
 548            self.console.log_success(f"Run finished for environment '{environment}'")
 549        else:
 550            self.notification_target_manager.notify(
 551                NotificationEvent.RUN_FAILURE, "See console logs for details."
 552            )
 553
 554        return success
 555
 556    @t.overload
 557    def get_model(
 558        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
 559    ) -> Model: ...
 560
 561    @t.overload
 562    def get_model(
 563        self,
 564        model_or_snapshot: ModelOrSnapshot,
 565        raise_if_missing: Literal[False] = False,
 566    ) -> t.Optional[Model]: ...
 567
 568    def get_model(
 569        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
 570    ) -> t.Optional[Model]:
 571        """Returns a model with the given name or None if a model with such name doesn't exist.
 572
 573        Args:
 574            model_or_snapshot: A model name, model, or snapshot.
 575            raise_if_missing: Raises an error if a model is not found.
 576
 577        Returns:
 578            The expected model.
 579        """
 580        if isinstance(model_or_snapshot, str):
 581            normalized_name = normalize_model_name(
 582                model_or_snapshot,
 583                dialect=self.default_dialect,
 584                default_catalog=self.default_catalog,
 585            )
 586            model = self._models.get(normalized_name)
 587        elif isinstance(model_or_snapshot, Snapshot):
 588            model = model_or_snapshot.model
 589        else:
 590            model = model_or_snapshot
 591
 592        if raise_if_missing and not model:
 593            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
 594
 595        return model
 596
 597    @t.overload
 598    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...
 599
 600    @t.overload
 601    def get_snapshot(
 602        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
 603    ) -> Snapshot: ...
 604
 605    @t.overload
 606    def get_snapshot(
 607        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
 608    ) -> t.Optional[Snapshot]: ...
 609
 610    def get_snapshot(
 611        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
 612    ) -> t.Optional[Snapshot]:
 613        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.
 614
 615        Args:
 616            node_or_snapshot: A node name, node, or snapshot.
 617            raise_if_missing: Raises an error if a snapshot is not found.
 618
 619        Returns:
 620            The expected snapshot.
 621        """
 622        if isinstance(node_or_snapshot, Snapshot):
 623            return node_or_snapshot
 624        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
 625            node_or_snapshot = normalize_model_name(
 626                node_or_snapshot,
 627                dialect=self.default_dialect,
 628                default_catalog=self.default_catalog,
 629            )
 630        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
 631        snapshot = self.snapshots.get(fqn)
 632
 633        if raise_if_missing and not snapshot:
 634            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
 635
 636        return snapshot
 637
 638    def config_for_path(self, path: Path) -> Config:
 639        for config_path, config in self.configs.items():
 640            try:
 641                path.relative_to(config_path)
 642                return config
 643            except ValueError:
 644                pass
 645        return self.config
 646
 647    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
 648        if isinstance(node, str):
 649            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
 650        return self.config_for_path(node._path)  # type: ignore
 651
 652    @property
 653    def models(self) -> MappingProxyType[str, Model]:
 654        """Returns all registered models in this context."""
 655        return MappingProxyType(self._models)
 656
 657    @property
 658    def metrics(self) -> MappingProxyType[str, Metric]:
 659        """Returns all registered metrics in this context."""
 660        return MappingProxyType(self._metrics)
 661
 662    @property
 663    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
 664        """Returns all registered standalone audits in this context."""
 665        return MappingProxyType(self._standalone_audits)
 666
 667    @property
 668    def snapshots(self) -> t.Dict[str, Snapshot]:
 669        """Generates and returns snapshots based on models registered in this context.
 670
 671        If one of the snapshots has been previously stored in the persisted state, the stored
 672        instance will be returned.
 673        """
 674        return self._snapshots()
 675
 676    @property
 677    def default_catalog(self) -> t.Optional[str]:
 678        if self._default_catalog is None:
 679            self._default_catalog = self._scheduler.get_default_catalog(self)
 680        return self._default_catalog
 681
 682    def render(
 683        self,
 684        model_or_snapshot: ModelOrSnapshot,
 685        *,
 686        start: t.Optional[TimeLike] = None,
 687        end: t.Optional[TimeLike] = None,
 688        execution_time: t.Optional[TimeLike] = None,
 689        expand: t.Union[bool, t.Iterable[str]] = False,
 690        **kwargs: t.Any,
 691    ) -> exp.Expression:
 692        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
 693
 694        Args:
 695            model_or_snapshot: The model, model name, or snapshot to render.
 696            start: The start of the interval to render.
 697            end: The end of the interval to render.
 698            execution_time: The date/time time reference to use for execution time. Defaults to now.
 699            expand: Whether or not to use expand materialized models, defaults to False.
 700                If True, all referenced models are expanded as raw queries.
 701                If a list, only referenced models are expanded as raw queries.
 702
 703        Returns:
 704            The rendered expression.
 705        """
 706        execution_time = execution_time or now_ds()
 707
 708        model = self.get_model(model_or_snapshot, raise_if_missing=True)
 709
 710        if expand and not isinstance(expand, bool):
 711            expand = {
 712                normalize_model_name(
 713                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
 714                )
 715                for x in expand
 716            }
 717
 718        expand = self.dag.upstream(model.fqn) if expand is True else expand or []
 719
 720        if model.is_seed:
 721            df = next(
 722                model.render(
 723                    context=self.execution_context(),
 724                    start=start,
 725                    end=end,
 726                    execution_time=execution_time,
 727                    **kwargs,
 728                )
 729            )
 730            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))
 731
 732        return model.render_query_or_raise(
 733            start=start,
 734            end=end,
 735            execution_time=execution_time,
 736            snapshots=self.snapshots,
 737            expand=expand,
 738            engine_adapter=self.engine_adapter,
 739            **kwargs,
 740        )
 741
 742    def evaluate(
 743        self,
 744        model_or_snapshot: ModelOrSnapshot,
 745        start: TimeLike,
 746        end: TimeLike,
 747        execution_time: TimeLike,
 748        limit: t.Optional[int] = None,
 749        **kwargs: t.Any,
 750    ) -> DF:
 751        """Evaluate a model or snapshot (running its query against a DB/Engine).
 752
 753        This method is used to test or iterate on models without side effects.
 754
 755        Args:
 756            model_or_snapshot: The model, model name, or snapshot to render.
 757            start: The start of the interval to evaluate.
 758            end: The end of the interval to evaluate.
 759            execution_time: The date/time time reference to use for execution time.
 760            limit: A limit applied to the model.
 761        """
 762        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)
 763
 764        df = self.snapshot_evaluator.evaluate_and_fetch(
 765            snapshot,
 766            start=start,
 767            end=end,
 768            execution_time=execution_time,
 769            snapshots=self.snapshots,
 770            limit=limit or c.DEFAULT_MAX_LIMIT,
 771        )
 772
 773        if df is None:
 774            raise RuntimeError(f"Error evaluating {snapshot.name}")
 775
 776        return df
 777
 778    def format(
 779        self,
 780        transpile: t.Optional[str] = None,
 781        append_newline: t.Optional[bool] = None,
 782        **kwargs: t.Any,
 783    ) -> None:
 784        """Format all SQL models."""
 785        for model in self._models.values():
 786            if not model._path.suffix == ".sql":
 787                continue
 788            with open(model._path, "r+", encoding="utf-8") as file:
 789                expressions = parse(
 790                    file.read(), default_dialect=self.config_for_node(model).dialect
 791                )
 792                if transpile:
 793                    for prop in expressions[0].expressions:
 794                        if prop.name.lower() == "dialect":
 795                            prop.replace(
 796                                exp.Property(
 797                                    this="dialect",
 798                                    value=exp.Literal.string(transpile or model.dialect),
 799                                )
 800                            )
 801                format = self.config_for_node(model).format
 802                opts = {**format.generator_options, **kwargs}
 803                file.seek(0)
 804                file.write(
 805                    format_model_expressions(expressions, transpile or model.dialect, **opts)
 806                )
 807                if append_newline is None:
 808                    append_newline = format.append_newline
 809                if append_newline:
 810                    file.write("\n")
 811                file.truncate()
 812
 813    def plan(
 814        self,
 815        environment: t.Optional[str] = None,
 816        *,
 817        start: t.Optional[TimeLike] = None,
 818        end: t.Optional[TimeLike] = None,
 819        execution_time: t.Optional[TimeLike] = None,
 820        create_from: t.Optional[str] = None,
 821        skip_tests: bool = False,
 822        restate_models: t.Optional[t.Iterable[str]] = None,
 823        no_gaps: bool = False,
 824        skip_backfill: bool = False,
 825        forward_only: t.Optional[bool] = None,
 826        no_prompts: t.Optional[bool] = None,
 827        auto_apply: t.Optional[bool] = None,
 828        no_auto_categorization: t.Optional[bool] = None,
 829        effective_from: t.Optional[TimeLike] = None,
 830        include_unmodified: t.Optional[bool] = None,
 831        select_models: t.Optional[t.Collection[str]] = None,
 832        backfill_models: t.Optional[t.Collection[str]] = None,
 833        categorizer_config: t.Optional[CategorizerConfig] = None,
 834        enable_preview: t.Optional[bool] = None,
 835        no_diff: t.Optional[bool] = None,
 836        run: bool = False,
 837    ) -> Plan:
 838        """Interactively creates a plan.
 839
 840        This method compares the current context with the target environment. It then presents
 841        the differences and asks whether to backfill each modified model.
 842
 843        Args:
 844            environment: The environment to diff and plan against.
 845            start: The start date of the backfill if there is one.
 846            end: The end date of the backfill if there is one.
 847            execution_time: The date/time reference to use for execution time. Defaults to now.
 848            create_from: The environment to create the target environment from if it
 849                doesn't exist. If not specified, the "prod" environment will be used.
 850            skip_tests: Unit tests are run by default so this will skip them if enabled
 851            restate_models: A list of either internal or external models, or tags, that need to be restated
 852                for the given plan interval. If the target environment is a production environment,
 853                ALL snapshots that depended on these upstream tables will have their intervals deleted
 854                (even ones not in this current environment). Only the snapshots in this environment will
 855                be backfilled whereas others need to be recovered on a future plan application. For development
 856                environments only snapshots that are part of this plan will be affected.
 857            no_gaps:  Whether to ensure that new snapshots for models that are already a
 858                part of the target environment have no data gaps when compared against previous
 859                snapshots for same models.
 860            skip_backfill: Whether to skip the backfill step. Default: False.
 861            forward_only: Whether the purpose of the plan is to make forward only changes.
 862            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
 863                if this flag is set to true and there are uncategorized changes the plan creation will
 864                fail. Default: False.
 865            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
 866            no_auto_categorization: Indicates whether to disable automatic categorization of model
 867                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 868                option determines the behavior.
 869            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 870                project config by default.
 871            effective_from: The effective date from which to apply forward-only changes on production.
 872            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 873            select_models: A list of model selection strings to filter the models that should be included into this plan.
 874            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 875            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 876            no_diff: Hide text differences for changed models.
 877            run: Whether to run latest intervals as part of the plan application.
 878
 879        Returns:
 880            The populated Plan object.
 881        """
 882        plan_builder = self.plan_builder(
 883            environment,
 884            start=start,
 885            end=end,
 886            execution_time=execution_time,
 887            create_from=create_from,
 888            skip_tests=skip_tests,
 889            restate_models=restate_models,
 890            no_gaps=no_gaps,
 891            skip_backfill=skip_backfill,
 892            forward_only=forward_only,
 893            no_auto_categorization=no_auto_categorization,
 894            effective_from=effective_from,
 895            include_unmodified=include_unmodified,
 896            select_models=select_models,
 897            backfill_models=backfill_models,
 898            categorizer_config=categorizer_config,
 899            enable_preview=enable_preview,
 900            run=run,
 901        )
 902
 903        self.console.plan(
 904            plan_builder,
 905            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
 906            self.default_catalog,
 907            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
 908            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
 909        )
 910
 911        return plan_builder.build()
 912
 913    def plan_builder(
 914        self,
 915        environment: t.Optional[str] = None,
 916        *,
 917        start: t.Optional[TimeLike] = None,
 918        end: t.Optional[TimeLike] = None,
 919        execution_time: t.Optional[TimeLike] = None,
 920        create_from: t.Optional[str] = None,
 921        skip_tests: bool = False,
 922        restate_models: t.Optional[t.Iterable[str]] = None,
 923        no_gaps: bool = False,
 924        skip_backfill: bool = False,
 925        forward_only: t.Optional[bool] = None,
 926        no_auto_categorization: t.Optional[bool] = None,
 927        effective_from: t.Optional[TimeLike] = None,
 928        include_unmodified: t.Optional[bool] = None,
 929        select_models: t.Optional[t.Collection[str]] = None,
 930        backfill_models: t.Optional[t.Collection[str]] = None,
 931        categorizer_config: t.Optional[CategorizerConfig] = None,
 932        enable_preview: t.Optional[bool] = None,
 933        run: bool = False,
 934    ) -> PlanBuilder:
 935        """Creates a plan builder.
 936
 937        Args:
 938            environment: The environment to diff and plan against.
 939            start: The start date of the backfill if there is one.
 940            end: The end date of the backfill if there is one.
 941            execution_time: The date/time reference to use for execution time. Defaults to now.
 942            create_from: The environment to create the target environment from if it
 943                doesn't exist. If not specified, the "prod" environment will be used.
 944            skip_tests: Unit tests are run by default so this will skip them if enabled
 945            restate_models: A list of either internal or external models, or tags, that need to be restated
 946                for the given plan interval. If the target environment is a production environment,
 947                ALL snapshots that depended on these upstream tables will have their intervals deleted
 948                (even ones not in this current environment). Only the snapshots in this environment will
 949                be backfilled whereas others need to be recovered on a future plan application. For development
 950                environments only snapshots that are part of this plan will be affected.
 951            no_gaps:  Whether to ensure that new snapshots for models that are already a
 952                part of the target environment have no data gaps when compared against previous
 953                snapshots for same models.
 954            skip_backfill: Whether to skip the backfill step. Default: False.
 955            forward_only: Whether the purpose of the plan is to make forward only changes.
 956            no_auto_categorization: Indicates whether to disable automatic categorization of model
 957                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 958                option determines the behavior.
 959            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 960                project config by default.
 961            effective_from: The effective date from which to apply forward-only changes on production.
 962            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 963            select_models: A list of model selection strings to filter the models that should be included into this plan.
 964            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 965            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 966            run: Whether to run latest intervals as part of the plan application.
 967
 968        Returns:
 969            The plan builder.
 970        """
 971        environment = environment or self.config.default_target_environment
 972        environment = Environment.normalize_name(environment)
 973        is_dev = environment != c.PROD
 974
 975        if skip_backfill and not no_gaps and not is_dev:
 976            raise ConfigError(
 977                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
 978            )
 979
 980        if run and is_dev:
 981            raise ConfigError("The '--run' flag is only supported for the production environment.")
 982
 983        self._run_plan_tests(skip_tests=skip_tests)
 984
 985        environment_ttl = (
 986            self.environment_ttl if environment not in self.pinned_environments else None
 987        )
 988
 989        model_selector = self._new_selector()
 990
 991        if backfill_models:
 992            backfill_models = model_selector.expand_model_selections(backfill_models)
 993        else:
 994            backfill_models = None
 995
 996        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
 997        if select_models:
 998            models_override = model_selector.select_models(
 999                select_models,
1000                environment,
1001                fallback_env_name=create_from or c.PROD,
1002                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1003            )
1004            if not backfill_models:
1005                # Only backfill selected models unless explicitly specified.
1006                backfill_models = model_selector.expand_model_selections(select_models)
1007
1008        expanded_restate_models = None
1009        if restate_models is not None:
1010            expanded_restate_models = model_selector.expand_model_selections(restate_models)
1011            if not expanded_restate_models:
1012                self.console.log_error(
1013                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
1014                )
1015
1016        snapshots = self._snapshots(models_override)
1017        context_diff = self._context_diff(
1018            environment or c.PROD,
1019            snapshots=snapshots,
1020            create_from=create_from,
1021            force_no_diff=(restate_models is not None and not expanded_restate_models)
1022            or (backfill_models is not None and not backfill_models),
1023            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1024        )
1025
1026        # If no end date is specified, use the max interval end from prod
1027        # to prevent unintended evaluation of the entire DAG.
1028        if not run:
1029            if backfill_models is not None:
1030                # Only consider selected models for the default end value.
1031                models_for_default_end = backfill_models.copy()
1032                for name in backfill_models:
1033                    if name not in snapshots:
1034                        continue
1035                    snapshot = snapshots[name]
1036                    snapshot_id = snapshot.snapshot_id
1037                    if (
1038                        snapshot_id in context_diff.added
1039                        and snapshot_id in context_diff.new_snapshots
1040                    ):
1041                        # If the selected model is a newly added model, then we should narrow down the intervals
1042                        # that should be considered for the default plan end value by including its parents.
1043                        models_for_default_end |= {s.name for s in snapshot.parents}
1044                default_end = self.state_sync.greatest_common_interval_end(
1045                    c.PROD,
1046                    models_for_default_end,
1047                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1048                )
1049            else:
1050                default_end = self.state_sync.max_interval_end_for_environment(
1051                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
1052                )
1053        else:
1054            default_end = None
1055
1056        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None
1057
1058        return PlanBuilder(
1059            context_diff=context_diff,
1060            start=start,
1061            end=end,
1062            execution_time=execution_time,
1063            apply=self.apply,
1064            restate_models=expanded_restate_models,
1065            backfill_models=backfill_models,
1066            no_gaps=no_gaps,
1067            skip_backfill=skip_backfill,
1068            is_dev=is_dev,
1069            forward_only=(
1070                forward_only if forward_only is not None else self.config.plan.forward_only
1071            ),
1072            environment_ttl=environment_ttl,
1073            environment_suffix_target=self.config.environment_suffix_target,
1074            environment_catalog_mapping=self.config.environment_catalog_mapping,
1075            categorizer_config=categorizer_config or self.auto_categorize_changes,
1076            auto_categorization_enabled=not no_auto_categorization,
1077            effective_from=effective_from,
1078            include_unmodified=(
1079                include_unmodified
1080                if include_unmodified is not None
1081                else self.config.plan.include_unmodified
1082            ),
1083            default_start=default_start,
1084            default_end=default_end,
1085            enable_preview=(
1086                enable_preview if enable_preview is not None else self.config.plan.enable_preview
1087            ),
1088            end_bounded=not run,
1089            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1090        )
1091
1092    def apply(
1093        self,
1094        plan: Plan,
1095        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1096    ) -> None:
1097        """Applies a plan by pushing snapshots and backfilling data.
1098
1099        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1100        to backfill all models.
1101
1102        Args:
1103            plan: The plan to apply.
1104            circuit_breaker: An optional handler which checks if the apply should be aborted.
1105        """
1106        if (
1107            not plan.context_diff.has_changes
1108            and not plan.requires_backfill
1109            and not plan.has_unmodified_unpromoted
1110        ):
1111            return
1112        if plan.uncategorized:
1113            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1114        self.notification_target_manager.notify(
1115            NotificationEvent.APPLY_START,
1116            environment=plan.environment_naming_info.name,
1117            plan_id=plan.plan_id,
1118        )
1119        try:
1120            self._apply(plan, circuit_breaker)
1121        except Exception as e:
1122            self.notification_target_manager.notify(
1123                NotificationEvent.APPLY_FAILURE,
1124                environment=plan.environment_naming_info.name,
1125                plan_id=plan.plan_id,
1126                exc=traceback.format_exc(),
1127            )
1128            logger.error(f"Apply Failure: {traceback.format_exc()}")
1129            raise e
1130        self.notification_target_manager.notify(
1131            NotificationEvent.APPLY_END,
1132            environment=plan.environment_naming_info.name,
1133            plan_id=plan.plan_id,
1134        )
1135
1136    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1137        """Invalidates the target environment by setting its expiration timestamp to now.
1138
1139        Args:
1140            name: The name of the environment to invalidate.
1141            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1142                be deleted asynchronously by the janitor process.
1143        """
1144        self.state_sync.invalidate_environment(name)
1145        if sync:
1146            self._cleanup_environments()
1147            self.console.log_success(f"Environment '{name}' has been deleted.")
1148        else:
1149            self.console.log_success(f"Environment '{name}' has been invalidated.")
1150
1151    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1152        """Show a diff of the current context with a given environment.
1153
1154        Args:
1155            environment: The environment to diff against.
1156            detailed: Show the actual SQL differences if True.
1157
1158        Returns:
1159            True if there are changes, False otherwise.
1160        """
1161        environment = environment or self.config.default_target_environment
1162        environment = Environment.normalize_name(environment)
1163        context_diff = self._context_diff(environment)
1164        self.console.show_model_difference_summary(
1165            context_diff,
1166            EnvironmentNamingInfo.from_environment_catalog_mapping(
1167                self.config.environment_catalog_mapping,
1168                name=environment,
1169                suffix_target=self.config.environment_suffix_target,
1170            ),
1171            self.default_catalog,
1172            no_diff=not detailed,
1173        )
1174        return context_diff.has_changes
1175
1176    def table_diff(
1177        self,
1178        source: str,
1179        target: str,
1180        on: t.List[str] | exp.Condition | None = None,
1181        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1182        where: t.Optional[str | exp.Condition] = None,
1183        limit: int = 20,
1184        show: bool = True,
1185        show_sample: bool = True,
1186    ) -> TableDiff:
1187        """Show a diff between two tables.
1188
1189        Args:
1190            source: The source environment or table.
1191            target: The target environment or table.
1192            on: The join condition, table aliases must be "s" and "t" for source and target.
1193                If omitted, the table's grain will be used.
1194            model_or_snapshot: The model or snapshot to use when environments are passed in.
1195            where: An optional where statement to filter results.
1196            limit: The limit of the sample dataframe.
1197            show: Show the table diff output in the console.
1198            show_sample: Show the sample dataframe in the console. Requires show=True.
1199
1200        Returns:
1201            The TableDiff object containing schema and summary differences.
1202        """
1203        source_alias, target_alias = source, target
1204        if model_or_snapshot:
1205            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1206            source_env = self.state_reader.get_environment(source)
1207            target_env = self.state_reader.get_environment(target)
1208
1209            if not source_env:
1210                raise SQLMeshError(f"Could not find environment '{source}'")
1211            if not target_env:
1212                raise SQLMeshError(f"Could not find environment '{target}')")
1213
1214            source = next(
1215                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1216            ).table_name()
1217            target = next(
1218                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1219            ).table_name()
1220            source_alias = source_env.name
1221            target_alias = target_env.name
1222
1223            if not on:
1224                for ref in model.all_references:
1225                    if ref.unique:
1226                        on = ref.columns
1227
1228        if not on:
1229            raise SQLMeshError(
1230                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1231            )
1232
1233        table_diff = TableDiff(
1234            adapter=self._engine_adapter,
1235            source=source,
1236            target=target,
1237            on=on,
1238            where=where,
1239            source_alias=source_alias,
1240            target_alias=target_alias,
1241            model_name=model.name if model_or_snapshot else None,
1242            limit=limit,
1243        )
1244        if show:
1245            self.console.show_schema_diff(table_diff.schema_diff())
1246            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1247        return table_diff
1248
1249    def get_dag(
1250        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1251    ) -> GraphHTML:
1252        """Gets an HTML object representation of the DAG.
1253
1254        Args:
1255            select_models: A list of model selection strings that should be included in the dag.
1256        Returns:
1257            An html object that renders the dag.
1258        """
1259        dag = (
1260            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1261            if select_models
1262            else self.dag
1263        )
1264
1265        nodes = {}
1266        edges: t.List[t.Dict] = []
1267
1268        for node, deps in dag.graph.items():
1269            nodes[node] = {
1270                "id": node,
1271                "label": node.split(".")[-1],
1272                "title": f"<span>{node}</span>",
1273            }
1274            edges.extend({"from": d, "to": node} for d in deps)
1275
1276        return GraphHTML(
1277            nodes,
1278            edges,
1279            options={
1280                "height": "100%",
1281                "width": "100%",
1282                "interaction": {},
1283                "layout": {
1284                    "hierarchical": {
1285                        "enabled": True,
1286                        "nodeSpacing": 200,
1287                        "sortMethod": "directed",
1288                    },
1289                },
1290                "nodes": {
1291                    "shape": "box",
1292                },
1293                **options,
1294            },
1295        )
1296
1297    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1298        """Render the dag as HTML and save it to a file.
1299
1300        Args:
1301            path: filename to save the dag html to
1302            select_models: A list of model selection strings that should be included in the dag.
1303        """
1304        file_path = Path(path)
1305        suffix = file_path.suffix
1306        if suffix != ".html":
1307            if suffix:
1308                logger.warning(
1309                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1310                )
1311            path = str(file_path.with_suffix(".html"))
1312
1313        with open(path, "w", encoding="utf-8") as file:
1314            file.write(str(self.get_dag(select_models)))
1315
1316    def create_test(
1317        self,
1318        model: str,
1319        input_queries: t.Dict[str, str],
1320        overwrite: bool = False,
1321        variables: t.Optional[t.Dict[str, str]] = None,
1322        path: t.Optional[str] = None,
1323        name: t.Optional[str] = None,
1324        include_ctes: bool = False,
1325    ) -> None:
1326        """Generate a unit test fixture for a given model.
1327
1328        Args:
1329            model: The model to test.
1330            input_queries: Mapping of model names to queries. Each model included in this mapping
1331                will be populated in the test based on the results of the corresponding query.
1332            overwrite: Whether to overwrite the existing test in case of a file path collision.
1333                When set to False, an error will be raised if there is such a collision.
1334            variables: Key-value pairs that will define variables needed by the model.
1335            path: The file path corresponding to the fixture, relative to the test directory.
1336                By default, the fixture will be created under the test directory and the file name
1337                will be inferred from the test's name.
1338            name: The name of the test. This is inferred from the model name by default.
1339            include_ctes: When true, CTE fixtures will also be generated.
1340        """
1341        input_queries = {
1342            # The get_model here has two purposes: return normalized names & check for missing deps
1343            self.get_model(dep, raise_if_missing=True).fqn: query
1344            for dep, query in input_queries.items()
1345        }
1346
1347        generate_test(
1348            model=self.get_model(model, raise_if_missing=True),
1349            input_queries=input_queries,
1350            models=self._models,
1351            engine_adapter=self._engine_adapter,
1352            test_engine_adapter=self._test_engine_adapter,
1353            project_path=self.path,
1354            overwrite=overwrite,
1355            variables=variables,
1356            path=path,
1357            name=name,
1358            include_ctes=include_ctes,
1359        )
1360
1361    def test(
1362        self,
1363        match_patterns: t.Optional[t.List[str]] = None,
1364        tests: t.Optional[t.List[str]] = None,
1365        verbose: bool = False,
1366        preserve_fixtures: bool = False,
1367        stream: t.Optional[t.TextIO] = None,
1368    ) -> ModelTextTestResult:
1369        """Discover and run model tests"""
1370        if verbose:
1371            pd.set_option("display.max_columns", None)
1372            verbosity = 2
1373        else:
1374            verbosity = 1
1375
1376        try:
1377            if tests:
1378                result = run_model_tests(
1379                    tests=tests,
1380                    models=self._models,
1381                    engine_adapter=self._test_engine_adapter,
1382                    dialect=self.default_dialect,
1383                    verbosity=verbosity,
1384                    patterns=match_patterns,
1385                    preserve_fixtures=preserve_fixtures,
1386                    default_catalog=self.default_catalog,
1387                )
1388            else:
1389                test_meta = []
1390
1391                for path, config in self.configs.items():
1392                    test_meta.extend(
1393                        get_all_model_tests(
1394                            path / c.TESTS,
1395                            patterns=match_patterns,
1396                            ignore_patterns=config.ignore_patterns,
1397                        )
1398                    )
1399
1400                result = run_tests(
1401                    test_meta,
1402                    models=self._models,
1403                    engine_adapter=self._test_engine_adapter,
1404                    dialect=self.default_dialect,
1405                    verbosity=verbosity,
1406                    preserve_fixtures=preserve_fixtures,
1407                    stream=stream,
1408                    default_catalog=self.default_catalog,
1409                )
1410        finally:
1411            self._test_engine_adapter.close()
1412
1413        return result
1414
1415    def audit(
1416        self,
1417        start: TimeLike,
1418        end: TimeLike,
1419        *,
1420        models: t.Optional[t.Iterator[str]] = None,
1421        execution_time: t.Optional[TimeLike] = None,
1422    ) -> None:
1423        """Audit models.
1424
1425        Args:
1426            start: The start of the interval to audit.
1427            end: The end of the interval to audit.
1428            models: The models to audit. All models will be audited if not specified.
1429            execution_time: The date/time time reference to use for execution time. Defaults to now.
1430        """
1431
1432        snapshots = (
1433            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1434            if models
1435            else self.snapshots.values()
1436        )
1437
1438        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1439        self.console.log_status_update(f"Found {num_audits} audit(s).")
1440        errors = []
1441        skipped_count = 0
1442        for snapshot in snapshots:
1443            for audit_result in self.snapshot_evaluator.audit(
1444                snapshot=snapshot,
1445                start=start,
1446                end=end,
1447                snapshots=self.snapshots,
1448                raise_exception=False,
1449            ):
1450                audit_id = f"{audit_result.audit.name}"
1451                if audit_result.model:
1452                    audit_id += f" on model {audit_result.model.name}"
1453
1454                if audit_result.skipped:
1455                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1456                    skipped_count += 1
1457                elif audit_result.count:
1458                    errors.append(audit_result)
1459                    self.console.log_status_update(
1460                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1461                    )
1462                else:
1463                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1464
1465        self.console.log_status_update(
1466            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1467            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1468        )
1469        for error in errors:
1470            self.console.log_status_update(
1471                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1472            )
1473            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1474            if error.query:
1475                self.console.show_sql(
1476                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1477                )
1478
1479        self.console.log_status_update("Done.")
1480
1481    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1482        """Rewrite a sql expression with semantic references into an executable query.
1483
1484        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1485
1486        Args:
1487            sql: The sql string to rewrite.
1488            dialect: The dialect of the sql string, defaults to the project dialect.
1489
1490        Returns:
1491            A SQLGlot expression with semantic references expanded.
1492        """
1493        return rewrite(
1494            sql,
1495            graph=ReferenceGraph(self.models.values()),
1496            metrics=self._metrics,
1497            dialect=dialect or self.default_dialect,
1498        )
1499
1500    def migrate(self) -> None:
1501        """Migrates SQLMesh to the current running version.
1502
1503        Please contact your SQLMesh administrator before doing this.
1504        """
1505        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1506        try:
1507            self._new_state_sync().migrate(
1508                default_catalog=self.default_catalog,
1509                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1510            )
1511        except Exception as e:
1512            self.notification_target_manager.notify(
1513                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1514            )
1515            raise e
1516        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
1517
1518    def rollback(self) -> None:
1519        """Rolls back SQLMesh to the previous migration.
1520
1521        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1522        """
1523        self._new_state_sync().rollback()
1524
1525    def create_external_models(self) -> None:
1526        """Create a schema file with all external models.
1527
1528        The schema file contains all columns and types of external models, allowing for more robust
1529        lineage, validation, and optimizations.
1530        """
1531        if not self._models:
1532            self.load(update_schemas=False)
1533
1534        for path, config in self.configs.items():
1535            create_schema_file(
1536                path=path / c.SCHEMA_YAML,
1537                models=UniqueKeyDict(
1538                    "models",
1539                    {
1540                        fqn: model
1541                        for fqn, model in self._models.items()
1542                        if self.config_for_node(model) is config
1543                    },
1544                ),
1545                adapter=self._engine_adapter,
1546                state_reader=self.state_reader,
1547                dialect=config.model_defaults.dialect,
1548                max_workers=self.concurrent_tasks,
1549            )
1550
1551    def print_info(self) -> None:
1552        """Prints information about connections, models, macros, etc. to the console."""
1553        self.console.log_status_update(f"Models: {len(self.models)}")
1554        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1555
1556        self._try_connection("data warehouse", self._engine_adapter)
1557
1558        state_connection = self.config.get_state_connection(self.gateway)
1559        if state_connection:
1560            self._try_connection("state backend", state_connection.create_engine_adapter())
1561
1562        self._try_connection("test", self._test_engine_adapter)
1563
1564    def close(self) -> None:
1565        """Releases all resources allocated by this context."""
1566        self.snapshot_evaluator.close()
1567        self.state_sync.close()
1568
1569    def _run(
1570        self,
1571        environment: str,
1572        *,
1573        start: t.Optional[TimeLike],
1574        end: t.Optional[TimeLike],
1575        execution_time: t.Optional[TimeLike],
1576        skip_janitor: bool,
1577        ignore_cron: bool,
1578    ) -> bool:
1579        if not skip_janitor and environment.lower() == c.PROD:
1580            self._run_janitor()
1581
1582        env_check_attempts_num = max(
1583            1,
1584            self.config.run.environment_check_max_wait
1585            // self.config.run.environment_check_interval,
1586        )
1587
1588        def _block_until_finalized() -> str:
1589            for _ in range(env_check_attempts_num):
1590                assert environment is not None  # mypy
1591                environment_state = self.state_sync.get_environment(environment)
1592                if not environment_state:
1593                    raise SQLMeshError(f"Environment '{environment}' was not found.")
1594                if environment_state.finalized_ts:
1595                    return environment_state.plan_id
1596                logger.warning(
1597                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
1598                    environment,
1599                    environment_state.plan_id,
1600                    self.config.run.environment_check_interval,
1601                )
1602                time.sleep(self.config.run.environment_check_interval)
1603            raise SQLMeshError(
1604                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
1605                "This means that the environment either failed to update or the update is taking longer than expected. "
1606                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
1607            )
1608
1609        done = False
1610        while not done:
1611            plan_id_at_start = _block_until_finalized()
1612
1613            def _has_environment_changed() -> bool:
1614                assert environment is not None  # mypy
1615                current_environment_state = self.state_sync.get_environment(environment)
1616                return (
1617                    not current_environment_state
1618                    or current_environment_state.plan_id != plan_id_at_start
1619                    or not current_environment_state.finalized_ts
1620                )
1621
1622            try:
1623                success = self.scheduler(environment=environment).run(
1624                    environment,
1625                    start=start,
1626                    end=end,
1627                    execution_time=execution_time,
1628                    ignore_cron=ignore_cron,
1629                    circuit_breaker=_has_environment_changed,
1630                )
1631                done = True
1632            except CircuitBreakerError:
1633                logger.warning(
1634                    "Environment '%s' has been modified while running. Restarting the run...",
1635                    environment,
1636                )
1637
1638        return success
1639
1640    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
1641        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)
1642
1643    def table_name(self, model_name: str, dev: bool) -> str:
1644        """Returns the name of the pysical table for the given model name.
1645
1646        Args:
1647            model_name: The name of the model.
1648            dev: Whether to use the deployability index for the table name.
1649
1650        Returns:
1651            The name of the physical table.
1652        """
1653        deployability_index = (
1654            DeployabilityIndex.create(self.snapshots.values())
1655            if dev
1656            else DeployabilityIndex.all_deployable()
1657        )
1658        snapshot = self.get_snapshot(model_name)
1659        if not snapshot:
1660            raise SQLMeshError(f"Model '{model_name}' was not found.")
1661        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
1662
1663    def clear_caches(self) -> None:
1664        for path in self.configs:
1665            rmtree(path / c.CACHE)
1666
def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
    """Run the context's tests while capturing the runner's text output.

    Args:
        verbose: Forwarded to ``self.test``.

    Returns:
        A pair of the test result and everything the test run wrote to its stream.
    """
    captured = StringIO()
    test_result = self.test(stream=captured, verbose=verbose)
    output_text = captured.getvalue()
    return test_result, output_text
1671
def _run_plan_tests(
    self, skip_tests: bool = False
) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
    """Run the tests that gate plan creation.

    Tests only run when a test engine adapter is configured and ``skip_tests`` is False.

    Args:
        skip_tests: When True, no tests are run.

    Returns:
        ``(result, output)`` when tests ran, otherwise ``(None, None)``.

    Raises:
        PlanError: If any test failed.
    """
    if not self._test_engine_adapter or skip_tests:
        return None, None

    result, test_output = self._run_tests()
    # Only report to the console when at least one test actually ran.
    if result.testsRun > 0:
        self.console.log_test_results(result, test_output, self._test_engine_adapter.dialect)
    if not result.wasSuccessful():
        raise PlanError("Cannot generate plan due to failing test(s). Fix test(s) and run again")
    return result, test_output
1687
@property
def _model_tables(self) -> t.Dict[str, str]:
    """Map each model's fully qualified name to its physical table name.

    A snapshot that has no version yet maps to its qualified view name in the
    production environment instead.
    """
    tables: t.Dict[str, str] = {}
    for fqn, snapshot in self.snapshots.items():
        if not snapshot.version:
            prod_naming_info = EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=c.PROD,
                suffix_target=self.config.environment_suffix_target,
            )
            tables[fqn] = snapshot.qualified_view_name.for_environment(prod_naming_info)
        else:
            tables[fqn] = snapshot.table_name()
    return tables
1708
def _snapshots(
    self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
) -> t.Dict[str, Snapshot]:
    """Build snapshots for the local nodes plus production nodes from other projects.

    Nodes in the production environment that are not defined locally, and whose project
    is not one of this context's projects, are included together with their audits.
    A snapshot already stored in state is returned in place of the freshly built one.
    Local nodes whose stored snapshot is unrestorable are re-stamped, and the snapshots
    are rebuilt.

    Args:
        models_override: Models to use instead of the context's loaded models.

    Returns:
        A mapping from snapshot name to snapshot.
    """
    prod = self.state_reader.get_environment(c.PROD)
    remote_snapshots = (
        {
            snapshot.name: snapshot
            for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
        }
        if prod
        else {}
    )

    local_nodes = {**(models_override or self._models), **self._standalone_audits}
    nodes = local_nodes.copy()
    audits = self._audits.copy()
    projects = {config.project for config in self.configs.values()}

    for name, snapshot in remote_snapshots.items():
        if name not in nodes and snapshot.node.project not in projects:
            nodes[name] = snapshot.node
            if snapshot.is_model:
                for audit in snapshot.audits:
                    # The audits mapping is keyed by audit name. Keying by the model's
                    # name kept only the first audit and stored it under the wrong key.
                    if audit.name not in audits:
                        audits[audit.name] = audit

    def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
        """Create a snapshot for every node, sharing one fingerprint cache."""
        snapshots: t.Dict[str, Snapshot] = {}
        fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}

        for node in nodes.values():
            # Remote-only nodes keep the TTL of their stored snapshot. Local nodes use
            # the snapshot TTL of the project config they belong to.
            if node.fqn not in local_nodes and node.fqn in remote_snapshots:
                ttl = remote_snapshots[node.fqn].ttl
            else:
                config = self.config_for_node(node)
                ttl = config.snapshot_ttl

            snapshot = Snapshot.from_node(
                node,
                nodes=nodes,
                audits=audits,
                cache=fingerprint_cache,
                ttl=ttl,
            )
            snapshots[snapshot.name] = snapshot
        return snapshots

    snapshots = _nodes_to_snapshots(nodes)
    stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

    unrestorable_snapshots = {
        snapshot
        for snapshot in stored_snapshots.values()
        if snapshot.name in local_nodes and snapshot.unrestorable
    }
    if unrestorable_snapshots:
        for snapshot in unrestorable_snapshots:
            logger.info(
                "Found a unrestorable snapshot %s. Restamping the model...", snapshot.name
            )
            node = local_nodes[snapshot.name]
            # A new stamp changes the node's fingerprint, so a fresh snapshot is created
            # instead of reusing the unrestorable stored one.
            nodes[snapshot.name] = node.copy(
                update={"stamp": f"revert to {snapshot.identifier}"}
            )
        snapshots = _nodes_to_snapshots(nodes)
        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

    for snapshot in stored_snapshots.values():
        # Keep the original model instance to preserve the query cache.
        snapshot.node = snapshots[snapshot.name].node

    return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}
1781
def _context_diff(
    self,
    environment: str,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    create_from: t.Optional[str] = None,
    force_no_diff: bool = False,
    ensure_finalized_snapshots: bool = False,
) -> ContextDiff:
    """Compute the diff between the given snapshots and the stored target environment.

    Args:
        environment: Target environment name. It is normalized before use.
        snapshots: Snapshots to diff. A falsy value falls back to ``self.snapshots``.
        create_from: Environment to diff from when the target does not exist. A falsy
            value falls back to prod.
        force_no_diff: When True, return a no-diff result without consulting state.
        ensure_finalized_snapshots: Forwarded to ``ContextDiff.create``.
    """
    target_environment = Environment.normalize_name(environment)
    if not force_no_diff:
        return ContextDiff.create(
            target_environment,
            snapshots=snapshots or self.snapshots,
            create_from=create_from or c.PROD,
            state_reader=self.state_reader,
            ensure_finalized_snapshots=ensure_finalized_snapshots,
        )
    return ContextDiff.create_no_diff(target_environment)
1800
def _run_janitor(self) -> None:
    """Remove expired environments and snapshots, then compact stored intervals."""
    self._cleanup_environments()

    removed_snapshots = self.state_sync.delete_expired_snapshots()
    evaluator = self.snapshot_evaluator
    evaluator.cleanup(removed_snapshots, on_complete=self.console.update_cleanup_progress)

    self.state_sync.compact_intervals()
1809
def _cleanup_environments(self) -> None:
    """Delete expired environments from state, then drop their views through the engine adapter."""
    removed_environments = self.state_sync.delete_expired_environments()
    cleanup_expired_views(self.engine_adapter, removed_environments, console=self.console)
1813
def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
    """Probe a connection with ``SELECT 1`` and report success or failure on the console.

    Any exception, including one raised while logging the success message, is reported
    as a failure rather than propagated.
    """
    label = connection_name.capitalize()
    try:
        engine_adapter.fetchall("SELECT 1")
        self.console.log_status_update(f"{label} connection [green]succeeded[/green]")
    except Exception as error:
        self.console.log_error(f"{label} connection failed. {error}")
1821
def _new_state_sync(self) -> StateSync:
    """Return the state sync passed to the constructor, or create one through the scheduler."""
    if self._provided_state_sync:
        return self._provided_state_sync
    return self._scheduler.create_state_sync(self)
1824
def _new_selector(self) -> Selector:
    """Build a ``Selector`` over the state reader and the locally loaded models."""
    state_reader = self.state_reader
    local_models = self._models
    return Selector(
        state_reader,
        local_models,
        context_path=self.path,
        default_catalog=self.default_catalog,
        dialect=self.default_dialect,
    )
1833
def _register_notification_targets(self) -> None:
    """Build ``self.notification_target_manager`` from the configured notification targets.

    Context-level targets are grouped by the events they listen to, and per-user targets
    are grouped by username. Targets that are not configured are ignored in both cases.
    """
    targets_by_event = collections.defaultdict(set)
    configured_targets = (
        target for target in self.notification_targets if target.is_configured
    )
    for target in configured_targets:
        for event in target.notify_on:
            targets_by_event[event].add(target)

    targets_by_user = {
        user.username: {target for target in user.notification_targets if target.is_configured}
        for user in self.users
    }

    self.notification_target_manager = NotificationTargetManager(
        targets_by_event, targets_by_user, username=self.config.username
    )

Encapsulates a SQLMesh environment, supplying convenient functions to perform various tasks.

Arguments:
  • engine_adapter: The default engine adapter to use.
  • notification_targets: Additional notification targets to use. These are combined with the targets defined in config.
  • paths: The directories containing SQLMesh files.
  • config: A Config object or the name of a Config object in config.py.
  • connection: The name of the connection. If not specified, the first connection as it appears in configuration will be used.
  • test_connection: The name of the connection to use for tests. If not specified, the first connection as it appears in configuration will be used.
  • concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
  • load: Whether or not to automatically load all models and macros (default True).
  • console: The rich instance used for printing out CLI command results.
  • users: A list of users to make known to SQLMesh.
  • config_type: The type of config object to use (default Config).
GenericContext( engine_adapter: Union[sqlmesh.core.engine_adapter.base.EngineAdapter, NoneType] = None, notification_targets: Union[List[typing_extensions.Annotated[Union[sqlmesh.core.notification_target.BasicSMTPNotificationTarget, sqlmesh.core.notification_target.ConsoleNotificationTarget, sqlmesh.core.notification_target.SlackApiNotificationTarget, sqlmesh.core.notification_target.SlackWebhookNotificationTarget], FieldInfo(annotation=NoneType, required=True, discriminator='type_')]], NoneType] = None, state_sync: Union[sqlmesh.core.state_sync.base.StateSync, NoneType] = None, paths: 't.Union[str | Path, t.Iterable[str | Path]]' = '', config: Union[~C, str, Dict[pathlib.Path, ~C], NoneType] = None, gateway: Union[str, NoneType] = None, concurrent_tasks: Union[int, NoneType] = None, loader: Union[Type[sqlmesh.core.loader.Loader], NoneType] = None, load: bool = True, console: Union[sqlmesh.core.console.Console, NoneType] = None, users: Union[List[sqlmesh.core.user.User], NoneType] = None)
def __init__(
    self,
    engine_adapter: t.Optional[EngineAdapter] = None,
    notification_targets: t.Optional[t.List[NotificationTarget]] = None,
    state_sync: t.Optional[StateSync] = None,
    paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
    config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
    gateway: t.Optional[str] = None,
    concurrent_tasks: t.Optional[int] = None,
    loader: t.Optional[t.Type[Loader]] = None,
    load: bool = True,
    console: t.Optional[Console] = None,
    users: t.Optional[t.List[User]] = None,
):
    """Set up the context from its configs and, unless ``load`` is False, load the project.

    The first entry of ``self.configs`` becomes the primary ``path``/``config`` pair.
    Statement order matters here: later steps read attributes set by earlier ones.
    """
    self.console = console or get_console()
    if isinstance(config, dict):
        self.configs = config
    else:
        self.configs = load_configs(config, self.CONFIG_TYPE, paths)

    # Empty registries; they are populated by ``load``.
    self.dag: DAG[str] = DAG()
    self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
    self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
    self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
        "standaloneaudits"
    )
    self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
    self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
    self._jinja_macros = JinjaMacroRegistry()
    self._default_catalog: t.Optional[str] = None

    first_entry = next(iter(self.configs.items()))
    self.path, self.config = t.cast(t.Tuple[Path, C], first_entry)

    self.gateway = gateway
    self._scheduler = self.config.get_scheduler(self.gateway)
    self.environment_ttl = self.config.environment_ttl
    self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
    self.auto_categorize_changes = self.config.plan.auto_categorize_changes

    self._connection_config = self.config.get_connection(self.gateway)
    self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
    self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

    # The test connection is resolved against the main adapter's catalog and dialect.
    test_connection_config = self.config.get_test_connection(
        self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
    )
    self._test_engine_adapter = test_connection_config.create_engine_adapter(
        register_comments_override=False
    )

    self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

    self._provided_state_sync: t.Optional[StateSync] = state_sync
    self._state_sync: t.Optional[StateSync] = None

    loader_class = loader or self.config.loader
    self._loader = loader_class(**self.config.loader_kwargs)

    # Should we dedupe notification_targets? If so how?
    self.notification_targets = (notification_targets or []) + self.config.notification_targets
    combined_users = (users or []) + self.config.users
    # One entry per username: position of the first occurrence, value of the last one.
    self.users = list({user.username: user for user in combined_users}.values())
    self._register_notification_targets()

    if self.config.environment_catalog_mapping and not (
        self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
    ):
        raise SQLMeshError(
            "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
        )

    if load:
        self.load()
default_dialect: Union[str, NoneType]

Returns the default dialect.

engine_adapter: EngineAdapter

Returns an engine adapter.

def execution_context( self, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None) -> sqlmesh.core.context.ExecutionContext:
def execution_context(
    self, deployability_index: t.Optional[DeployabilityIndex] = None
) -> ExecutionContext:
    """Build an ``ExecutionContext`` over this context's engine adapter and snapshots.

    Args:
        deployability_index: Optional index forwarded to the execution context.
    """
    adapter = self._engine_adapter
    current_snapshots = self.snapshots
    return ExecutionContext(
        engine_adapter=adapter,
        snapshots=current_snapshots,
        deployability_index=deployability_index,
        default_dialect=self.default_dialect,
        default_catalog=self.default_catalog,
    )

Returns an execution context.

def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
    """Replace a registered model with a copy that has ``kwargs`` applied.

    The context's model registry and DAG are updated, model schemas are refreshed, and
    the new model's definition is validated.

    Args:
        model: Model name or instance to update.
        kwargs: Field values that override the existing model's fields.

    Returns:
        The newly constructed model instance.

    Raises:
        SQLMeshError: If ``model`` cannot be resolved (raised by ``get_model``).
    """
    existing = self.get_model(model, raise_if_missing=True)
    source_path = existing._path

    # model.copy() can't be used here due to a cached state that can be a part of a model instance.
    merged_fields = {**t.cast(Model, existing).dict(), **kwargs}
    updated = t.cast(Model, type(existing)(**merged_fields))
    updated._path = source_path

    self._models.update({updated.fqn: updated})
    self.dag.add(updated.fqn, updated.depends_on)
    update_model_schemas(self.dag, self._models, self.path)

    updated.validate_definition()
    return updated

Update or insert a model.

The context's models dictionary will be updated to include these changes.

Arguments:
  • model: Model name or instance to update.
  • kwargs: The kwargs to update the model with.
Returns:

A new instance of the updated or inserted model.

def scheduler( self, environment: Union[str, NoneType] = None) -> sqlmesh.core.scheduler.Scheduler:
def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
    """Create the built-in scheduler.

    Args:
        environment: Name of a stored environment to take snapshots from. When None,
            the snapshots of the currently loaded local project are used.

    Returns:
        A ``Scheduler`` over the selected snapshots.

    Raises:
        ConfigError: If the environment does not exist or no snapshots are found.
    """
    snapshots: t.Iterable[Snapshot]
    if environment is None:
        snapshots = self.snapshots.values()
    else:
        stored_environment = self.state_sync.get_environment(environment)
        if stored_environment is None:
            raise ConfigError(f"Environment '{environment}' was not found.")
        snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()

    if not snapshots:
        raise ConfigError("No models were found")

    return Scheduler(
        snapshots,
        self.snapshot_evaluator,
        self.state_sync,
        default_catalog=self.default_catalog,
        max_workers=self.concurrent_tasks,
        console=self.console,
        notification_target_manager=self.notification_target_manager,
    )

Returns the built-in scheduler.

Arguments:
  • environment: The target environment to source model snapshots from, or None if snapshots should be sourced from the currently loaded local state.
Returns:

The built-in scheduler instance.

def refresh(self) -> None:
def refresh(self) -> None:
    """Reload the project, but only when the loader reports that a reload is needed."""
    if not self._loader.reload_needed():
        return
    self.load()

Refresh all models that have been updated.

def load( self, update_schemas: bool = True) -> sqlmesh.core.context.GenericContext[~C]:
def load(self, update_schemas: bool = True) -> GenericContext[C]:
    """Load all files in the context's path.

    Replaces the context's macros, Jinja macros, models, metrics, audits, standalone
    audits, and DAG with the project returned by the loader.

    Args:
        update_schemas: Forwarded to the loader.

    Returns:
        This context.

    Raises:
        ConfigError: If a model and a standalone audit share the same name.
    """
    with sys_path(*self.configs):
        # Garbage collection is paused while the project loads. Restore the previous GC
        # state in ``finally``; otherwise a loader error would leave GC disabled for the
        # rest of the process.
        gc_was_enabled = gc.isenabled()
        gc.disable()
        try:
            project = self._loader.load(self, update_schemas)
            self._macros = project.macros
            self._jinja_macros = project.jinja_macros
            self._models = project.models
            self._metrics = project.metrics
            self._standalone_audits.clear()
            self._audits.clear()
            for name, audit in project.audits.items():
                if isinstance(audit, StandaloneAudit):
                    self._standalone_audits[name] = audit
                else:
                    self._audits[name] = audit
            self.dag = project.dag
        finally:
            if gc_was_enabled:
                gc.enable()

        duplicates = set(self._models) & set(self._standalone_audits)
        if duplicates:
            raise ConfigError(
                f"Models and Standalone audits cannot have the same name: {duplicates}"
            )

    return self

Load all files in the context's path.

def run( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, skip_janitor: bool = False, ignore_cron: bool = False) -> bool:
def run(
    self,
    environment: t.Optional[str] = None,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    skip_janitor: bool = False,
    ignore_cron: bool = False,
) -> bool:
    """Run the entire dag through the scheduler.

    Sends RUN_START before running. On success it sends RUN_END; on failure or an
    exception it sends RUN_FAILURE.

    Args:
        environment: The target environment to source model snapshots from and virtually
            update. Defaults to the config's ``default_target_environment``.
        start: The start of the interval to render.
        end: The end of the interval to render.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        skip_janitor: Whether to skip the janitor task.
        ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.

    Returns:
        True if the run was successful, False otherwise.

    Raises:
        Exception: Any exception raised by the run is re-raised after notifying targets.
    """
    environment = environment or self.config.default_target_environment

    self.notification_target_manager.notify(
        NotificationEvent.RUN_START, environment=environment
    )
    success = False
    try:
        success = self._run(
            environment=environment,
            start=start,
            end=end,
            execution_time=execution_time,
            skip_janitor=skip_janitor,
            ignore_cron=ignore_cron,
        )
    except Exception:
        # Format the traceback once and reuse it for both the notification and the log.
        error_trace = traceback.format_exc()
        self.notification_target_manager.notify(NotificationEvent.RUN_FAILURE, error_trace)
        logger.error(f"Run Failure: {error_trace}")
        # A bare ``raise`` re-raises the active exception with its original traceback.
        raise

    if success:
        self.notification_target_manager.notify(
            NotificationEvent.RUN_END, environment=environment
        )
        self.console.log_success(f"Run finished for environment '{environment}'")
    else:
        self.notification_target_manager.notify(
            NotificationEvent.RUN_FAILURE, "See console logs for details."
        )

    return success

Run the entire dag through the scheduler.

Arguments:
  • environment: The target environment to source model snapshots from and virtually update. Default: prod.
  • start: The start of the interval to render.
  • end: The end of the interval to render.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • skip_janitor: Whether to skip the janitor task.
  • ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
Returns:

True if the run was successful, False otherwise.

def get_model( self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False) -> Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, NoneType]:
def get_model(
    self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
) -> t.Optional[Model]:
    """Resolve a model name, model, or snapshot to a model.

    Names are normalized against the default dialect and catalog before they are looked
    up among the loaded models.

    Args:
        model_or_snapshot: A model name, model, or snapshot.
        raise_if_missing: Raise instead of returning None when no model is found.

    Returns:
        The matching model, or None if no model was found and ``raise_if_missing`` is False.

    Raises:
        SQLMeshError: If ``raise_if_missing`` is True and no model is found.
    """
    if isinstance(model_or_snapshot, Snapshot):
        model = model_or_snapshot.model
    elif isinstance(model_or_snapshot, str):
        lookup_key = normalize_model_name(
            model_or_snapshot,
            dialect=self.default_dialect,
            default_catalog=self.default_catalog,
        )
        model = self._models.get(lookup_key)
    else:
        model = model_or_snapshot

    if raise_if_missing and not model:
        raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")

    return model

Returns a model with the given name or None if a model with such name doesn't exist.

Arguments:
  • model_or_snapshot: A model name, model, or snapshot.
  • raise_if_missing: Raises an error if a model is not found.
Returns:

The expected model.

def get_snapshot( self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False) -> Union[sqlmesh.core.snapshot.definition.Snapshot, NoneType]:
def get_snapshot(
    self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
) -> t.Optional[Snapshot]:
    """Resolve a node name, node, or snapshot to a snapshot of this context.

    A snapshot argument is returned as-is. A string that names a standalone audit is used
    verbatim; any other string is normalized as a model name first.

    Args:
        node_or_snapshot: A node name, node, or snapshot.
        raise_if_missing: Raise instead of returning None when no snapshot is found.

    Returns:
        The matching snapshot, or None if none exists and ``raise_if_missing`` is False.

    Raises:
        SQLMeshError: If ``raise_if_missing`` is True and no snapshot is found.
    """
    if isinstance(node_or_snapshot, Snapshot):
        return node_or_snapshot

    if not isinstance(node_or_snapshot, str):
        fqn = node_or_snapshot.fqn
    elif self.standalone_audits.get(node_or_snapshot):
        fqn = node_or_snapshot
    else:
        fqn = normalize_model_name(
            node_or_snapshot,
            dialect=self.default_dialect,
            default_catalog=self.default_catalog,
        )

    snapshot = self.snapshots.get(fqn)
    if raise_if_missing and not snapshot:
        raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")
    return snapshot

Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.

Arguments:
  • node_or_snapshot: A node name, node, or snapshot.
  • raise_if_missing: Raises an error if a snapshot is not found.
Returns:

The expected snapshot.

def config_for_path(self, path: pathlib.Path) -> sqlmesh.core.config.root.Config:
def config_for_path(self, path: Path) -> Config:
    """Return the config of the first project whose root contains ``path``.

    Falls back to the primary ``self.config`` when no project root contains it.
    """
    for project_root, project_config in self.configs.items():
        try:
            path.relative_to(project_root)
        except ValueError:
            continue
        return project_config
    return self.config
def config_for_node( self, node: 'str | Model | StandaloneAudit') -> sqlmesh.core.config.root.Config:
def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
    """Return the config of the project that defines ``node``.

    A string is resolved to a snapshot first (raising if it is unknown), and that
    snapshot's node file path is used.
    """
    if not isinstance(node, str):
        return self.config_for_path(node._path)  # type: ignore
    resolved = self.get_snapshot(node, raise_if_missing=True)
    return self.config_for_path(resolved.node._path)  # type: ignore
models: 'MappingProxyType[str, Model]'

Returns all registered models in this context.

metrics: 'MappingProxyType[str, Metric]'

Returns all registered metrics in this context.

standalone_audits: 'MappingProxyType[str, StandaloneAudit]'

Returns all registered standalone audits in this context.

snapshots: Dict[str, Snapshot]

Generates and returns snapshots based on models registered in this context.

If one of the snapshots has been previously stored in the persisted state, the stored instance will be returned.

def render( self, model_or_snapshot: ModelOrSnapshot, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, expand: Union[bool, Iterable[str]] = False, **kwargs: Any) -> sqlglot.expressions.Expression:
def render(
    self,
    model_or_snapshot: ModelOrSnapshot,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    expand: t.Union[bool, t.Iterable[str]] = False,
    **kwargs: t.Any,
) -> exp.Expression:
    """Render a model's query, expanding macros with ``kwargs`` and optionally inlining upstream models.

    Args:
        model_or_snapshot: The model, model name, or snapshot to render.
        start: The start of the interval to render.
        end: The end of the interval to render.
        execution_time: The date/time reference to use for execution time. Defaults to now.
        expand: Whether to expand referenced models as raw queries. ``True`` expands every
            upstream model. An iterable of names expands only those models; the names are
            normalized against the default catalog and dialect. Defaults to ``False``.
        **kwargs: Extra arguments forwarded to the model's render call.

    Returns:
        The rendered expression. For a seed model, the first data frame produced by the
        model is converted to a SQL expression.
    """
    if not execution_time:
        execution_time = now_ds()

    model = self.get_model(model_or_snapshot, raise_if_missing=True)

    if expand is True:
        models_to_expand = self.dag.upstream(model.fqn)
    elif expand:
        # An empty result still collapses to a list, matching the `or []` fallback.
        models_to_expand = {
            normalize_model_name(
                name, default_catalog=self.default_catalog, dialect=self.default_dialect
            )
            for name in expand
        } or []
    else:
        models_to_expand = []

    if model.is_seed:
        seed_frames = model.render(
            context=self.execution_context(),
            start=start,
            end=end,
            execution_time=execution_time,
            **kwargs,
        )
        first_frame = t.cast(pd.DataFrame, next(seed_frames))
        return next(pandas_to_sql(first_frame, model.columns_to_types))

    return model.render_query_or_raise(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=self.snapshots,
        expand=models_to_expand,
        engine_adapter=self.engine_adapter,
        **kwargs,
    )

Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

Arguments:
  • model_or_snapshot: The model, model name, or snapshot to render.
  • start: The start of the interval to render.
  • end: The end of the interval to render.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • expand: Whether or not to expand materialized models, defaults to False. If True, all upstream models are expanded as raw queries. If a list, only the listed models are expanded as raw queries.
Returns:

The rendered expression.

def evaluate( self, model_or_snapshot: ModelOrSnapshot, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], execution_time: Union[datetime.date, datetime.datetime, str, int, float], limit: Union[int, NoneType] = None, **kwargs: Any) -> DF:
def evaluate(
    self,
    model_or_snapshot: ModelOrSnapshot,
    start: TimeLike,
    end: TimeLike,
    execution_time: TimeLike,
    limit: t.Optional[int] = None,
    **kwargs: t.Any,
) -> DF:
    """Evaluate a model or snapshot by running its query against the engine and fetching the result.

    This method is used to test or iterate on models without side effects.

    Args:
        model_or_snapshot: The model, model name, or snapshot to evaluate.
        start: The start of the interval to evaluate.
        end: The end of the interval to evaluate.
        execution_time: The date/time reference to use for execution time.
        limit: A limit applied to the model. A falsy value uses ``c.DEFAULT_MAX_LIMIT``.

    Raises:
        RuntimeError: If the evaluator returns no data frame.
    """
    snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)

    result = self.snapshot_evaluator.evaluate_and_fetch(
        snapshot,
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=self.snapshots,
        limit=limit or c.DEFAULT_MAX_LIMIT,
    )

    if result is not None:
        return result
    raise RuntimeError(f"Error evaluating {snapshot.name}")

Evaluate a model or snapshot (running its query against a DB/Engine).

This method is used to test or iterate on models without side effects.

Arguments:
  • model_or_snapshot: The model, model name, or snapshot to evaluate.
  • start: The start of the interval to evaluate.
  • end: The end of the interval to evaluate.
  • execution_time: The date/time reference to use for execution time.
  • limit: A limit applied to the model.
def format( self, transpile: Union[str, NoneType] = None, append_newline: Union[bool, NoneType] = None, **kwargs: Any) -> None:
778    def format(
779        self,
780        transpile: t.Optional[str] = None,
781        append_newline: t.Optional[bool] = None,
782        **kwargs: t.Any,
783    ) -> None:
784        """Format all SQL models."""
785        for model in self._models.values():
786            if not model._path.suffix == ".sql":
787                continue
788            with open(model._path, "r+", encoding="utf-8") as file:
789                expressions = parse(
790                    file.read(), default_dialect=self.config_for_node(model).dialect
791                )
792                if transpile:
793                    for prop in expressions[0].expressions:
794                        if prop.name.lower() == "dialect":
795                            prop.replace(
796                                exp.Property(
797                                    this="dialect",
798                                    value=exp.Literal.string(transpile or model.dialect),
799                                )
800                            )
801                format = self.config_for_node(model).format
802                opts = {**format.generator_options, **kwargs}
803                file.seek(0)
804                file.write(
805                    format_model_expressions(expressions, transpile or model.dialect, **opts)
806                )
807                if append_newline is None:
808                    append_newline = format.append_newline
809                if append_newline:
810                    file.write("\n")
811                file.truncate()

Format all SQL models.

def plan( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, create_from: Union[str, NoneType] = None, skip_tests: bool = False, restate_models: Union[Iterable[str], NoneType] = None, no_gaps: bool = False, skip_backfill: bool = False, forward_only: Union[bool, NoneType] = None, no_prompts: Union[bool, NoneType] = None, auto_apply: Union[bool, NoneType] = None, no_auto_categorization: Union[bool, NoneType] = None, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: Union[bool, NoneType] = None, select_models: Union[Collection[str], NoneType] = None, backfill_models: Union[Collection[str], NoneType] = None, categorizer_config: Union[sqlmesh.core.config.categorizer.CategorizerConfig, NoneType] = None, enable_preview: Union[bool, NoneType] = None, no_diff: Union[bool, NoneType] = None, run: bool = False) -> sqlmesh.core.plan.definition.Plan:
813    def plan(
814        self,
815        environment: t.Optional[str] = None,
816        *,
817        start: t.Optional[TimeLike] = None,
818        end: t.Optional[TimeLike] = None,
819        execution_time: t.Optional[TimeLike] = None,
820        create_from: t.Optional[str] = None,
821        skip_tests: bool = False,
822        restate_models: t.Optional[t.Iterable[str]] = None,
823        no_gaps: bool = False,
824        skip_backfill: bool = False,
825        forward_only: t.Optional[bool] = None,
826        no_prompts: t.Optional[bool] = None,
827        auto_apply: t.Optional[bool] = None,
828        no_auto_categorization: t.Optional[bool] = None,
829        effective_from: t.Optional[TimeLike] = None,
830        include_unmodified: t.Optional[bool] = None,
831        select_models: t.Optional[t.Collection[str]] = None,
832        backfill_models: t.Optional[t.Collection[str]] = None,
833        categorizer_config: t.Optional[CategorizerConfig] = None,
834        enable_preview: t.Optional[bool] = None,
835        no_diff: t.Optional[bool] = None,
836        run: bool = False,
837    ) -> Plan:
838        """Interactively creates a plan.
839
840        This method compares the current context with the target environment. It then presents
841        the differences and asks whether to backfill each modified model.
842
843        Args:
844            environment: The environment to diff and plan against.
845            start: The start date of the backfill if there is one.
846            end: The end date of the backfill if there is one.
847            execution_time: The date/time reference to use for execution time. Defaults to now.
848            create_from: The environment to create the target environment from if it
849                doesn't exist. If not specified, the "prod" environment will be used.
850            skip_tests: Unit tests are run by default so this will skip them if enabled
851            restate_models: A list of either internal or external models, or tags, that need to be restated
852                for the given plan interval. If the target environment is a production environment,
853                ALL snapshots that depended on these upstream tables will have their intervals deleted
854                (even ones not in this current environment). Only the snapshots in this environment will
855                be backfilled whereas others need to be recovered on a future plan application. For development
856                environments only snapshots that are part of this plan will be affected.
857            no_gaps:  Whether to ensure that new snapshots for models that are already a
858                part of the target environment have no data gaps when compared against previous
859                snapshots for same models.
860            skip_backfill: Whether to skip the backfill step. Default: False.
861            forward_only: Whether the purpose of the plan is to make forward only changes.
862            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
863                if this flag is set to true and there are uncategorized changes the plan creation will
864                fail. Default: False.
865            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
866            no_auto_categorization: Indicates whether to disable automatic categorization of model
867                changes (breaking / non-breaking). If not provided, then the corresponding configuration
868                option determines the behavior.
869            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
870                project config by default.
871            effective_from: The effective date from which to apply forward-only changes on production.
872            include_unmodified: Indicates whether to include unmodified models in the target development environment.
873            select_models: A list of model selection strings to filter the models that should be included into this plan.
874            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
875            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
876            no_diff: Hide text differences for changed models.
877            run: Whether to run latest intervals as part of the plan application.
878
879        Returns:
880            The populated Plan object.
881        """
882        plan_builder = self.plan_builder(
883            environment,
884            start=start,
885            end=end,
886            execution_time=execution_time,
887            create_from=create_from,
888            skip_tests=skip_tests,
889            restate_models=restate_models,
890            no_gaps=no_gaps,
891            skip_backfill=skip_backfill,
892            forward_only=forward_only,
893            no_auto_categorization=no_auto_categorization,
894            effective_from=effective_from,
895            include_unmodified=include_unmodified,
896            select_models=select_models,
897            backfill_models=backfill_models,
898            categorizer_config=categorizer_config,
899            enable_preview=enable_preview,
900            run=run,
901        )
902
903        self.console.plan(
904            plan_builder,
905            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
906            self.default_catalog,
907            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
908            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
909        )
910
911        return plan_builder.build()

Interactively creates a plan.

This method compares the current context with the target environment. It then presents the differences and asks whether to backfill each modified model.

Arguments:
  • environment: The environment to diff and plan against.
  • start: The start date of the backfill if there is one.
  • end: The end date of the backfill if there is one.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
  • skip_tests: Whether to skip unit tests, which are run by default.
  • restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even ones not in the current environment). Only the snapshots in this environment will be backfilled, whereas others need to be recovered on a future plan application. For development environments, only snapshots that are part of this plan will be affected.
  • no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
  • skip_backfill: Whether to skip the backfill step. Default: False.
  • forward_only: Whether the purpose of the plan is to make forward-only changes.
  • no_prompts: Whether to disable interactive prompts for the backfill time range. Note that if this flag is set to True and there are uncategorized changes, plan creation will fail. Default: False.
  • auto_apply: Whether to automatically apply the new plan after creation. Default: False.
  • no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
  • categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
  • effective_from: The effective date from which to apply forward-only changes on production.
  • include_unmodified: Indicates whether to include unmodified models in the target development environment.
  • select_models: A list of model selection strings to filter the models that should be included in this plan.
  • backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
  • enable_preview: Indicates whether to enable preview for forward-only models in development environments.
  • no_diff: Hide text differences for changed models.
  • run: Whether to run latest intervals as part of the plan application.
Returns:

The populated Plan object.

def plan_builder( self, environment: Union[str, NoneType] = None, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, create_from: Union[str, NoneType] = None, skip_tests: bool = False, restate_models: Union[Iterable[str], NoneType] = None, no_gaps: bool = False, skip_backfill: bool = False, forward_only: Union[bool, NoneType] = None, no_auto_categorization: Union[bool, NoneType] = None, effective_from: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, include_unmodified: Union[bool, NoneType] = None, select_models: Union[Collection[str], NoneType] = None, backfill_models: Union[Collection[str], NoneType] = None, categorizer_config: Union[sqlmesh.core.config.categorizer.CategorizerConfig, NoneType] = None, enable_preview: Union[bool, NoneType] = None, run: bool = False) -> sqlmesh.core.plan.builder.PlanBuilder:
 913    def plan_builder(
 914        self,
 915        environment: t.Optional[str] = None,
 916        *,
 917        start: t.Optional[TimeLike] = None,
 918        end: t.Optional[TimeLike] = None,
 919        execution_time: t.Optional[TimeLike] = None,
 920        create_from: t.Optional[str] = None,
 921        skip_tests: bool = False,
 922        restate_models: t.Optional[t.Iterable[str]] = None,
 923        no_gaps: bool = False,
 924        skip_backfill: bool = False,
 925        forward_only: t.Optional[bool] = None,
 926        no_auto_categorization: t.Optional[bool] = None,
 927        effective_from: t.Optional[TimeLike] = None,
 928        include_unmodified: t.Optional[bool] = None,
 929        select_models: t.Optional[t.Collection[str]] = None,
 930        backfill_models: t.Optional[t.Collection[str]] = None,
 931        categorizer_config: t.Optional[CategorizerConfig] = None,
 932        enable_preview: t.Optional[bool] = None,
 933        run: bool = False,
 934    ) -> PlanBuilder:
 935        """Creates a plan builder.
 936
 937        Args:
 938            environment: The environment to diff and plan against.
 939            start: The start date of the backfill if there is one.
 940            end: The end date of the backfill if there is one.
 941            execution_time: The date/time reference to use for execution time. Defaults to now.
 942            create_from: The environment to create the target environment from if it
 943                doesn't exist. If not specified, the "prod" environment will be used.
 944            skip_tests: Unit tests are run by default so this will skip them if enabled
 945            restate_models: A list of either internal or external models, or tags, that need to be restated
 946                for the given plan interval. If the target environment is a production environment,
 947                ALL snapshots that depended on these upstream tables will have their intervals deleted
 948                (even ones not in this current environment). Only the snapshots in this environment will
 949                be backfilled whereas others need to be recovered on a future plan application. For development
 950                environments only snapshots that are part of this plan will be affected.
 951            no_gaps:  Whether to ensure that new snapshots for models that are already a
 952                part of the target environment have no data gaps when compared against previous
 953                snapshots for same models.
 954            skip_backfill: Whether to skip the backfill step. Default: False.
 955            forward_only: Whether the purpose of the plan is to make forward only changes.
 956            no_auto_categorization: Indicates whether to disable automatic categorization of model
 957                changes (breaking / non-breaking). If not provided, then the corresponding configuration
 958                option determines the behavior.
 959            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
 960                project config by default.
 961            effective_from: The effective date from which to apply forward-only changes on production.
 962            include_unmodified: Indicates whether to include unmodified models in the target development environment.
 963            select_models: A list of model selection strings to filter the models that should be included into this plan.
 964            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
 965            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
 966            run: Whether to run latest intervals as part of the plan application.
 967
 968        Returns:
 969            The plan builder.
 970        """
 971        environment = environment or self.config.default_target_environment
 972        environment = Environment.normalize_name(environment)
 973        is_dev = environment != c.PROD
 974
 975        if skip_backfill and not no_gaps and not is_dev:
 976            raise ConfigError(
 977                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
 978            )
 979
 980        if run and is_dev:
 981            raise ConfigError("The '--run' flag is only supported for the production environment.")
 982
 983        self._run_plan_tests(skip_tests=skip_tests)
 984
 985        environment_ttl = (
 986            self.environment_ttl if environment not in self.pinned_environments else None
 987        )
 988
 989        model_selector = self._new_selector()
 990
 991        if backfill_models:
 992            backfill_models = model_selector.expand_model_selections(backfill_models)
 993        else:
 994            backfill_models = None
 995
 996        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
 997        if select_models:
 998            models_override = model_selector.select_models(
 999                select_models,
1000                environment,
1001                fallback_env_name=create_from or c.PROD,
1002                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1003            )
1004            if not backfill_models:
1005                # Only backfill selected models unless explicitly specified.
1006                backfill_models = model_selector.expand_model_selections(select_models)
1007
1008        expanded_restate_models = None
1009        if restate_models is not None:
1010            expanded_restate_models = model_selector.expand_model_selections(restate_models)
1011            if not expanded_restate_models:
1012                self.console.log_error(
1013                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
1014                )
1015
1016        snapshots = self._snapshots(models_override)
1017        context_diff = self._context_diff(
1018            environment or c.PROD,
1019            snapshots=snapshots,
1020            create_from=create_from,
1021            force_no_diff=(restate_models is not None and not expanded_restate_models)
1022            or (backfill_models is not None and not backfill_models),
1023            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1024        )
1025
1026        # If no end date is specified, use the max interval end from prod
1027        # to prevent unintended evaluation of the entire DAG.
1028        if not run:
1029            if backfill_models is not None:
1030                # Only consider selected models for the default end value.
1031                models_for_default_end = backfill_models.copy()
1032                for name in backfill_models:
1033                    if name not in snapshots:
1034                        continue
1035                    snapshot = snapshots[name]
1036                    snapshot_id = snapshot.snapshot_id
1037                    if (
1038                        snapshot_id in context_diff.added
1039                        and snapshot_id in context_diff.new_snapshots
1040                    ):
1041                        # If the selected model is a newly added model, then we should narrow down the intervals
1042                        # that should be considered for the default plan end value by including its parents.
1043                        models_for_default_end |= {s.name for s in snapshot.parents}
1044                default_end = self.state_sync.greatest_common_interval_end(
1045                    c.PROD,
1046                    models_for_default_end,
1047                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1048                )
1049            else:
1050                default_end = self.state_sync.max_interval_end_for_environment(
1051                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
1052                )
1053        else:
1054            default_end = None
1055
1056        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None
1057
1058        return PlanBuilder(
1059            context_diff=context_diff,
1060            start=start,
1061            end=end,
1062            execution_time=execution_time,
1063            apply=self.apply,
1064            restate_models=expanded_restate_models,
1065            backfill_models=backfill_models,
1066            no_gaps=no_gaps,
1067            skip_backfill=skip_backfill,
1068            is_dev=is_dev,
1069            forward_only=(
1070                forward_only if forward_only is not None else self.config.plan.forward_only
1071            ),
1072            environment_ttl=environment_ttl,
1073            environment_suffix_target=self.config.environment_suffix_target,
1074            environment_catalog_mapping=self.config.environment_catalog_mapping,
1075            categorizer_config=categorizer_config or self.auto_categorize_changes,
1076            auto_categorization_enabled=not no_auto_categorization,
1077            effective_from=effective_from,
1078            include_unmodified=(
1079                include_unmodified
1080                if include_unmodified is not None
1081                else self.config.plan.include_unmodified
1082            ),
1083            default_start=default_start,
1084            default_end=default_end,
1085            enable_preview=(
1086                enable_preview if enable_preview is not None else self.config.plan.enable_preview
1087            ),
1088            end_bounded=not run,
1089            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
1090        )

Creates a plan builder.

Arguments:
  • environment: The environment to diff and plan against.
  • start: The start date of the backfill if there is one.
  • end: The end date of the backfill if there is one.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
  • create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
  • skip_tests: Whether to skip unit tests, which are run by default.
  • restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even ones not in the current environment). Only the snapshots in this environment will be backfilled, whereas others need to be recovered on a future plan application. For development environments, only snapshots that are part of this plan will be affected.
  • no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
  • skip_backfill: Whether to skip the backfill step. Default: False.
  • forward_only: Whether the purpose of the plan is to make forward-only changes.
  • no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
  • categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
  • effective_from: The effective date from which to apply forward-only changes on production.
  • include_unmodified: Indicates whether to include unmodified models in the target development environment.
  • select_models: A list of model selection strings to filter the models that should be included in this plan.
  • backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
  • enable_preview: Indicates whether to enable preview for forward-only models in development environments.
  • run: Whether to run latest intervals as part of the plan application.
Returns:

The plan builder.

def apply( self, plan: sqlmesh.core.plan.definition.Plan, circuit_breaker: Union[Callable[[], bool], NoneType] = None) -> None:
1092    def apply(
1093        self,
1094        plan: Plan,
1095        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
1096    ) -> None:
1097        """Applies a plan by pushing snapshots and backfilling data.
1098
1099        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
1100        to backfill all models.
1101
1102        Args:
1103            plan: The plan to apply.
1104            circuit_breaker: An optional handler which checks if the apply should be aborted.
1105        """
1106        if (
1107            not plan.context_diff.has_changes
1108            and not plan.requires_backfill
1109            and not plan.has_unmodified_unpromoted
1110        ):
1111            return
1112        if plan.uncategorized:
1113            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
1114        self.notification_target_manager.notify(
1115            NotificationEvent.APPLY_START,
1116            environment=plan.environment_naming_info.name,
1117            plan_id=plan.plan_id,
1118        )
1119        try:
1120            self._apply(plan, circuit_breaker)
1121        except Exception as e:
1122            self.notification_target_manager.notify(
1123                NotificationEvent.APPLY_FAILURE,
1124                environment=plan.environment_naming_info.name,
1125                plan_id=plan.plan_id,
1126                exc=traceback.format_exc(),
1127            )
1128            logger.error(f"Apply Failure: {traceback.format_exc()}")
1129            raise e
1130        self.notification_target_manager.notify(
1131            NotificationEvent.APPLY_END,
1132            environment=plan.environment_naming_info.name,
1133            plan_id=plan.plan_id,
1134        )

Applies a plan by pushing snapshots and backfilling data.

Given a plan, it pushes snapshots into the state sync and then uses the scheduler to backfill all models.

Arguments:
  • plan: The plan to apply.
  • circuit_breaker: An optional handler which checks if the apply should be aborted.
def invalidate_environment(self, name: str, sync: bool = False) -> None:
1136    def invalidate_environment(self, name: str, sync: bool = False) -> None:
1137        """Invalidates the target environment by setting its expiration timestamp to now.
1138
1139        Args:
1140            name: The name of the environment to invalidate.
1141            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
1142                be deleted asynchronously by the janitor process.
1143        """
1144        self.state_sync.invalidate_environment(name)
1145        if sync:
1146            self._cleanup_environments()
1147            self.console.log_success(f"Environment '{name}' has been deleted.")
1148        else:
1149            self.console.log_success(f"Environment '{name}' has been invalidated.")

Invalidates the target environment by setting its expiration timestamp to now.

Arguments:
  • name: The name of the environment to invalidate.
  • sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will be deleted asynchronously by the janitor process.
def diff( self, environment: Union[str, NoneType] = None, detailed: bool = False) -> bool:
1151    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
1152        """Show a diff of the current context with a given environment.
1153
1154        Args:
1155            environment: The environment to diff against.
1156            detailed: Show the actual SQL differences if True.
1157
1158        Returns:
1159            True if there are changes, False otherwise.
1160        """
1161        environment = environment or self.config.default_target_environment
1162        environment = Environment.normalize_name(environment)
1163        context_diff = self._context_diff(environment)
1164        self.console.show_model_difference_summary(
1165            context_diff,
1166            EnvironmentNamingInfo.from_environment_catalog_mapping(
1167                self.config.environment_catalog_mapping,
1168                name=environment,
1169                suffix_target=self.config.environment_suffix_target,
1170            ),
1171            self.default_catalog,
1172            no_diff=not detailed,
1173        )
1174        return context_diff.has_changes

Show a diff of the current context with a given environment.

Arguments:
  • environment: The environment to diff against.
  • detailed: Show the actual SQL differences if True.
Returns:

True if there are changes, False otherwise.

def table_diff( self, source: str, target: str, on: 't.List[str] | exp.Condition | None' = None, model_or_snapshot: Union[ModelOrSnapshot, NoneType] = None, where: 't.Optional[str | exp.Condition]' = None, limit: int = 20, show: bool = True, show_sample: bool = True) -> sqlmesh.core.table_diff.TableDiff:
1176    def table_diff(
1177        self,
1178        source: str,
1179        target: str,
1180        on: t.List[str] | exp.Condition | None = None,
1181        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
1182        where: t.Optional[str | exp.Condition] = None,
1183        limit: int = 20,
1184        show: bool = True,
1185        show_sample: bool = True,
1186    ) -> TableDiff:
1187        """Show a diff between two tables.
1188
1189        Args:
1190            source: The source environment or table.
1191            target: The target environment or table.
1192            on: The join condition, table aliases must be "s" and "t" for source and target.
1193                If omitted, the table's grain will be used.
1194            model_or_snapshot: The model or snapshot to use when environments are passed in.
1195            where: An optional where statement to filter results.
1196            limit: The limit of the sample dataframe.
1197            show: Show the table diff output in the console.
1198            show_sample: Show the sample dataframe in the console. Requires show=True.
1199
1200        Returns:
1201            The TableDiff object containing schema and summary differences.
1202        """
1203        source_alias, target_alias = source, target
1204        if model_or_snapshot:
1205            model = self.get_model(model_or_snapshot, raise_if_missing=True)
1206            source_env = self.state_reader.get_environment(source)
1207            target_env = self.state_reader.get_environment(target)
1208
1209            if not source_env:
1210                raise SQLMeshError(f"Could not find environment '{source}'")
1211            if not target_env:
1212                raise SQLMeshError(f"Could not find environment '{target}')")
1213
1214            source = next(
1215                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
1216            ).table_name()
1217            target = next(
1218                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
1219            ).table_name()
1220            source_alias = source_env.name
1221            target_alias = target_env.name
1222
1223            if not on:
1224                for ref in model.all_references:
1225                    if ref.unique:
1226                        on = ref.columns
1227
1228        if not on:
1229            raise SQLMeshError(
1230                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
1231            )
1232
1233        table_diff = TableDiff(
1234            adapter=self._engine_adapter,
1235            source=source,
1236            target=target,
1237            on=on,
1238            where=where,
1239            source_alias=source_alias,
1240            target_alias=target_alias,
1241            model_name=model.name if model_or_snapshot else None,
1242            limit=limit,
1243        )
1244        if show:
1245            self.console.show_schema_diff(table_diff.schema_diff())
1246            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
1247        return table_diff

Show a diff between two tables.

Arguments:
  • source: The source environment or table.
  • target: The target environment or table.
  • on: The join condition; table aliases must be "s" and "t" for source and target, respectively. If omitted, the table's grain will be used.
  • model_or_snapshot: The model or snapshot to use when environments are passed in.
  • where: An optional where statement to filter results.
  • limit: The limit of the sample dataframe.
  • show: Show the table diff output in the console.
  • show_sample: Show the sample dataframe in the console. Requires show=True.
Returns:

The TableDiff object containing schema and summary differences.

def get_dag( self, select_models: Union[Collection[str], NoneType] = None, **options: Any) -> sqlglot.lineage.GraphHTML:
1249    def get_dag(
1250        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
1251    ) -> GraphHTML:
1252        """Gets an HTML object representation of the DAG.
1253
1254        Args:
1255            select_models: A list of model selection strings that should be included in the dag.
1256        Returns:
1257            An html object that renders the dag.
1258        """
1259        dag = (
1260            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
1261            if select_models
1262            else self.dag
1263        )
1264
1265        nodes = {}
1266        edges: t.List[t.Dict] = []
1267
1268        for node, deps in dag.graph.items():
1269            nodes[node] = {
1270                "id": node,
1271                "label": node.split(".")[-1],
1272                "title": f"<span>{node}</span>",
1273            }
1274            edges.extend({"from": d, "to": node} for d in deps)
1275
1276        return GraphHTML(
1277            nodes,
1278            edges,
1279            options={
1280                "height": "100%",
1281                "width": "100%",
1282                "interaction": {},
1283                "layout": {
1284                    "hierarchical": {
1285                        "enabled": True,
1286                        "nodeSpacing": 200,
1287                        "sortMethod": "directed",
1288                    },
1289                },
1290                "nodes": {
1291                    "shape": "box",
1292                },
1293                **options,
1294            },
1295        )

Gets an HTML object representation of the DAG.

Arguments:
  • select_models: A list of model selection strings that should be included in the dag.
Returns:

An HTML object that renders the DAG.

def render_dag( self, path: str, select_models: Union[Collection[str], NoneType] = None) -> None:
1297    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
1298        """Render the dag as HTML and save it to a file.
1299
1300        Args:
1301            path: filename to save the dag html to
1302            select_models: A list of model selection strings that should be included in the dag.
1303        """
1304        file_path = Path(path)
1305        suffix = file_path.suffix
1306        if suffix != ".html":
1307            if suffix:
1308                logger.warning(
1309                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
1310                )
1311            path = str(file_path.with_suffix(".html"))
1312
1313        with open(path, "w", encoding="utf-8") as file:
1314            file.write(str(self.get_dag(select_models)))

Render the DAG as HTML and save it to a file.

Arguments:
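  • path: The file name to save the DAG HTML to. If the path does not have a `.html` extension, a file with a `.html` extension is created instead.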
  • select_models: A list of model selection strings that should be included in the dag.
def create_test( self, model: str, input_queries: Dict[str, str], overwrite: bool = False, variables: Union[Dict[str, str], NoneType] = None, path: Union[str, NoneType] = None, name: Union[str, NoneType] = None, include_ctes: bool = False) -> None:
1316    def create_test(
1317        self,
1318        model: str,
1319        input_queries: t.Dict[str, str],
1320        overwrite: bool = False,
1321        variables: t.Optional[t.Dict[str, str]] = None,
1322        path: t.Optional[str] = None,
1323        name: t.Optional[str] = None,
1324        include_ctes: bool = False,
1325    ) -> None:
1326        """Generate a unit test fixture for a given model.
1327
1328        Args:
1329            model: The model to test.
1330            input_queries: Mapping of model names to queries. Each model included in this mapping
1331                will be populated in the test based on the results of the corresponding query.
1332            overwrite: Whether to overwrite the existing test in case of a file path collision.
1333                When set to False, an error will be raised if there is such a collision.
1334            variables: Key-value pairs that will define variables needed by the model.
1335            path: The file path corresponding to the fixture, relative to the test directory.
1336                By default, the fixture will be created under the test directory and the file name
1337                will be inferred from the test's name.
1338            name: The name of the test. This is inferred from the model name by default.
1339            include_ctes: When true, CTE fixtures will also be generated.
1340        """
1341        input_queries = {
1342            # The get_model here has two purposes: return normalized names & check for missing deps
1343            self.get_model(dep, raise_if_missing=True).fqn: query
1344            for dep, query in input_queries.items()
1345        }
1346
1347        generate_test(
1348            model=self.get_model(model, raise_if_missing=True),
1349            input_queries=input_queries,
1350            models=self._models,
1351            engine_adapter=self._engine_adapter,
1352            test_engine_adapter=self._test_engine_adapter,
1353            project_path=self.path,
1354            overwrite=overwrite,
1355            variables=variables,
1356            path=path,
1357            name=name,
1358            include_ctes=include_ctes,
1359        )

Generate a unit test fixture for a given model.

Arguments:
  • model: The model to test.
  • input_queries: Mapping of model names to queries. Each model included in this mapping will be populated in the test based on the results of the corresponding query.
  • overwrite: Whether to overwrite the existing test in case of a file path collision. When set to False, an error will be raised if there is such a collision.
  • variables: Key-value pairs that will define variables needed by the model.
  • path: The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred from the test's name.
  • name: The name of the test. This is inferred from the model name by default.
  • include_ctes: When true, CTE fixtures will also be generated.
def test( self, match_patterns: Union[List[str], NoneType] = None, tests: Union[List[str], NoneType] = None, verbose: bool = False, preserve_fixtures: bool = False, stream: Union[TextIO, NoneType] = None) -> sqlmesh.core.test.result.ModelTextTestResult:
1361    def test(
1362        self,
1363        match_patterns: t.Optional[t.List[str]] = None,
1364        tests: t.Optional[t.List[str]] = None,
1365        verbose: bool = False,
1366        preserve_fixtures: bool = False,
1367        stream: t.Optional[t.TextIO] = None,
1368    ) -> ModelTextTestResult:
1369        """Discover and run model tests"""
1370        if verbose:
1371            pd.set_option("display.max_columns", None)
1372            verbosity = 2
1373        else:
1374            verbosity = 1
1375
1376        try:
1377            if tests:
1378                result = run_model_tests(
1379                    tests=tests,
1380                    models=self._models,
1381                    engine_adapter=self._test_engine_adapter,
1382                    dialect=self.default_dialect,
1383                    verbosity=verbosity,
1384                    patterns=match_patterns,
1385                    preserve_fixtures=preserve_fixtures,
1386                    default_catalog=self.default_catalog,
1387                )
1388            else:
1389                test_meta = []
1390
1391                for path, config in self.configs.items():
1392                    test_meta.extend(
1393                        get_all_model_tests(
1394                            path / c.TESTS,
1395                            patterns=match_patterns,
1396                            ignore_patterns=config.ignore_patterns,
1397                        )
1398                    )
1399
1400                result = run_tests(
1401                    test_meta,
1402                    models=self._models,
1403                    engine_adapter=self._test_engine_adapter,
1404                    dialect=self.default_dialect,
1405                    verbosity=verbosity,
1406                    preserve_fixtures=preserve_fixtures,
1407                    stream=stream,
1408                    default_catalog=self.default_catalog,
1409                )
1410        finally:
1411            self._test_engine_adapter.close()
1412
1413        return result

Discover and run model tests.

def audit( self, start: Union[datetime.date, datetime.datetime, str, int, float], end: Union[datetime.date, datetime.datetime, str, int, float], *, models: Union[Iterator[str], NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None) -> None:
1415    def audit(
1416        self,
1417        start: TimeLike,
1418        end: TimeLike,
1419        *,
1420        models: t.Optional[t.Iterator[str]] = None,
1421        execution_time: t.Optional[TimeLike] = None,
1422    ) -> None:
1423        """Audit models.
1424
1425        Args:
1426            start: The start of the interval to audit.
1427            end: The end of the interval to audit.
1428            models: The models to audit. All models will be audited if not specified.
1429            execution_time: The date/time time reference to use for execution time. Defaults to now.
1430        """
1431
1432        snapshots = (
1433            [self.get_snapshot(model, raise_if_missing=True) for model in models]
1434            if models
1435            else self.snapshots.values()
1436        )
1437
1438        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
1439        self.console.log_status_update(f"Found {num_audits} audit(s).")
1440        errors = []
1441        skipped_count = 0
1442        for snapshot in snapshots:
1443            for audit_result in self.snapshot_evaluator.audit(
1444                snapshot=snapshot,
1445                start=start,
1446                end=end,
1447                snapshots=self.snapshots,
1448                raise_exception=False,
1449            ):
1450                audit_id = f"{audit_result.audit.name}"
1451                if audit_result.model:
1452                    audit_id += f" on model {audit_result.model.name}"
1453
1454                if audit_result.skipped:
1455                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
1456                    skipped_count += 1
1457                elif audit_result.count:
1458                    errors.append(audit_result)
1459                    self.console.log_status_update(
1460                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
1461                    )
1462                else:
1463                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")
1464
1465        self.console.log_status_update(
1466            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
1467            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
1468        )
1469        for error in errors:
1470            self.console.log_status_update(
1471                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
1472            )
1473            self.console.log_status_update(f"Got {error.count} results, expected 0.")
1474            if error.query:
1475                self.console.show_sql(
1476                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
1477                )
1478
1479        self.console.log_status_update("Done.")

Audit models.

Arguments:
  • start: The start of the interval to audit.
  • end: The end of the interval to audit.
  • models: The models to audit. All models will be audited if not specified.
  • execution_time: The date/time reference to use for execution time. Defaults to now.
def rewrite(self, sql: str, dialect: str = '') -> sqlglot.expressions.Expression:
1481    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
1482        """Rewrite a sql expression with semantic references into an executable query.
1483
1484        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
1485
1486        Args:
1487            sql: The sql string to rewrite.
1488            dialect: The dialect of the sql string, defaults to the project dialect.
1489
1490        Returns:
1491            A SQLGlot expression with semantic references expanded.
1492        """
1493        return rewrite(
1494            sql,
1495            graph=ReferenceGraph(self.models.values()),
1496            metrics=self._metrics,
1497            dialect=dialect or self.default_dialect,
1498        )

Rewrite a sql expression with semantic references into an executable query.

https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

Arguments:
  • sql: The sql string to rewrite.
  • dialect: The dialect of the sql string, defaults to the project dialect.
Returns:

A SQLGlot expression with semantic references expanded.

def migrate(self) -> None:
1500    def migrate(self) -> None:
1501        """Migrates SQLMesh to the current running version.
1502
1503        Please contact your SQLMesh administrator before doing this.
1504        """
1505        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
1506        try:
1507            self._new_state_sync().migrate(
1508                default_catalog=self.default_catalog,
1509                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
1510            )
1511        except Exception as e:
1512            self.notification_target_manager.notify(
1513                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
1514            )
1515            raise e
1516        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

Migrates SQLMesh to the current running version.

Please contact your SQLMesh administrator before doing this.

def rollback(self) -> None:
1518    def rollback(self) -> None:
1519        """Rolls back SQLMesh to the previous migration.
1520
1521        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
1522        """
1523        self._new_state_sync().rollback()

Rolls back SQLMesh to the previous migration.

Please contact your SQLMesh administrator before doing this. This action cannot be undone.

def create_external_models(self) -> None:
1525    def create_external_models(self) -> None:
1526        """Create a schema file with all external models.
1527
1528        The schema file contains all columns and types of external models, allowing for more robust
1529        lineage, validation, and optimizations.
1530        """
1531        if not self._models:
1532            self.load(update_schemas=False)
1533
1534        for path, config in self.configs.items():
1535            create_schema_file(
1536                path=path / c.SCHEMA_YAML,
1537                models=UniqueKeyDict(
1538                    "models",
1539                    {
1540                        fqn: model
1541                        for fqn, model in self._models.items()
1542                        if self.config_for_node(model) is config
1543                    },
1544                ),
1545                adapter=self._engine_adapter,
1546                state_reader=self.state_reader,
1547                dialect=config.model_defaults.dialect,
1548                max_workers=self.concurrent_tasks,
1549            )

Create a schema file with all external models.

The schema file contains all columns and types of external models, allowing for more robust lineage, validation, and optimizations.

def print_info(self) -> None:
1551    def print_info(self) -> None:
1552        """Prints information about connections, models, macros, etc. to the console."""
1553        self.console.log_status_update(f"Models: {len(self.models)}")
1554        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")
1555
1556        self._try_connection("data warehouse", self._engine_adapter)
1557
1558        state_connection = self.config.get_state_connection(self.gateway)
1559        if state_connection:
1560            self._try_connection("state backend", state_connection.create_engine_adapter())
1561
1562        self._try_connection("test", self._test_engine_adapter)

Prints information about connections, models, macros, etc. to the console.

def close(self) -> None:
1564    def close(self) -> None:
1565        """Releases all resources allocated by this context."""
1566        self.snapshot_evaluator.close()
1567        self.state_sync.close()

Releases all resources allocated by this context.

def table_name(self, model_name: str, dev: bool) -> str:
1643    def table_name(self, model_name: str, dev: bool) -> str:
1644        """Returns the name of the pysical table for the given model name.
1645
1646        Args:
1647            model_name: The name of the model.
1648            dev: Whether to use the deployability index for the table name.
1649
1650        Returns:
1651            The name of the physical table.
1652        """
1653        deployability_index = (
1654            DeployabilityIndex.create(self.snapshots.values())
1655            if dev
1656            else DeployabilityIndex.all_deployable()
1657        )
1658        snapshot = self.get_snapshot(model_name)
1659        if not snapshot:
1660            raise SQLMeshError(f"Model '{model_name}' was not found.")
1661        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

Returns the name of the physical table for the given model name.

Arguments:
  • model_name: The name of the model.
  • dev: Whether to use the deployability index for the table name.
Returns:

The name of the physical table.

def clear_caches(self) -> None:
1663    def clear_caches(self) -> None:
1664        for path in self.configs:
1665            rmtree(path / c.CACHE)
1851class Context(GenericContext[Config]):
1852    CONFIG_TYPE = Config

Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

Arguments:
  • engine_adapter: The default engine adapter to use.
  • notification_targets: The notification targets to use. Defaults to what is defined in config.
  • paths: The directories containing SQLMesh files.
  • config: A Config object or the name of a Config object in config.py.
  • connection: The name of the connection. If not specified, the first connection as it appears in the configuration will be used.
  • test_connection: The name of the connection to use for tests. If not specified, the first connection as it appears in the configuration will be used.
  • concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
  • load: Whether or not to automatically load all models and macros (default True).
  • console: The rich instance used for printing out CLI command results.
  • users: A list of users to make known to SQLMesh.
  • config_type: The type of config object to use (default Config).