Context
A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and
load your project's models, macros, and audits. Afterwards, you can use the context to create and apply
plans, visualize your models' lineage, run your audits and model tests, and perform various other tasks.
For more information regarding what a context can do, see `sqlmesh.core.context.Context`.
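The models a context loads form a dependency graph (the context keeps it in its `dag` attribute), and questions about lineage come down to walking that graph. The minimal sketch below illustrates the idea with the standard library's `graphlib`. The model names and the `upstream` helper are hypothetical and are not SQLMesh's `DAG` API.

```python
from graphlib import TopologicalSorter

# Hypothetical project: each model maps to the models it selects from.
DEPENDS_ON: dict[str, set[str]] = {
    "raw.orders": set(),
    "raw.customers": set(),
    "staging.orders": {"raw.orders"},
    "marts.revenue": {"staging.orders", "raw.customers"},
}


def upstream(model: str, graph: dict[str, set[str]]) -> set[str]:
    """Return every model that `model` depends on, directly or transitively."""
    seen: set[str] = set()
    stack = list(graph[model])
    while stack:
        parent = stack.pop()
        if parent not in seen:
            seen.add(parent)
            stack.extend(graph[parent])
    return seen


# A valid evaluation order: every model appears after all of its parents.
order = list(TopologicalSorter(DEPENDS_ON).static_order())
print(order)
print(sorted(upstream("marts.revenue", DEPENDS_ON)))
# ['raw.customers', 'raw.orders', 'staging.orders']
```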
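Examples:
Creating and applying a plan against the staging environment.
```python
from sqlmesh.core.context import Context

# "local_config" names a Config object defined in the project's config.py.
context = Context(paths="example", config="local_config")
plan = context.plan("staging")  # diff the project against "staging"
context.apply(plan)
```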
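`context.plan` compares the loaded project with the target environment and presents the differences before anything is applied. As a rough mental model only, you can picture that comparison as a diff of per-model fingerprints. The `fingerprint` and `diff` helpers below are hypothetical. SQLMesh's real snapshot fingerprints cover more than the query text, and its plans also categorize changes (for example as breaking or non-breaking), which this sketch does not attempt.

```python
import hashlib


def fingerprint(query: str) -> str:
    """Hypothetical fingerprint: a hash of the whitespace-normalized query text."""
    normalized = " ".join(query.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def diff(local: dict[str, str], environment: dict[str, str]) -> dict[str, list[str]]:
    """Classify models as added, removed, or modified relative to an environment.

    Both arguments map model names to fingerprints.
    """
    return {
        "added": sorted(local.keys() - environment.keys()),
        "removed": sorted(environment.keys() - local.keys()),
        "modified": sorted(
            name for name in local.keys() & environment.keys() if local[name] != environment[name]
        ),
    }


staging = {
    "staging.orders": fingerprint("SELECT id, amount FROM raw.orders"),
    "staging.old_report": fingerprint("SELECT 1"),
}
local = {
    "staging.orders": fingerprint("SELECT id, amount, status FROM raw.orders"),
    "marts.revenue": fingerprint("SELECT SUM(amount) FROM staging.orders"),
}
print(diff(local, staging))
# {'added': ['marts.revenue'], 'removed': ['staging.old_report'], 'modified': ['staging.orders']}
```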
Running audits on your data.
```python
from sqlmesh.core.context import Context

context = Context(paths="example", config="local_config")
context.audit("yesterday", "now")  # audit the interval from yesterday until now
```
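In SQLMesh, an audit is a query that selects the rows violating some expectation. An audit therefore passes when it returns no rows. The sketch below shows that contract with an in-memory SQLite table. The `orders` table, the `run_audit` helper, and the hard-coded date bounds (standing in for the start and end passed to `context.audit`) are assumptions made for illustration.

```python
import sqlite3


def run_audit(conn: sqlite3.Connection, audit_query: str) -> list[tuple]:
    """Run an audit query and return the violating rows; an empty list means it passed."""
    return conn.execute(audit_query).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, amount REAL, ds TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 10.0, "2024-01-01"), (2, None, "2024-01-02"), (3, 5.0, "2024-01-02")],
)

# Hypothetical "amount is not null" audit over a single day; the literal dates stand in
# for the start and end of the interval being audited.
not_null_amount = """
    SELECT id, amount, ds FROM orders
    WHERE amount IS NULL AND ds BETWEEN '2024-01-02' AND '2024-01-02'
"""
violations = run_audit(conn, not_null_amount)
print("passed" if not violations else f"failed: {violations}")
# failed: [(2, None, '2024-01-02')]
```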
Running tests on your models.
```python
from sqlmesh.core.context import Context

context = Context(paths="example")
context.test()  # run the project's model unit tests
```
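`context.test()` runs the project's model unit tests. You can sketch the idea behind such a test without SQLMesh: feed fixed input rows to a model's query and compare the result with the expected rows. The `run_model_test` helper, its fixture format, and the use of SQLite are illustrative assumptions. SQLMesh's own tests are declared in YAML files and executed by its test runner.

```python
import sqlite3


def run_model_test(
    model_query: str,
    inputs: dict[str, list[dict]],
    expected: list[dict],
) -> bool:
    """Hypothetical test runner: load fixtures, run the model query, compare rows."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row  # lets each result row be converted to a dict
    try:
        for table, rows in inputs.items():
            # Fixture table and column names are trusted here, so f-string SQL is acceptable.
            columns = list(rows[0])
            conn.execute(f"CREATE TABLE {table} ({', '.join(columns)})")
            placeholders = ", ".join("?" for _ in columns)
            conn.executemany(
                f"INSERT INTO {table} VALUES ({placeholders})",
                [tuple(row[col] for col in columns) for row in rows],
            )
        actual = [dict(row) for row in conn.execute(model_query)]
    finally:
        conn.close()
    return actual == expected


model = (
    "SELECT customer_id, SUM(amount) AS total "
    "FROM orders GROUP BY customer_id ORDER BY customer_id"
)
inputs = {
    "orders": [
        {"customer_id": 1, "amount": 10},
        {"customer_id": 1, "amount": 5},
        {"customer_id": 2, "amount": 7},
    ]
}
expected = [{"customer_id": 1, "total": 15}, {"customer_id": 2, "total": 7}]
print(run_model_test(model, inputs, expected))  # True
```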
1""" 2# Context 3 4A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and 5load your project's models, macros, and audits. Afterwards, you can use the context to create and apply 6plans, visualize your model's lineage, run your audits and model tests, and perform various other tasks. 7For more information regarding what a context can do, see `sqlmesh.core.context.Context`. 8 9# Examples: 10 11Creating and applying a plan against the staging environment. 12```python 13from sqlmesh.core.context import Context 14context = Context(path="example", config="local_config") 15plan = context.plan("staging") 16context.apply(plan) 17``` 18 19Running audits on your data. 20```python 21from sqlmesh.core.context import Context 22context = Context(path="example", config="local_config") 23context.audit("yesterday", "now") 24``` 25 26Running tests on your models. 27```python 28from sqlmesh.core.context import Context 29context = Context(path="example") 30context.test() 31``` 32""" 33 34from __future__ import annotations 35 36import abc 37import collections 38import gc 39import logging 40import time 41import traceback 42import typing as t 43import unittest.result 44from datetime import timedelta 45from functools import cached_property 46from io import StringIO 47from pathlib import Path 48from shutil import rmtree 49from types import MappingProxyType 50 51import pandas as pd 52from sqlglot import exp 53from sqlglot.lineage import GraphHTML 54 55from sqlmesh.core import constants as c 56from sqlmesh.core.audit import Audit, StandaloneAudit 57from sqlmesh.core.config import CategorizerConfig, Config, load_configs 58from sqlmesh.core.config.loader import C 59from sqlmesh.core.console import Console, get_console 60from sqlmesh.core.context_diff import ContextDiff 61from sqlmesh.core.dialect import ( 62 format_model_expressions, 63 normalize_model_name, 64 pandas_to_sql, 65 parse, 66 parse_one, 67) 68from sqlmesh.core.engine_adapter 
import EngineAdapter 69from sqlmesh.core.environment import Environment, EnvironmentNamingInfo 70from sqlmesh.core.loader import Loader, update_model_schemas 71from sqlmesh.core.macros import ExecutableOrMacro, macro 72from sqlmesh.core.metric import Metric, rewrite 73from sqlmesh.core.model import Model 74from sqlmesh.core.notification_target import ( 75 NotificationEvent, 76 NotificationTarget, 77 NotificationTargetManager, 78) 79from sqlmesh.core.plan import Plan, PlanBuilder 80from sqlmesh.core.reference import ReferenceGraph 81from sqlmesh.core.scheduler import Scheduler 82from sqlmesh.core.schema_loader import create_schema_file 83from sqlmesh.core.selector import Selector 84from sqlmesh.core.snapshot import ( 85 DeployabilityIndex, 86 Snapshot, 87 SnapshotEvaluator, 88 SnapshotFingerprint, 89 to_table_mapping, 90) 91from sqlmesh.core.state_sync import ( 92 CachingStateSync, 93 StateReader, 94 StateSync, 95 cleanup_expired_views, 96) 97from sqlmesh.core.table_diff import TableDiff 98from sqlmesh.core.test import ( 99 ModelTextTestResult, 100 generate_test, 101 get_all_model_tests, 102 run_model_tests, 103 run_tests, 104) 105from sqlmesh.core.user import User 106from sqlmesh.utils import UniqueKeyDict, sys_path 107from sqlmesh.utils.dag import DAG 108from sqlmesh.utils.date import TimeLike, now_ds, to_date 109from sqlmesh.utils.errors import ( 110 CircuitBreakerError, 111 ConfigError, 112 PlanError, 113 SQLMeshError, 114 UncategorizedPlanError, 115) 116from sqlmesh.utils.jinja import JinjaMacroRegistry 117 118if t.TYPE_CHECKING: 119 from typing_extensions import Literal 120 121 from sqlmesh.core.engine_adapter._typing import DF, PySparkDataFrame, PySparkSession 122 from sqlmesh.core.snapshot import Node 123 124 ModelOrSnapshot = t.Union[str, Model, Snapshot] 125 NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot] 126 127logger = logging.getLogger(__name__) 128 129 130class BaseContext(abc.ABC): 131 """The base context which defines methods to 
execute a model.""" 132 133 @property 134 @abc.abstractmethod 135 def default_dialect(self) -> t.Optional[str]: 136 """Returns the default dialect.""" 137 138 @property 139 @abc.abstractmethod 140 def _model_tables(self) -> t.Dict[str, str]: 141 """Returns a mapping of model names to tables.""" 142 143 @property 144 @abc.abstractmethod 145 def engine_adapter(self) -> EngineAdapter: 146 """Returns an engine adapter.""" 147 148 @property 149 def spark(self) -> t.Optional[PySparkSession]: 150 """Returns the spark session if it exists.""" 151 return self.engine_adapter.spark 152 153 @property 154 def default_catalog(self) -> t.Optional[str]: 155 raise NotImplementedError 156 157 def table(self, model_name: str) -> str: 158 """Gets the physical table name for a given model. 159 160 Args: 161 model_name: The model name. 162 163 Returns: 164 The physical table name. 165 """ 166 model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect) 167 168 # We generate SQL for the default dialect because the table name may be used in a 169 # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery) 170 return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect) 171 172 def fetchdf( 173 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 174 ) -> pd.DataFrame: 175 """Fetches a dataframe given a sql string or sqlglot expression. 176 177 Args: 178 query: SQL string or sqlglot expression. 179 quote_identifiers: Whether to quote all identifiers in the query. 180 181 Returns: 182 The default dataframe is Pandas, but for Spark a PySpark dataframe is returned. 183 """ 184 return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers) 185 186 def fetch_pyspark_df( 187 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 188 ) -> PySparkDataFrame: 189 """Fetches a PySpark dataframe given a sql string or sqlglot expression. 
190 191 Args: 192 query: SQL string or sqlglot expression. 193 quote_identifiers: Whether to quote all identifiers in the query. 194 195 Returns: 196 A PySpark dataframe. 197 """ 198 return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers) 199 200 201class ExecutionContext(BaseContext): 202 """The minimal context needed to execute a model. 203 204 Args: 205 engine_adapter: The engine adapter to execute queries against. 206 snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations. 207 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 208 """ 209 210 def __init__( 211 self, 212 engine_adapter: EngineAdapter, 213 snapshots: t.Dict[str, Snapshot], 214 deployability_index: t.Optional[DeployabilityIndex] = None, 215 default_dialect: t.Optional[str] = None, 216 default_catalog: t.Optional[str] = None, 217 variables: t.Optional[t.Dict[str, t.Any]] = None, 218 ): 219 self.snapshots = snapshots 220 self.deployability_index = deployability_index 221 self._engine_adapter = engine_adapter 222 self._default_catalog = default_catalog 223 self._default_dialect = default_dialect 224 self._variables = variables or {} 225 226 @property 227 def default_dialect(self) -> t.Optional[str]: 228 return self._default_dialect 229 230 @property 231 def engine_adapter(self) -> EngineAdapter: 232 """Returns an engine adapter.""" 233 return self._engine_adapter 234 235 @cached_property 236 def _model_tables(self) -> t.Dict[str, str]: 237 """Returns a mapping of model names to tables.""" 238 return to_table_mapping(self.snapshots.values(), self.deployability_index) 239 240 @property 241 def default_catalog(self) -> t.Optional[str]: 242 return self._default_catalog 243 244 @property 245 def gateway(self) -> t.Optional[str]: 246 """Returns the gateway name.""" 247 return self.var(c.GATEWAY) 248 249 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> 
t.Optional[t.Any]: 250 """Returns a variable value.""" 251 return self._variables.get(var_name.lower(), default) 252 253 def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext: 254 """Returns a new ExecutionContext with additional variables.""" 255 return ExecutionContext( 256 self._engine_adapter, 257 self.snapshots, 258 self.deployability_index, 259 self._default_dialect, 260 self._default_catalog, 261 variables=variables, 262 ) 263 264 265class GenericContext(BaseContext, t.Generic[C]): 266 """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks. 267 268 Args: 269 engine_adapter: The default engine adapter to use. 270 notification_targets: The notification target to use. Defaults to what is defined in config. 271 paths: The directories containing SQLMesh files. 272 config: A Config object or the name of a Config object in config.py. 273 connection: The name of the connection. If not specified the first connection as it appears 274 in configuration will be used. 275 test_connection: The name of the connection to use for tests. If not specified the first 276 connection as it appears in configuration will be used. 277 concurrent_tasks: The maximum number of tasks that can use the connection concurrently. 278 load: Whether or not to automatically load all models and macros (default True). 279 console: The rich instance used for printing out CLI command results. 280 users: A list of users to make known to SQLMesh. 281 config_type: The type of config object to use (default Config). 
282 """ 283 284 CONFIG_TYPE: t.Type[C] 285 286 def __init__( 287 self, 288 engine_adapter: t.Optional[EngineAdapter] = None, 289 notification_targets: t.Optional[t.List[NotificationTarget]] = None, 290 state_sync: t.Optional[StateSync] = None, 291 paths: t.Union[str | Path, t.Iterable[str | Path]] = "", 292 config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None, 293 gateway: t.Optional[str] = None, 294 concurrent_tasks: t.Optional[int] = None, 295 loader: t.Optional[t.Type[Loader]] = None, 296 load: bool = True, 297 console: t.Optional[Console] = None, 298 users: t.Optional[t.List[User]] = None, 299 ): 300 self.console = console or get_console() 301 self.configs = ( 302 config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths) 303 ) 304 self.dag: DAG[str] = DAG() 305 self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") 306 self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits") 307 self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict( 308 "standaloneaudits" 309 ) 310 self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros") 311 self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics") 312 self._jinja_macros = JinjaMacroRegistry() 313 self._default_catalog: t.Optional[str] = None 314 315 self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items()))) 316 317 self.gateway = gateway 318 self._scheduler = self.config.get_scheduler(self.gateway) 319 self.environment_ttl = self.config.environment_ttl 320 self.pinned_environments = Environment.normalize_names(self.config.pinned_environments) 321 self.auto_categorize_changes = self.config.plan.auto_categorize_changes 322 323 self._connection_config = self.config.get_connection(self.gateway) 324 self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks 325 self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter() 326 327 
self._test_connection_config = self.config.get_test_connection( 328 self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT 329 ) 330 331 self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None 332 333 self._provided_state_sync: t.Optional[StateSync] = state_sync 334 self._state_sync: t.Optional[StateSync] = None 335 336 self._loader = (loader or self.config.loader)(**self.config.loader_kwargs) 337 338 # Should we dedupe notification_targets? If so how? 339 self.notification_targets = (notification_targets or []) + self.config.notification_targets 340 self.users = (users or []) + self.config.users 341 self.users = list({user.username: user for user in self.users}.values()) 342 self._register_notification_targets() 343 344 if ( 345 self.config.environment_catalog_mapping 346 and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported 347 ): 348 raise SQLMeshError( 349 "Environment catalog mapping is only supported for engine adapters that support multiple catalogs" 350 ) 351 352 if load: 353 self.load() 354 355 @property 356 def default_dialect(self) -> t.Optional[str]: 357 return self.config.dialect 358 359 @property 360 def engine_adapter(self) -> EngineAdapter: 361 """Returns an engine adapter.""" 362 return self._engine_adapter 363 364 @property 365 def snapshot_evaluator(self) -> SnapshotEvaluator: 366 if not self._snapshot_evaluator: 367 self._snapshot_evaluator = SnapshotEvaluator( 368 self.engine_adapter.with_log_level(logging.INFO), 369 ddl_concurrent_tasks=self.concurrent_tasks, 370 ) 371 return self._snapshot_evaluator 372 373 def execution_context( 374 self, deployability_index: t.Optional[DeployabilityIndex] = None 375 ) -> ExecutionContext: 376 """Returns an execution context.""" 377 return ExecutionContext( 378 engine_adapter=self._engine_adapter, 379 snapshots=self.snapshots, 380 deployability_index=deployability_index, 381 default_dialect=self.default_dialect, 382 
default_catalog=self.default_catalog, 383 ) 384 385 def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model: 386 """Update or insert a model. 387 388 The context's models dictionary will be updated to include these changes. 389 390 Args: 391 model: Model name or instance to update. 392 kwargs: The kwargs to update the model with. 393 394 Returns: 395 A new instance of the updated or inserted model. 396 """ 397 model = self.get_model(model, raise_if_missing=True) 398 path = model._path 399 400 # model.copy() can't be used here due to a cached state that can be a part of a model instance. 401 model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs})) 402 model._path = path 403 404 self._models.update({model.fqn: model}) 405 self.dag.add(model.fqn, model.depends_on) 406 update_model_schemas( 407 self.dag, 408 self._models, 409 self.path, 410 ) 411 412 model.validate_definition() 413 414 return model 415 416 def scheduler(self, environment: t.Optional[str] = None) -> Scheduler: 417 """Returns the built-in scheduler. 418 419 Args: 420 environment: The target environment to source model snapshots from, or None 421 if snapshots should be sourced from the currently loaded local state. 422 423 Returns: 424 The built-in scheduler instance. 
425 """ 426 snapshots: t.Iterable[Snapshot] 427 if environment is not None: 428 stored_environment = self.state_sync.get_environment(environment) 429 if stored_environment is None: 430 raise ConfigError(f"Environment '{environment}' was not found.") 431 snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values() 432 else: 433 snapshots = self.snapshots.values() 434 435 if not snapshots: 436 raise ConfigError("No models were found") 437 438 return Scheduler( 439 snapshots, 440 self.snapshot_evaluator, 441 self.state_sync, 442 default_catalog=self.default_catalog, 443 max_workers=self.concurrent_tasks, 444 console=self.console, 445 notification_target_manager=self.notification_target_manager, 446 ) 447 448 @property 449 def state_sync(self) -> StateSync: 450 if not self._state_sync: 451 self._state_sync = self._new_state_sync() 452 453 if self._state_sync.get_versions(validate=False).schema_version == 0: 454 self._state_sync.migrate(default_catalog=self.default_catalog) 455 self._state_sync.get_versions() 456 self._state_sync = CachingStateSync(self._state_sync) # type: ignore 457 return self._state_sync 458 459 @property 460 def state_reader(self) -> StateReader: 461 return self.state_sync 462 463 def refresh(self) -> None: 464 """Refresh all models that have been updated.""" 465 if self._loader.reload_needed(): 466 self.load() 467 468 def load(self, update_schemas: bool = True) -> GenericContext[C]: 469 """Load all files in the context's path.""" 470 with sys_path(*self.configs): 471 gc.disable() 472 project = self._loader.load(self, update_schemas) 473 self._macros = project.macros 474 self._jinja_macros = project.jinja_macros 475 self._models = project.models 476 self._metrics = project.metrics 477 self._standalone_audits.clear() 478 self._audits.clear() 479 for name, audit in project.audits.items(): 480 if isinstance(audit, StandaloneAudit): 481 self._standalone_audits[name] = audit 482 else: 483 self._audits[name] = audit 484 self.dag = 
project.dag 485 gc.enable() 486 487 duplicates = set(self._models) & set(self._standalone_audits) 488 if duplicates: 489 raise ConfigError( 490 f"Models and Standalone audits cannot have the same name: {duplicates}" 491 ) 492 493 return self 494 495 def run( 496 self, 497 environment: t.Optional[str] = None, 498 *, 499 start: t.Optional[TimeLike] = None, 500 end: t.Optional[TimeLike] = None, 501 execution_time: t.Optional[TimeLike] = None, 502 skip_janitor: bool = False, 503 ignore_cron: bool = False, 504 ) -> bool: 505 """Run the entire dag through the scheduler. 506 507 Args: 508 environment: The target environment to source model snapshots from and virtually update. Default: prod. 509 start: The start of the interval to render. 510 end: The end of the interval to render. 511 execution_time: The date/time time reference to use for execution time. Defaults to now. 512 skip_janitor: Whether to skip the janitor task. 513 ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals. 514 515 Returns: 516 True if the run was successful, False otherwise. 
517 """ 518 environment = environment or self.config.default_target_environment 519 520 self.notification_target_manager.notify( 521 NotificationEvent.RUN_START, environment=environment 522 ) 523 success = False 524 try: 525 success = self._run( 526 environment=environment, 527 start=start, 528 end=end, 529 execution_time=execution_time, 530 skip_janitor=skip_janitor, 531 ignore_cron=ignore_cron, 532 ) 533 except Exception as e: 534 self.notification_target_manager.notify( 535 NotificationEvent.RUN_FAILURE, traceback.format_exc() 536 ) 537 logger.error(f"Run Failure: {traceback.format_exc()}") 538 raise e 539 540 if success: 541 self.notification_target_manager.notify( 542 NotificationEvent.RUN_END, environment=environment 543 ) 544 self.console.log_success(f"Run finished for environment '{environment}'") 545 else: 546 self.notification_target_manager.notify( 547 NotificationEvent.RUN_FAILURE, "See console logs for details." 548 ) 549 550 return success 551 552 @t.overload 553 def get_model( 554 self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True 555 ) -> Model: ... 556 557 @t.overload 558 def get_model( 559 self, 560 model_or_snapshot: ModelOrSnapshot, 561 raise_if_missing: Literal[False] = False, 562 ) -> t.Optional[Model]: ... 563 564 def get_model( 565 self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False 566 ) -> t.Optional[Model]: 567 """Returns a model with the given name or None if a model with such name doesn't exist. 568 569 Args: 570 model_or_snapshot: A model name, model, or snapshot. 571 raise_if_missing: Raises an error if a model is not found. 572 573 Returns: 574 The expected model. 
575 """ 576 if isinstance(model_or_snapshot, str): 577 normalized_name = normalize_model_name( 578 model_or_snapshot, 579 dialect=self.default_dialect, 580 default_catalog=self.default_catalog, 581 ) 582 model = self._models.get(normalized_name) 583 elif isinstance(model_or_snapshot, Snapshot): 584 model = model_or_snapshot.model 585 else: 586 model = model_or_snapshot 587 588 if raise_if_missing and not model: 589 raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'") 590 591 return model 592 593 @t.overload 594 def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ... 595 596 @t.overload 597 def get_snapshot( 598 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True] 599 ) -> Snapshot: ... 600 601 @t.overload 602 def get_snapshot( 603 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False] 604 ) -> t.Optional[Snapshot]: ... 605 606 def get_snapshot( 607 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False 608 ) -> t.Optional[Snapshot]: 609 """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist. 610 611 Args: 612 node_or_snapshot: A node name, node, or snapshot. 613 raise_if_missing: Raises an error if a snapshot is not found. 614 615 Returns: 616 The expected snapshot. 
617 """ 618 if isinstance(node_or_snapshot, Snapshot): 619 return node_or_snapshot 620 if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot): 621 node_or_snapshot = normalize_model_name( 622 node_or_snapshot, 623 dialect=self.default_dialect, 624 default_catalog=self.default_catalog, 625 ) 626 fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn 627 snapshot = self.snapshots.get(fqn) 628 629 if raise_if_missing and not snapshot: 630 raise SQLMeshError(f"Cannot find snapshot for '{fqn}'") 631 632 return snapshot 633 634 def config_for_path(self, path: Path) -> Config: 635 for config_path, config in self.configs.items(): 636 try: 637 path.relative_to(config_path) 638 return config 639 except ValueError: 640 pass 641 return self.config 642 643 def config_for_node(self, node: str | Model | StandaloneAudit) -> Config: 644 if isinstance(node, str): 645 return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path) # type: ignore 646 return self.config_for_path(node._path) # type: ignore 647 648 @property 649 def models(self) -> MappingProxyType[str, Model]: 650 """Returns all registered models in this context.""" 651 return MappingProxyType(self._models) 652 653 @property 654 def metrics(self) -> MappingProxyType[str, Metric]: 655 """Returns all registered metrics in this context.""" 656 return MappingProxyType(self._metrics) 657 658 @property 659 def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]: 660 """Returns all registered standalone audits in this context.""" 661 return MappingProxyType(self._standalone_audits) 662 663 @property 664 def snapshots(self) -> t.Dict[str, Snapshot]: 665 """Generates and returns snapshots based on models registered in this context. 666 667 If one of the snapshots has been previously stored in the persisted state, the stored 668 instance will be returned. 
669 """ 670 return self._snapshots() 671 672 @property 673 def default_catalog(self) -> t.Optional[str]: 674 if self._default_catalog is None: 675 self._default_catalog = self._scheduler.get_default_catalog(self) 676 return self._default_catalog 677 678 def render( 679 self, 680 model_or_snapshot: ModelOrSnapshot, 681 *, 682 start: t.Optional[TimeLike] = None, 683 end: t.Optional[TimeLike] = None, 684 execution_time: t.Optional[TimeLike] = None, 685 expand: t.Union[bool, t.Iterable[str]] = False, 686 **kwargs: t.Any, 687 ) -> exp.Expression: 688 """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models. 689 690 Args: 691 model_or_snapshot: The model, model name, or snapshot to render. 692 start: The start of the interval to render. 693 end: The end of the interval to render. 694 execution_time: The date/time time reference to use for execution time. Defaults to now. 695 expand: Whether or not to use expand materialized models, defaults to False. 696 If True, all referenced models are expanded as raw queries. 697 If a list, only referenced models are expanded as raw queries. 698 699 Returns: 700 The rendered expression. 
701 """ 702 execution_time = execution_time or now_ds() 703 704 model = self.get_model(model_or_snapshot, raise_if_missing=True) 705 706 if expand and not isinstance(expand, bool): 707 expand = { 708 normalize_model_name( 709 x, default_catalog=self.default_catalog, dialect=self.default_dialect 710 ) 711 for x in expand 712 } 713 714 expand = self.dag.upstream(model.fqn) if expand is True else expand or [] 715 716 if model.is_seed: 717 df = next( 718 model.render( 719 context=self.execution_context(), 720 start=start, 721 end=end, 722 execution_time=execution_time, 723 **kwargs, 724 ) 725 ) 726 return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types)) 727 728 return model.render_query_or_raise( 729 start=start, 730 end=end, 731 execution_time=execution_time, 732 snapshots=self.snapshots, 733 expand=expand, 734 engine_adapter=self.engine_adapter, 735 **kwargs, 736 ) 737 738 def evaluate( 739 self, 740 model_or_snapshot: ModelOrSnapshot, 741 start: TimeLike, 742 end: TimeLike, 743 execution_time: TimeLike, 744 limit: t.Optional[int] = None, 745 **kwargs: t.Any, 746 ) -> DF: 747 """Evaluate a model or snapshot (running its query against a DB/Engine). 748 749 This method is used to test or iterate on models without side effects. 750 751 Args: 752 model_or_snapshot: The model, model name, or snapshot to render. 753 start: The start of the interval to evaluate. 754 end: The end of the interval to evaluate. 755 execution_time: The date/time time reference to use for execution time. 756 limit: A limit applied to the model. 
757 """ 758 snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True) 759 760 df = self.snapshot_evaluator.evaluate_and_fetch( 761 snapshot, 762 start=start, 763 end=end, 764 execution_time=execution_time, 765 snapshots=self.snapshots, 766 limit=limit or c.DEFAULT_MAX_LIMIT, 767 ) 768 769 if df is None: 770 raise RuntimeError(f"Error evaluating {snapshot.name}") 771 772 return df 773 774 def format( 775 self, 776 transpile: t.Optional[str] = None, 777 append_newline: t.Optional[bool] = None, 778 **kwargs: t.Any, 779 ) -> None: 780 """Format all SQL models.""" 781 for model in self._models.values(): 782 if not model._path.suffix == ".sql": 783 continue 784 with open(model._path, "r+", encoding="utf-8") as file: 785 expressions = parse( 786 file.read(), default_dialect=self.config_for_node(model).dialect 787 ) 788 if transpile: 789 for prop in expressions[0].expressions: 790 if prop.name.lower() == "dialect": 791 prop.replace( 792 exp.Property( 793 this="dialect", 794 value=exp.Literal.string(transpile or model.dialect), 795 ) 796 ) 797 format = self.config_for_node(model).format 798 opts = {**format.generator_options, **kwargs} 799 file.seek(0) 800 file.write( 801 format_model_expressions(expressions, transpile or model.dialect, **opts) 802 ) 803 if append_newline is None: 804 append_newline = format.append_newline 805 if append_newline: 806 file.write("\n") 807 file.truncate() 808 809 def plan( 810 self, 811 environment: t.Optional[str] = None, 812 *, 813 start: t.Optional[TimeLike] = None, 814 end: t.Optional[TimeLike] = None, 815 execution_time: t.Optional[TimeLike] = None, 816 create_from: t.Optional[str] = None, 817 skip_tests: bool = False, 818 restate_models: t.Optional[t.Iterable[str]] = None, 819 no_gaps: bool = False, 820 skip_backfill: bool = False, 821 forward_only: t.Optional[bool] = None, 822 no_prompts: t.Optional[bool] = None, 823 auto_apply: t.Optional[bool] = None, 824 no_auto_categorization: t.Optional[bool] = None, 825 
effective_from: t.Optional[TimeLike] = None, 826 include_unmodified: t.Optional[bool] = None, 827 select_models: t.Optional[t.Collection[str]] = None, 828 backfill_models: t.Optional[t.Collection[str]] = None, 829 categorizer_config: t.Optional[CategorizerConfig] = None, 830 enable_preview: t.Optional[bool] = None, 831 no_diff: t.Optional[bool] = None, 832 run: bool = False, 833 ) -> Plan: 834 """Interactively creates a plan. 835 836 This method compares the current context with the target environment. It then presents 837 the differences and asks whether to backfill each modified model. 838 839 Args: 840 environment: The environment to diff and plan against. 841 start: The start date of the backfill if there is one. 842 end: The end date of the backfill if there is one. 843 execution_time: The date/time reference to use for execution time. Defaults to now. 844 create_from: The environment to create the target environment from if it 845 doesn't exist. If not specified, the "prod" environment will be used. 846 skip_tests: Unit tests are run by default so this will skip them if enabled 847 restate_models: A list of either internal or external models, or tags, that need to be restated 848 for the given plan interval. If the target environment is a production environment, 849 ALL snapshots that depended on these upstream tables will have their intervals deleted 850 (even ones not in this current environment). Only the snapshots in this environment will 851 be backfilled whereas others need to be recovered on a future plan application. For development 852 environments only snapshots that are part of this plan will be affected. 853 no_gaps: Whether to ensure that new snapshots for models that are already a 854 part of the target environment have no data gaps when compared against previous 855 snapshots for same models. 856 skip_backfill: Whether to skip the backfill step. Default: False. 857 forward_only: Whether the purpose of the plan is to make forward only changes. 
858 no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that 859 if this flag is set to true and there are uncategorized changes the plan creation will 860 fail. Default: False. 861 auto_apply: Whether to automatically apply the new plan after creation. Default: False. 862 no_auto_categorization: Indicates whether to disable automatic categorization of model 863 changes (breaking / non-breaking). If not provided, then the corresponding configuration 864 option determines the behavior. 865 categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the 866 project config by default. 867 effective_from: The effective date from which to apply forward-only changes on production. 868 include_unmodified: Indicates whether to include unmodified models in the target development environment. 869 select_models: A list of model selection strings to filter the models that should be included into this plan. 870 backfill_models: A list of model selection strings to filter the models for which the data should be backfilled. 871 enable_preview: Indicates whether to enable preview for forward-only models in development environments. 872 no_diff: Hide text differences for changed models. 873 run: Whether to run latest intervals as part of the plan application. 874 875 Returns: 876 The populated Plan object. 
877 """ 878 plan_builder = self.plan_builder( 879 environment, 880 start=start, 881 end=end, 882 execution_time=execution_time, 883 create_from=create_from, 884 skip_tests=skip_tests, 885 restate_models=restate_models, 886 no_gaps=no_gaps, 887 skip_backfill=skip_backfill, 888 forward_only=forward_only, 889 no_auto_categorization=no_auto_categorization, 890 effective_from=effective_from, 891 include_unmodified=include_unmodified, 892 select_models=select_models, 893 backfill_models=backfill_models, 894 categorizer_config=categorizer_config, 895 enable_preview=enable_preview, 896 run=run, 897 ) 898 899 self.console.plan( 900 plan_builder, 901 auto_apply if auto_apply is not None else self.config.plan.auto_apply, 902 self.default_catalog, 903 no_diff=no_diff if no_diff is not None else self.config.plan.no_diff, 904 no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts, 905 ) 906 907 return plan_builder.build() 908 909 def plan_builder( 910 self, 911 environment: t.Optional[str] = None, 912 *, 913 start: t.Optional[TimeLike] = None, 914 end: t.Optional[TimeLike] = None, 915 execution_time: t.Optional[TimeLike] = None, 916 create_from: t.Optional[str] = None, 917 skip_tests: bool = False, 918 restate_models: t.Optional[t.Iterable[str]] = None, 919 no_gaps: bool = False, 920 skip_backfill: bool = False, 921 forward_only: t.Optional[bool] = None, 922 no_auto_categorization: t.Optional[bool] = None, 923 effective_from: t.Optional[TimeLike] = None, 924 include_unmodified: t.Optional[bool] = None, 925 select_models: t.Optional[t.Collection[str]] = None, 926 backfill_models: t.Optional[t.Collection[str]] = None, 927 categorizer_config: t.Optional[CategorizerConfig] = None, 928 enable_preview: t.Optional[bool] = None, 929 run: bool = False, 930 ) -> PlanBuilder: 931 """Creates a plan builder. 932 933 Args: 934 environment: The environment to diff and plan against. 935 start: The start date of the backfill if there is one. 
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default; setting this to True skips them. Default: False.
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments, only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for the same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward-only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included in this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run the latest intervals as part of the plan application.

        Returns:
            The plan builder.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

    def apply(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Applies a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
        to backfill all models.

        Args:
            plan: The plan to apply.
            circuit_breaker: An optional handler which checks if the apply should be aborted.
        """
        if (
            not plan.context_diff.has_changes
            and not plan.requires_backfill
            and not plan.has_unmodified_unpromoted
        ):
            return
        if plan.uncategorized:
            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_START,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
        try:
            self._apply(plan, circuit_breaker)
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.APPLY_FAILURE,
                environment=plan.environment_naming_info.name,
                plan_id=plan.plan_id,
                exc=traceback.format_exc(),
            )
            logger.error(f"Apply Failure: {traceback.format_exc()}")
            raise e
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_END,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )

    def invalidate_environment(self, name: str, sync: bool = False) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
                be deleted asynchronously by the janitor process.
        """
        self.state_sync.invalidate_environment(name)
        if sync:
            self._cleanup_environments()
            self.console.log_success(f"Environment '{name}' has been deleted.")
        else:
            self.console.log_success(f"Environment '{name}' has been invalidated.")

    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
        """Show a diff of the current context with a given environment.

        Args:
            environment: The environment to diff against.
            detailed: Show the actual SQL differences if True.

        Returns:
            True if there are changes, False otherwise.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        context_diff = self._context_diff(environment)
        self.console.show_model_difference_summary(
            context_diff,
            EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=environment,
                suffix_target=self.config.environment_suffix_target,
            ),
            self.default_catalog,
            no_diff=not detailed,
        )
        return context_diff.has_changes

    def table_diff(
        self,
        source: str,
        target: str,
        on: t.List[str] | exp.Condition | None = None,
        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
        where: t.Optional[str | exp.Condition] = None,
        limit: int = 20,
        show: bool = True,
        show_sample: bool = True,
    ) -> TableDiff:
        """Show a diff between two tables.

        Args:
            source: The source environment or table.
            target: The target environment or table.
            on: The join condition; table aliases must be "s" and "t" for source and target.
                If omitted, the table's grain will be used.
            model_or_snapshot: The model or snapshot to use when environments are passed in.
            where: An optional WHERE clause to filter the results.
            limit: The limit of the sample dataframe.
            show: Show the table diff output in the console.
            show_sample: Show the sample dataframe in the console. Requires show=True.

        Returns:
            The TableDiff object containing schema and summary differences.
        """
        source_alias, target_alias = source, target
        if model_or_snapshot:
            model = self.get_model(model_or_snapshot, raise_if_missing=True)
            source_env = self.state_reader.get_environment(source)
            target_env = self.state_reader.get_environment(target)

            if not source_env:
                raise SQLMeshError(f"Could not find environment '{source}'")
            if not target_env:
                raise SQLMeshError(f"Could not find environment '{target}'")

            source = next(
                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            target = next(
                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            source_alias = source_env.name
            target_alias = target_env.name

            if not on:
                for ref in model.all_references:
                    if ref.unique:
                        on = ref.columns

        if not on:
            raise SQLMeshError(
                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
            )

        table_diff = TableDiff(
            adapter=self._engine_adapter,
            source=source,
            target=target,
            on=on,
            where=where,
            source_alias=source_alias,
            target_alias=target_alias,
            model_name=model.name if model_or_snapshot else None,
            limit=limit,
        )
        if show:
            self.console.show_schema_diff(table_diff.schema_diff())
            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
        return table_diff

    def get_dag(
        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
    ) -> GraphHTML:
        """Gets an HTML object representation of the DAG.

        Args:
            select_models: A list of model selection strings that should be included in the DAG.

        Returns:
            An HTML object that renders the DAG.
        """
        dag = (
            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
            if select_models
            else self.dag
        )

        nodes = {}
        edges: t.List[t.Dict] = []

        for node, deps in dag.graph.items():
            nodes[node] = {
                "id": node,
                "label": node.split(".")[-1],
                "title": f"<span>{node}</span>",
            }
            edges.extend({"from": d, "to": node} for d in deps)

        return GraphHTML(
            nodes,
            edges,
            options={
                "height": "100%",
                "width": "100%",
                "interaction": {},
                "layout": {
                    "hierarchical": {
                        "enabled": True,
                        "nodeSpacing": 200,
                        "sortMethod": "directed",
                    },
                },
                "nodes": {
                    "shape": "box",
                },
                **options,
            },
        )

    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
        """Render the DAG as HTML and save it to a file.

        Args:
            path: The file name to save the DAG HTML to.
            select_models: A list of model selection strings that should be included in the DAG.
        """
        file_path = Path(path)
        suffix = file_path.suffix
        if suffix != ".html":
            if suffix:
                logger.warning(
                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
                )
            path = str(file_path.with_suffix(".html"))

        with open(path, "w", encoding="utf-8") as file:
            file.write(str(self.get_dag(select_models)))

    def create_test(
        self,
        model: str,
        input_queries: t.Dict[str, str],
        overwrite: bool = False,
        variables: t.Optional[t.Dict[str, str]] = None,
        path: t.Optional[str] = None,
        name: t.Optional[str] = None,
        include_ctes: bool = False,
    ) -> None:
        """Generate a unit test fixture for a given model.

        Args:
            model: The model to test.
            input_queries: Mapping of model names to queries. Each model included in this mapping
                will be populated in the test based on the results of the corresponding query.
            overwrite: Whether to overwrite the existing test in case of a file path collision.
                When set to False, an error will be raised if there is such a collision.
            variables: Key-value pairs that will define variables needed by the model.
            path: The file path corresponding to the fixture, relative to the test directory.
                By default, the fixture will be created under the test directory and the file name
                will be inferred from the test's name.
            name: The name of the test. This is inferred from the model name by default.
            include_ctes: When true, CTE fixtures will also be generated.
        """
        input_queries = {
            # The get_model here has two purposes: return normalized names & check for missing deps
            self.get_model(dep, raise_if_missing=True).fqn: query
            for dep, query in input_queries.items()
        }

        # Create the adapter before entering `try` so that `finally` never references an unbound name.
        test_adapter = self._test_connection_config.create_engine_adapter(
            register_comments_override=False
        )
        try:
            generate_test(
                model=self.get_model(model, raise_if_missing=True),
                input_queries=input_queries,
                models=self._models,
                engine_adapter=self._engine_adapter,
                test_engine_adapter=test_adapter,
                project_path=self.path,
                overwrite=overwrite,
                variables=variables,
                path=path,
                name=name,
                include_ctes=include_ctes,
            )
        finally:
            test_adapter.close()

    def test(
        self,
        match_patterns: t.Optional[t.List[str]] = None,
        tests: t.Optional[t.List[str]] = None,
        verbose: bool = False,
        preserve_fixtures: bool = False,
        stream: t.Optional[t.TextIO] = None,
    ) -> ModelTextTestResult:
        """Discover and run model tests."""
        if verbose:
            pd.set_option("display.max_columns", None)
            verbosity = 2
        else:
            verbosity = 1

        if tests:
            result = run_model_tests(
                tests=tests,
                models=self._models,
                config=self.config,
                gateway=self.gateway,
                dialect=self.default_dialect,
                verbosity=verbosity,
                patterns=match_patterns,
                preserve_fixtures=preserve_fixtures,
                stream=stream,
                default_catalog=self.default_catalog,
                default_catalog_dialect=self.engine_adapter.DIALECT,
            )
        else:
            test_meta = []

            for path, config in self.configs.items():
                test_meta.extend(
                    get_all_model_tests(
                        path / c.TESTS,
                        patterns=match_patterns,
                        ignore_patterns=config.ignore_patterns,
                    )
                )

            result = run_tests(
                model_test_metadata=test_meta,
                models=self._models,
                config=self.config,
                gateway=self.gateway,
                dialect=self.default_dialect,
                verbosity=verbosity,
                preserve_fixtures=preserve_fixtures,
                stream=stream,
                default_catalog=self.default_catalog,
                default_catalog_dialect=self.engine_adapter.DIALECT,
            )

        return result

    def audit(
        self,
        start: TimeLike,
        end: TimeLike,
        *,
        models: t.Optional[t.Iterator[str]] = None,
        execution_time: t.Optional[TimeLike] = None,
    ) -> None:
        """Audit models.

        Args:
            start: The start of the interval to audit.
            end: The end of the interval to audit.
            models: The models to audit. All models will be audited if not specified.
            execution_time: The date/time reference to use for execution time. Defaults to now.
        """

        snapshots = (
            [self.get_snapshot(model, raise_if_missing=True) for model in models]
            if models
            else self.snapshots.values()
        )

        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
        self.console.log_status_update(f"Found {num_audits} audit(s).")
        errors = []
        skipped_count = 0
        for snapshot in snapshots:
            for audit_result in self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                snapshots=self.snapshots,
                raise_exception=False,
            ):
                audit_id = f"{audit_result.audit.name}"
                if audit_result.model:
                    audit_id += f" on model {audit_result.model.name}"

                if audit_result.skipped:
                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                    skipped_count += 1
                elif audit_result.count:
                    errors.append(audit_result)
                    self.console.log_status_update(
                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                    )
                else:
                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

        self.console.log_status_update(
            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
        )
        for error in errors:
            self.console.log_status_update(
                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
            )
            self.console.log_status_update(f"Got {error.count} results, expected 0.")
            if error.query:
                self.console.show_sql(
                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
                )

        self.console.log_status_update("Done.")

    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

        Args:
            sql: The sql string to rewrite.
            dialect: The dialect of the sql string, defaults to the project dialect.

        Returns:
            A SQLGlot expression with semantic references expanded.
        """
        return rewrite(
            sql,
            graph=ReferenceGraph(self.models.values()),
            metrics=self._metrics,
            dialect=dialect or self.default_dialect,
        )

    def migrate(self) -> None:
        """Migrates SQLMesh to the current running version.

        Please contact your SQLMesh administrator before doing this.
        """
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
        try:
            self._new_state_sync().migrate(
                default_catalog=self.default_catalog,
                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
            )
            raise e
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

    def rollback(self) -> None:
        """Rolls back SQLMesh to the previous migration.

        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
        """
        self._new_state_sync().rollback()

    def create_external_models(self) -> None:
        """Create a schema file with all external models.

        The schema file contains all columns and types of external models, allowing for more robust
        lineage, validation, and optimizations.
        """
        if not self._models:
            self.load(update_schemas=False)

        for path, config in self.configs.items():
            create_schema_file(
                path=path / c.SCHEMA_YAML,
                models=UniqueKeyDict(
                    "models",
                    {
                        fqn: model
                        for fqn, model in self._models.items()
                        if self.config_for_node(model) is config
                    },
                ),
                adapter=self._engine_adapter,
                state_reader=self.state_reader,
                dialect=config.model_defaults.dialect,
                max_workers=self.concurrent_tasks,
            )

    def print_info(self) -> None:
        """Prints information about connections, models, macros, etc. to the console."""
        self.console.log_status_update(f"Models: {len(self.models)}")
        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

        self._try_connection("data warehouse", self._engine_adapter)

        state_connection = self.config.get_state_connection(self.gateway)
        if state_connection:
            self._try_connection("state backend", state_connection.create_engine_adapter())

    def close(self) -> None:
        """Releases all resources allocated by this context."""
        self.snapshot_evaluator.close()
        self.state_sync.close()

    def _run(
        self,
        environment: str,
        *,
        start: t.Optional[TimeLike],
        end: t.Optional[TimeLike],
        execution_time: t.Optional[TimeLike],
        skip_janitor: bool,
        ignore_cron: bool,
    ) -> bool:
        if not skip_janitor and environment.lower() == c.PROD:
            self._run_janitor()

        env_check_attempts_num = max(
            1,
            self.config.run.environment_check_max_wait
            // self.config.run.environment_check_interval,
        )

        def _block_until_finalized() -> str:
            for _ in range(env_check_attempts_num):
                assert environment is not None  # mypy
                environment_state = self.state_sync.get_environment(environment)
                if not environment_state:
                    raise SQLMeshError(f"Environment '{environment}' was not found.")
                if environment_state.finalized_ts:
                    return environment_state.plan_id
                logger.warning(
                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
                    environment,
                    environment_state.plan_id,
                    self.config.run.environment_check_interval,
                )
                time.sleep(self.config.run.environment_check_interval)
            raise SQLMeshError(
                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
                "This means that the environment either failed to update or the update is taking longer than expected. "
                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
            )

        done = False
        while not done:
            plan_id_at_start = _block_until_finalized()

            def _has_environment_changed() -> bool:
                assert environment is not None  # mypy
                current_environment_state = self.state_sync.get_environment(environment)
                return (
                    not current_environment_state
                    or current_environment_state.plan_id != plan_id_at_start
                    or not current_environment_state.finalized_ts
                )

            try:
                success = self.scheduler(environment=environment).run(
                    environment,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    ignore_cron=ignore_cron,
                    circuit_breaker=_has_environment_changed,
                )
                done = True
            except CircuitBreakerError:
                logger.warning(
                    "Environment '%s' has been modified while running. Restarting the run...",
                    environment,
                )

        return success

    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)

    def table_name(self, model_name: str, dev: bool) -> str:
        """Returns the name of the physical table for the given model name.

        Args:
            model_name: The name of the model.
            dev: Whether to use the deployability index for the table name.

        Returns:
            The name of the physical table.
        """
        deployability_index = (
            DeployabilityIndex.create(self.snapshots.values())
            if dev
            else DeployabilityIndex.all_deployable()
        )
        snapshot = self.get_snapshot(model_name)
        if not snapshot:
            raise SQLMeshError(f"Model '{model_name}' was not found.")
        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

    def clear_caches(self) -> None:
        for path in self.configs:
            rmtree(path / c.CACHE)

    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
        test_output_io = StringIO()
        result = self.test(stream=test_output_io, verbose=verbose)
        return result, test_output_io.getvalue()

    def _run_plan_tests(
        self, skip_tests: bool = False
    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
        if not skip_tests:
            result, test_output = self._run_tests()
            if result.testsRun > 0:
                self.console.log_test_results(
                    result, test_output, self._test_connection_config._engine_adapter.DIALECT
                )
            if not result.wasSuccessful():
                raise PlanError(
                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
                )
            return result, test_output
        return None, None

    @property
    def _model_tables(self) -> t.Dict[str, str]:
        """Mapping of model name to physical table name.

        If a snapshot has not been versioned yet, its view name will be returned.
        """
        return {
            fqn: (
                snapshot.table_name()
                if snapshot.version
                else snapshot.qualified_view_name.for_environment(
                    EnvironmentNamingInfo.from_environment_catalog_mapping(
                        self.config.environment_catalog_mapping,
                        name=c.PROD,
                        suffix_target=self.config.environment_suffix_target,
                    )
                )
            )
            for fqn, snapshot in self.snapshots.items()
        }

    def _snapshots(
        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
    ) -> t.Dict[str, Snapshot]:
        prod = self.state_reader.get_environment(c.PROD)
        remote_snapshots = (
            {
                snapshot.name: snapshot
                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
            }
            if prod
            else {}
        )

        local_nodes = {**(models_override or self._models), **self._standalone_audits}
        nodes = local_nodes.copy()
        audits = self._audits.copy()
        projects = {config.project for config in self.configs.values()}

        for name, snapshot in remote_snapshots.items():
            if name not in nodes and snapshot.node.project not in projects:
                nodes[name] = snapshot.node
                if snapshot.is_model:
                    for audit in snapshot.audits:
                        if name not in audits:
                            audits[name] = audit

        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
            snapshots: t.Dict[str, Snapshot] = {}
            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}

            for node in nodes.values():
                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
                    ttl = remote_snapshots[node.fqn].ttl
                else:
                    config = self.config_for_node(node)
                    ttl = config.snapshot_ttl

                snapshot = Snapshot.from_node(
                    node,
                    nodes=nodes,
                    audits=audits,
                    cache=fingerprint_cache,
                    ttl=ttl,
                )
                snapshots[snapshot.name] = snapshot
            return snapshots

        snapshots = _nodes_to_snapshots(nodes)
        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        unrestorable_snapshots = {
            snapshot
            for snapshot in stored_snapshots.values()
            if snapshot.name in local_nodes and snapshot.unrestorable
        }
        if unrestorable_snapshots:
            for snapshot in unrestorable_snapshots:
                logger.info(
                    "Found an unrestorable snapshot %s. Restamping the model...", snapshot.name
                )
                node = local_nodes[snapshot.name]
                nodes[snapshot.name] = node.copy(
                    update={"stamp": f"revert to {snapshot.identifier}"}
                )
            snapshots = _nodes_to_snapshots(nodes)
            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        for snapshot in stored_snapshots.values():
            # Keep the original model instance to preserve the query cache.
            snapshot.node = snapshots[snapshot.name].node

        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}

    def _context_diff(
        self,
        environment: str,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        create_from: t.Optional[str] = None,
        force_no_diff: bool = False,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        environment = Environment.normalize_name(environment)
        if force_no_diff:
            return ContextDiff.create_no_diff(environment)
        return ContextDiff.create(
            environment,
            snapshots=snapshots or self.snapshots,
            create_from=create_from or c.PROD,
            state_reader=self.state_reader,
            ensure_finalized_snapshots=ensure_finalized_snapshots,
        )

    def _run_janitor(self) -> None:
        self._cleanup_environments()
        expired_snapshots = self.state_sync.delete_expired_snapshots()
        self.snapshot_evaluator.cleanup(
            expired_snapshots, on_complete=self.console.update_cleanup_progress
        )

        self.state_sync.compact_intervals()

    def _cleanup_environments(self) -> None:
        expired_environments = self.state_sync.delete_expired_environments()
        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)

    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
        connection_name = connection_name.capitalize()
        try:
            engine_adapter.fetchall("SELECT 1")
            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
        except Exception as ex:
            self.console.log_error(f"{connection_name} connection failed. {ex}")

    def _new_state_sync(self) -> StateSync:
        return self._provided_state_sync or self._scheduler.create_state_sync(self)

    def _new_selector(self) -> Selector:
        return Selector(
            self.state_reader,
            self._models,
            context_path=self.path,
            default_catalog=self.default_catalog,
            dialect=self.default_dialect,
        )

    def _register_notification_targets(self) -> None:
        event_notifications = collections.defaultdict(set)
        for target in self.notification_targets:
            if target.is_configured:
                for event in target.notify_on:
                    event_notifications[event].add(target)
        user_notification_targets = {
            user.username: set(
                target for target in user.notification_targets if target.is_configured
            )
            for user in self.users
        }
        self.notification_target_manager = NotificationTargetManager(
            event_notifications, user_notification_targets, username=self.config.username
        )


class Context(GenericContext[Config]):
    CONFIG_TYPE = Config
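The wait-then-run loop in `_run` is easy to lose in the listing. Below is a minimal, standalone sketch of the same pattern, using only the standard library. It polls until the environment is finalized, giving up after `max(1, max_wait // interval)` checks. It then runs with a circuit breaker that trips if the plan ID changes or finalization is cleared, and restarts the whole cycle when the breaker trips. `EnvironmentState`, `attempts_for`, `block_until_finalized`, `run_with_restarts`, and the local `CircuitBreakerError` are hypothetical stand-ins, not SQLMesh APIs. The real method reads state through `self.state_sync.get_environment`, raises `SQLMeshError` instead of `LookupError`/`TimeoutError`, and delegates the breaker checks to the scheduler.

```python
import time
from dataclasses import dataclass
from typing import Callable, List, Optional


class CircuitBreakerError(Exception):
    """Local stand-in for the error raised when the circuit breaker trips mid-run."""


@dataclass
class EnvironmentState:
    plan_id: str
    finalized_ts: Optional[int]  # None while a plan is still being applied


def attempts_for(max_wait: int, interval: int) -> int:
    # Same arithmetic as env_check_attempts_num: always check at least once.
    return max(1, max_wait // interval)


def block_until_finalized(
    get_state: Callable[[], Optional[EnvironmentState]], attempts: int, interval: float
) -> str:
    """Poll until the environment is finalized; return the plan ID that finalized it."""
    for _ in range(attempts):
        state = get_state()
        if state is None:
            raise LookupError("environment was not found")
        if state.finalized_ts:
            return state.plan_id
        time.sleep(interval)
    raise TimeoutError("exceeded the maximum wait time for the environment to be ready")


def run_with_restarts(
    get_state: Callable[[], Optional[EnvironmentState]],
    run_once: Callable[[Callable[[], bool]], bool],
    attempts: int,
    interval: float,
) -> bool:
    """Run once the environment is stable; start over if it changes while running."""
    while True:
        plan_id_at_start = block_until_finalized(get_state, attempts, interval)

        def has_changed() -> bool:
            state = get_state()
            return (
                state is None
                or state.plan_id != plan_id_at_start
                or not state.finalized_ts
            )

        try:
            return run_once(has_changed)
        except CircuitBreakerError:
            pass  # The environment was modified mid-run: wait again, then rerun.


# Simulated state reads: p1 finalizes, p2 starts mid-run, p2 finalizes, rerun succeeds.
states = iter(
    [
        EnvironmentState("p1", None),
        EnvironmentState("p1", 100),
        EnvironmentState("p2", None),
        EnvironmentState("p2", 200),
        EnvironmentState("p2", 200),
    ]
)
runs: List[str] = []


def run_once(circuit_breaker: Callable[[], bool]) -> bool:
    runs.append("start")
    # A scheduler would presumably consult the breaker during the run; here it is checked once.
    if circuit_breaker():
        raise CircuitBreakerError()
    return True


result = run_with_restarts(lambda: next(states), run_once, attempts=3, interval=0.0)
print(result, len(runs))  # True 2
```

Capturing `plan_id_at_start` before each attempt lets the breaker compare against the plan that was live when the run began. Any concurrent `plan` application therefore forces a clean restart rather than a run over a half-updated environment.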
class BaseContext(abc.ABC):
    """The base context which defines methods to execute a model."""

    @property
    @abc.abstractmethod
    def default_dialect(self) -> t.Optional[str]:
        """Returns the default dialect."""

    @property
    @abc.abstractmethod
    def _model_tables(self) -> t.Dict[str, str]:
        """Returns a mapping of model names to tables."""

    @property
    @abc.abstractmethod
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        """Returns the spark session if it exists."""
        return self.engine_adapter.spark

    @property
    def default_catalog(self) -> t.Optional[str]:
        raise NotImplementedError

    def table(self, model_name: str) -> str:
        """Gets the physical table name for a given model.

        Args:
            model_name: The model name.

        Returns:
            The physical table name.
        """
        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)

        # We generate SQL for the default dialect because the table name may be used in a
        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)

    def fetchdf(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a dataframe given a sql string or sqlglot expression.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
        """
        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Fetches a PySpark dataframe given a sql string or sqlglot expression.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            A PySpark dataframe.
        """
        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)
The base context which defines methods to execute a model.
def table(self, model_name: str) -> str:
Gets the physical table name for a given model.
Arguments:
- model_name: The model name.
Returns:
The physical table name.
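table() performs three steps. It normalizes the model name, looks it up in _model_tables, and re-renders the stored name in the default dialect so that identifier quoting matches the engine. The rough, stdlib-only sketch below follows those steps. `normalize_name` and `render_for_dialect` are hypothetical simplifications. The real `normalize_model_name` and sqlglot rendering are dialect-aware and handle many more cases. The sketch also assumes that the mapping stores ANSI double-quoted names, and the physical table name is made up.

```python
from typing import Dict, Optional


def normalize_name(name: str, default_catalog: Optional[str]) -> str:
    # Hypothetical simplification: lowercase each part and prepend the
    # default catalog to two-part (schema.table) names.
    parts = [part.lower() for part in name.split(".")]
    if default_catalog and len(parts) == 2:
        parts.insert(0, default_catalog.lower())
    return ".".join(parts)


def render_for_dialect(quoted_table: str, dialect: Optional[str]) -> str:
    # Swap ANSI double quotes for the dialect's identifier delimiter
    # (simplified to BigQuery backticks vs. double quotes everywhere else).
    delimiter = "`" if dialect == "bigquery" else '"'
    return quoted_table.replace('"', delimiter)


def table(
    model_name: str,
    model_tables: Dict[str, str],
    default_catalog: Optional[str],
    dialect: Optional[str],
) -> str:
    key = normalize_name(model_name, default_catalog)
    return render_for_dialect(model_tables[key], dialect)


tables = {"prod.sales.orders": '"prod"."sqlmesh__sales"."sales__orders__1234"'}
print(table("Sales.Orders", tables, "prod", "bigquery"))
# `prod`.`sqlmesh__sales`.`sales__orders__1234`
```

A naive character replacement would break on identifiers that themselves contain quotes. That is one reason the real method parses the name with sqlglot instead.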
def fetchdf(self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False) -> pd.DataFrame:
Fetches a dataframe given a sql string or sqlglot expression.
Arguments:
- query: SQL string or sqlglot expression.
- quote_identifiers: Whether to quote all identifiers in the query.
Returns:
The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
def fetch_pyspark_df(self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False) -> PySparkDataFrame:
Fetches a PySpark dataframe given a sql string or sqlglot expression.
Arguments:
- query: SQL string or sqlglot expression.
- quote_identifiers: Whether to quote all identifiers in the query.
Returns:
A PySpark dataframe.
class ExecutionContext(BaseContext):
    """The minimal context needed to execute a model.

    Args:
        engine_adapter: The engine adapter to execute queries against.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        snapshots: t.Dict[str, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        default_dialect: t.Optional[str] = None,
        default_catalog: t.Optional[str] = None,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ):
        self.snapshots = snapshots
        self.deployability_index = deployability_index
        self._engine_adapter = engine_adapter
        self._default_catalog = default_catalog
        self._default_dialect = default_dialect
        self._variables = variables or {}

    @property
    def default_dialect(self) -> t.Optional[str]:
        return self._default_dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""
        return self._engine_adapter

    @cached_property
    def _model_tables(self) -> t.Dict[str, str]:
        """Returns a mapping of model names to tables."""
        return to_table_mapping(self.snapshots.values(), self.deployability_index)

    @property
    def default_catalog(self) -> t.Optional[str]:
        return self._default_catalog

    @property
    def gateway(self) -> t.Optional[str]:
        """Returns the gateway name."""
        return self.var(c.GATEWAY)

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns a variable value."""
        return self._variables.get(var_name.lower(), default)

    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
        """Returns a new ExecutionContext with additional variables."""
        return ExecutionContext(
            self._engine_adapter,
            self.snapshots,
            self.deployability_index,
            self._default_dialect,
            self._default_catalog,
            variables=variables,
        )
The minimal context needed to execute a model.
Arguments:
- engine_adapter: The engine adapter to execute queries against.
- snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
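- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- default_dialect: The default SQL dialect.
- default_catalog: The default catalog.
- variables: Variables that can be read with var(). Defaults to an empty mapping.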
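In the listing above, _model_tables is declared with `functools.cached_property`. The model-to-table mapping is therefore built on first access and then reused for the lifetime of that instance. The hypothetical `TableMappingCache` class below isolates that caching behavior. Its mapping is fake; the real property calls `to_table_mapping`.

```python
from functools import cached_property
from typing import Dict, List


class TableMappingCache:
    """Hypothetical stand-in for how ExecutionContext caches _model_tables."""

    def __init__(self, snapshots: List[str]):
        self.snapshots = snapshots
        self.builds = 0  # how many times the mapping has been computed

    @cached_property
    def model_tables(self) -> Dict[str, str]:
        # Illustrative mapping only; it stands in for to_table_mapping(...).
        self.builds += 1
        return {name: f"physical__{name}" for name in self.snapshots}


cache = TableMappingCache(["a", "b"])
first = cache.model_tables
second = cache.model_tables       # served from the instance cache, not recomputed
cache.snapshots.append("c")       # later changes are not reflected in the cached mapping
print(cache.builds)               # 1
print("c" in cache.model_tables)  # False
```

If the real class behaves like this sketch, changes made to snapshots after the first lookup are not picked up. A context returned by with_variables is a new instance, so it computes its own mapping.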
ExecutionContext(
    engine_adapter: EngineAdapter,
    snapshots: t.Dict[str, Snapshot],
    deployability_index: t.Optional[DeployabilityIndex] = None,
    default_dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
)
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
Returns the value of the variable var_name, or default if it is not set. The name is lowercased before the lookup.
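var() lowercases only the requested name before looking it up, so it finds a value only when the stored key is already lowercase. The minimal mirror below uses a hypothetical `VarLookup` class, which is not part of SQLMesh, to show the three possible outcomes.

```python
from typing import Any, Dict, Optional


class VarLookup:
    """Illustrative mirror of the lookup done by ExecutionContext.var."""

    def __init__(self, variables: Optional[Dict[str, Any]] = None):
        self._variables = variables or {}

    def var(self, var_name: str, default: Optional[Any] = None) -> Optional[Any]:
        # Only the requested name is lowercased; stored keys are used as-is.
        return self._variables.get(var_name.lower(), default)


lookup = VarLookup({"start_ds": "2024-01-01", "Region": "eu"})
print(lookup.var("START_DS"))        # 2024-01-01
print(lookup.var("Region"))          # None: the stored key "Region" is not lowercase
print(lookup.var("missing", "n/a"))  # n/a
```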
def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
Returns a new ExecutionContext that uses the given variables. They replace, rather than extend, the variables of the current context.
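In the listed source, with_variables passes only the new mapping to the constructor, so the original context's variables are not carried over. The sketch below models just that behavior with a hypothetical frozen dataclass. It also shows how a caller could keep existing values by merging them explicitly. In the real class the current variables live in the private _variables attribute, so this approach assumes the caller still holds the original mapping.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict


@dataclass(frozen=True)
class MiniContext:
    # Hypothetical stand-in; only the variables part of ExecutionContext is modeled.
    dialect: str
    variables: Dict[str, Any] = field(default_factory=dict)

    def with_variables(self, variables: Dict[str, Any]) -> "MiniContext":
        # Same semantics as the listed method: the new mapping replaces the old one.
        return replace(self, variables=variables)


base = MiniContext("duckdb", {"env": "dev"})
replaced = base.with_variables({"limit": 10})
merged = base.with_variables({**base.variables, "limit": 10})
print(replaced.variables)  # {'limit': 10}
print(merged.variables)    # {'env': 'dev', 'limit': 10}
```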
class GenericContext(BaseContext, t.Generic[C]):
    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

    Args:
        engine_adapter: The default engine adapter to use.
        notification_targets: The notification target to use. Defaults to what is defined in config.
        paths: The directories containing SQLMesh files.
        config: A Config object or the name of a Config object in config.py.
        connection: The name of the connection. If not specified the first connection as it appears
            in configuration will be used.
        test_connection: The name of the connection to use for tests. If not specified the first
            connection as it appears in configuration will be used.
        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
        load: Whether or not to automatically load all models and macros (default True).
        console: The rich instance used for printing out CLI command results.
        users: A list of users to make known to SQLMesh.
        config_type: The type of config object to use (default Config).
    """

    CONFIG_TYPE: t.Type[C]

    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        self.console = console or get_console()
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        self._default_catalog: t.Optional[str] = None

        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        self._test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )

        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()

    @property
    def default_dialect(self) -> t.Optional[str]:
        return self.config.dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""
        return self._engine_adapter

    @property
    def snapshot_evaluator(self) -> SnapshotEvaluator:
        if not self._snapshot_evaluator:
            self._snapshot_evaluator = SnapshotEvaluator(
                self.engine_adapter.with_log_level(logging.INFO),
                ddl_concurrent_tasks=self.concurrent_tasks,
            )
        return self._snapshot_evaluator

    def execution_context(
        self, deployability_index: t.Optional[DeployabilityIndex] = None
    ) -> ExecutionContext:
        """Returns an execution context."""
        return ExecutionContext(
            engine_adapter=self._engine_adapter,
            snapshots=self.snapshots,
            deployability_index=deployability_index,
            default_dialect=self.default_dialect,
            default_catalog=self.default_catalog,
        )

    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
        """Update or insert a model.

        The context's models dictionary will be updated to include these changes.

        Args:
            model: Model name or instance to update.
            kwargs: The kwargs to update the model with.

        Returns:
            A new instance of the updated or inserted model.
        """
        model = self.get_model(model, raise_if_missing=True)
        path = model._path

        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
        model._path = path

        self._models.update({model.fqn: model})
        self.dag.add(model.fqn, model.depends_on)
        update_model_schemas(
            self.dag,
            self._models,
            self.path,
        )

        model.validate_definition()

        return model

    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
        """Returns the built-in scheduler.

        Args:
            environment: The target environment to source model snapshots from, or None
                if snapshots should be sourced from the currently loaded local state.

        Returns:
            The built-in scheduler instance.
        """
        snapshots: t.Iterable[Snapshot]
        if environment is not None:
            stored_environment = self.state_sync.get_environment(environment)
            if stored_environment is None:
                raise ConfigError(f"Environment '{environment}' was not found.")
            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
        else:
            snapshots = self.snapshots.values()

        if not snapshots:
            raise ConfigError("No models were found")

        return Scheduler(
            snapshots,
            self.snapshot_evaluator,
            self.state_sync,
            default_catalog=self.default_catalog,
            max_workers=self.concurrent_tasks,
            console=self.console,
            notification_target_manager=self.notification_target_manager,
        )

    @property
    def state_sync(self) -> StateSync:
        if not self._state_sync:
            self._state_sync = self._new_state_sync()

            if self._state_sync.get_versions(validate=False).schema_version == 0:
                self._state_sync.migrate(default_catalog=self.default_catalog)
            self._state_sync.get_versions()
            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
        return self._state_sync

    @property
    def state_reader(self) -> StateReader:
        return self.state_sync

    def refresh(self) -> None:
        """Refresh all models that have been updated."""
        if self._loader.reload_needed():
            self.load()

    def load(self, update_schemas: bool = True) -> GenericContext[C]:
        """Load all files in the context's path."""
        with sys_path(*self.configs):
            gc.disable()
            project = self._loader.load(self, update_schemas)
            self._macros = project.macros
            self._jinja_macros = project.jinja_macros
            self._models = project.models
            self._metrics = project.metrics
            self._standalone_audits.clear()
            self._audits.clear()
            for name, audit in project.audits.items():
                if isinstance(audit, StandaloneAudit):
                    self._standalone_audits[name] = audit
                else:
                    self._audits[name] = audit
            self.dag = project.dag
            gc.enable()

            duplicates = set(self._models) & set(self._standalone_audits)
            if duplicates:
                raise ConfigError(
                    f"Models and Standalone audits cannot have the same name: {duplicates}"
                )

        return self

    def run(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        skip_janitor: bool = False,
        ignore_cron: bool = False,
    ) -> bool:
        """Run the entire dag through the scheduler.

        Args:
            environment: The target environment to source model snapshots from and virtually update. Default: prod.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            skip_janitor: Whether to skip the janitor task.
            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.

        Returns:
            True if the run was successful, False otherwise.
        """
        environment = environment or self.config.default_target_environment

        self.notification_target_manager.notify(
            NotificationEvent.RUN_START, environment=environment
        )
        success = False
        try:
            success = self._run(
                environment=environment,
                start=start,
                end=end,
                execution_time=execution_time,
                skip_janitor=skip_janitor,
                ignore_cron=ignore_cron,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, traceback.format_exc()
            )
            logger.error(f"Run Failure: {traceback.format_exc()}")
            raise e

        if success:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_END, environment=environment
            )
            self.console.log_success(f"Run finished for environment '{environment}'")
        else:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, "See console logs for details."
            )

        return success

    @t.overload
    def get_model(
        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
    ) -> Model: ...

    @t.overload
    def get_model(
        self,
        model_or_snapshot: ModelOrSnapshot,
        raise_if_missing: Literal[False] = False,
    ) -> t.Optional[Model]: ...

    def get_model(
        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Model]:
        """Returns a model with the given name or None if a model with such name doesn't exist.

        Args:
            model_or_snapshot: A model name, model, or snapshot.
            raise_if_missing: Raises an error if a model is not found.

        Returns:
            The expected model.
        """
        if isinstance(model_or_snapshot, str):
            normalized_name = normalize_model_name(
                model_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
            model = self._models.get(normalized_name)
        elif isinstance(model_or_snapshot, Snapshot):
            model = model_or_snapshot.model
        else:
            model = model_or_snapshot

        if raise_if_missing and not model:
            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")

        return model

    @t.overload
    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...

    @t.overload
    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
    ) -> Snapshot: ...

    @t.overload
    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
    ) -> t.Optional[Snapshot]: ...

    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Snapshot]:
        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.

        Args:
            node_or_snapshot: A node name, node, or snapshot.
            raise_if_missing: Raises an error if a snapshot is not found.

        Returns:
            The expected snapshot.
        """
        if isinstance(node_or_snapshot, Snapshot):
            return node_or_snapshot
        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
            node_or_snapshot = normalize_model_name(
                node_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
        snapshot = self.snapshots.get(fqn)

        if raise_if_missing and not snapshot:
            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")

        return snapshot

    def config_for_path(self, path: Path) -> Config:
        for config_path, config in self.configs.items():
            try:
                path.relative_to(config_path)
                return config
            except ValueError:
                pass
        return self.config

    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
        if isinstance(node, str):
            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
        return self.config_for_path(node._path)  # type: ignore

    @property
    def models(self) -> MappingProxyType[str, Model]:
        """Returns all registered models in this context."""
        return MappingProxyType(self._models)

    @property
    def metrics(self) -> MappingProxyType[str, Metric]:
        """Returns all registered metrics in this context."""
        return MappingProxyType(self._metrics)

    @property
    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
        """Returns all registered standalone audits in this context."""
        return MappingProxyType(self._standalone_audits)

    @property
    def snapshots(self) -> t.Dict[str, Snapshot]:
        """Generates and returns snapshots based on models registered in this context.

        If one of the snapshots has been previously stored in the persisted state, the stored
        instance will be returned.
        """
        return self._snapshots()

    @property
    def default_catalog(self) -> t.Optional[str]:
        if self._default_catalog is None:
            self._default_catalog = self._scheduler.get_default_catalog(self)
        return self._default_catalog

    def render(
        self,
        model_or_snapshot: ModelOrSnapshot,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        expand: t.Union[bool, t.Iterable[str]] = False,
        **kwargs: t.Any,
    ) -> exp.Expression:
        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            expand: Whether or not to use expand materialized models, defaults to False.
                If True, all referenced models are expanded as raw queries.
                If a list, only referenced models are expanded as raw queries.

        Returns:
            The rendered expression.
        """
        execution_time = execution_time or now_ds()

        model = self.get_model(model_or_snapshot, raise_if_missing=True)

        if expand and not isinstance(expand, bool):
            expand = {
                normalize_model_name(
                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
                )
                for x in expand
            }

        expand = self.dag.upstream(model.fqn) if expand is True else expand or []

        if model.is_seed:
            df = next(
                model.render(
                    context=self.execution_context(),
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))

        return model.render_query_or_raise(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            expand=expand,
            engine_adapter=self.engine_adapter,
            **kwargs,
        )

    def evaluate(
        self,
        model_or_snapshot: ModelOrSnapshot,
        start: TimeLike,
        end: TimeLike,
        execution_time: TimeLike,
        limit: t.Optional[int] = None,
        **kwargs: t.Any,
    ) -> DF:
        """Evaluate a model or snapshot (running its query against a DB/Engine).

        This method is used to test or iterate on models without side effects.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to evaluate.
            end: The end of the interval to evaluate.
            execution_time: The date/time time reference to use for execution time.
            limit: A limit applied to the model.
        """
        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)

        df = self.snapshot_evaluator.evaluate_and_fetch(
            snapshot,
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            limit=limit or c.DEFAULT_MAX_LIMIT,
        )

        if df is None:
            raise RuntimeError(f"Error evaluating {snapshot.name}")

        return df

    def format(
        self,
        transpile: t.Optional[str] = None,
        append_newline: t.Optional[bool] = None,
        **kwargs: t.Any,
    ) -> None:
        """Format all SQL models."""
        for model in self._models.values():
            if not model._path.suffix == ".sql":
                continue
            with open(model._path, "r+", encoding="utf-8") as file:
                expressions = parse(
                    file.read(), default_dialect=self.config_for_node(model).dialect
                )
                if transpile:
                    for prop in expressions[0].expressions:
                        if prop.name.lower() == "dialect":
                            prop.replace(
                                exp.Property(
                                    this="dialect",
                                    value=exp.Literal.string(transpile or model.dialect),
                                )
                            )
                format = self.config_for_node(model).format
                opts = {**format.generator_options, **kwargs}
                file.seek(0)
                file.write(
                    format_model_expressions(expressions, transpile or model.dialect, **opts)
                )
                if append_newline is None:
                    append_newline = format.append_newline
                if append_newline:
                    file.write("\n")
                file.truncate()

    def plan(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_prompts: t.Optional[bool] = None,
        auto_apply: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        no_diff: t.Optional[bool] = None,
        run: bool = False,
    ) -> Plan:
        """Interactively creates a plan.

        This method compares the current context with the target environment. It then presents
        the differences and asks whether to backfill each modified model.

        Args:
            environment: The environment to diff and plan against.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
                if this flag is set to true and there are uncategorized changes the plan creation will
                fail. Default: False.
            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            no_diff: Hide text differences for changed models.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The populated Plan object.
        """
        plan_builder = self.plan_builder(
            environment,
            start=start,
            end=end,
            execution_time=execution_time,
            create_from=create_from,
            skip_tests=skip_tests,
            restate_models=restate_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            forward_only=forward_only,
            no_auto_categorization=no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=include_unmodified,
            select_models=select_models,
            backfill_models=backfill_models,
            categorizer_config=categorizer_config,
            enable_preview=enable_preview,
            run=run,
        )

        self.console.plan(
            plan_builder,
            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
            self.default_catalog,
            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
        )

        return plan_builder.build()

    def plan_builder(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        run: bool = False,
    ) -> PlanBuilder:
        """Creates a plan builder.

        Args:
            environment: The environment to diff and plan against.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default; set this to True to skip them.
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depend on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments, only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for the same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included in this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The plan builder.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

    def apply(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Applies a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
        to backfill all models.

        Args:
            plan: The plan to apply.
            circuit_breaker: An optional handler which checks if the apply should be aborted.
        """
        if (
            not plan.context_diff.has_changes
            and not plan.requires_backfill
            and not plan.has_unmodified_unpromoted
        ):
            return
        if plan.uncategorized:
            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_START,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
        try:
            self._apply(plan, circuit_breaker)
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.APPLY_FAILURE,
                environment=plan.environment_naming_info.name,
                plan_id=plan.plan_id,
                exc=traceback.format_exc(),
            )
            logger.error(f"Apply Failure: {traceback.format_exc()}")
            raise e
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_END,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )

    def invalidate_environment(self, name: str, sync: bool = False) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
                be deleted asynchronously by the janitor process.
        """
        self.state_sync.invalidate_environment(name)
        if sync:
            self._cleanup_environments()
            self.console.log_success(f"Environment '{name}' has been deleted.")
        else:
            self.console.log_success(f"Environment '{name}' has been invalidated.")

    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
        """Show a diff of the current context with a given environment.

        Args:
            environment: The environment to diff against.
            detailed: Show the actual SQL differences if True.

        Returns:
            True if there are changes, False otherwise.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        context_diff = self._context_diff(environment)
        self.console.show_model_difference_summary(
            context_diff,
            EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=environment,
                suffix_target=self.config.environment_suffix_target,
            ),
            self.default_catalog,
            no_diff=not detailed,
        )
        return context_diff.has_changes

    def table_diff(
        self,
        source: str,
        target: str,
        on: t.List[str] | exp.Condition | None = None,
        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
        where: t.Optional[str | exp.Condition] = None,
        limit: int = 20,
        show: bool = True,
        show_sample: bool = True,
    ) -> TableDiff:
        """Show a diff between two tables.

        Args:
            source: The source environment or table.
            target: The target environment or table.
            on: The join condition, table aliases must be "s" and "t" for source and target.
                If omitted, the table's grain will be used.
            model_or_snapshot: The model or snapshot to use when environments are passed in.
            where: An optional where statement to filter results.
            limit: The limit of the sample dataframe.
            show: Show the table diff output in the console.
            show_sample: Show the sample dataframe in the console. Requires show=True.

        Returns:
            The TableDiff object containing schema and summary differences.
        """
        source_alias, target_alias = source, target
        if model_or_snapshot:
            model = self.get_model(model_or_snapshot, raise_if_missing=True)
            source_env = self.state_reader.get_environment(source)
            target_env = self.state_reader.get_environment(target)

            if not source_env:
                raise SQLMeshError(f"Could not find environment '{source}'")
            if not target_env:
                raise SQLMeshError(f"Could not find environment '{target}'")

            source = next(
                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            target = next(
                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            source_alias = source_env.name
            target_alias = target_env.name

            if not on:
                for ref in model.all_references:
                    if ref.unique:
                        on = ref.columns

        if not on:
            raise SQLMeshError(
                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
            )

        table_diff = TableDiff(
            adapter=self._engine_adapter,
            source=source,
            target=target,
            on=on,
            where=where,
            source_alias=source_alias,
            target_alias=target_alias,
            model_name=model.name if model_or_snapshot else None,
            limit=limit,
        )
        if show:
            self.console.show_schema_diff(table_diff.schema_diff())
            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
        return table_diff

    def get_dag(
        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
    ) -> GraphHTML:
        """Gets an HTML object representation of the DAG.

        Args:
            select_models: A list of model selection strings that should be included in the dag.
        Returns:
            An html object that renders the dag.
        """
        dag = (
            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
            if select_models
            else self.dag
        )

        nodes = {}
        edges: t.List[t.Dict] = []

        for node, deps in dag.graph.items():
            nodes[node] = {
                "id": node,
                "label": node.split(".")[-1],
                "title": f"<span>{node}</span>",
            }
            edges.extend({"from": d, "to": node} for d in deps)

        return GraphHTML(
            nodes,
            edges,
            options={
                "height": "100%",
                "width": "100%",
                "interaction": {},
                "layout": {
                    "hierarchical": {
                        "enabled": True,
                        "nodeSpacing": 200,
                        "sortMethod": "directed",
                    },
                },
                "nodes": {
                    "shape": "box",
                },
                **options,
            },
        )

    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
        """Render the dag as HTML and save it to a file.

        Args:
            path: filename to save the dag html to
            select_models: A list of model selection strings that should be included in the dag.
        """
        file_path = Path(path)
        suffix = file_path.suffix
        if suffix != ".html":
            if suffix:
                logger.warning(
                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
                )
            path = str(file_path.with_suffix(".html"))

        with open(path, "w", encoding="utf-8") as file:
            file.write(str(self.get_dag(select_models)))

    def create_test(
        self,
        model: str,
        input_queries: t.Dict[str, str],
        overwrite: bool = False,
        variables: t.Optional[t.Dict[str, str]] = None,
        path: t.Optional[str] = None,
        name: t.Optional[str] = None,
        include_ctes: bool = False,
    ) -> None:
        """Generate a unit test fixture for a given model.

        Args:
            model: The model to test.
            input_queries: Mapping of model names to queries. Each model included in this mapping
                will be populated in the test based on the results of the corresponding query.
            overwrite: Whether to overwrite the existing test in case of a file path collision.
                When set to False, an error will be raised if there is such a collision.
            variables: Key-value pairs that will define variables needed by the model.
            path: The file path corresponding to the fixture, relative to the test directory.
                By default, the fixture will be created under the test directory and the file name
                will be inferred from the test's name.
            name: The name of the test. This is inferred from the model name by default.
            include_ctes: When true, CTE fixtures will also be generated.
        """
        input_queries = {
            # The get_model here has two purposes: return normalized names & check for missing deps
            self.get_model(dep, raise_if_missing=True).fqn: query
            for dep, query in input_queries.items()
        }

        # Create the adapter before entering `try`, so `finally` never references an unbound name
        # if adapter creation itself fails.
        test_adapter = self._test_connection_config.create_engine_adapter(
            register_comments_override=False
        )
        try:
            generate_test(
                model=self.get_model(model, raise_if_missing=True),
                input_queries=input_queries,
                models=self._models,
                engine_adapter=self._engine_adapter,
                test_engine_adapter=test_adapter,
                project_path=self.path,
                overwrite=overwrite,
                variables=variables,
                path=path,
                name=name,
                include_ctes=include_ctes,
            )
        finally:
            test_adapter.close()

    def test(
        self,
        match_patterns: t.Optional[t.List[str]] = None,
        tests: t.Optional[t.List[str]] = None,
        verbose: bool = False,
        preserve_fixtures: bool = False,
        stream: t.Optional[t.TextIO] = None,
    ) -> ModelTextTestResult:
        """Discover and run model tests."""
        if verbose:
            pd.set_option("display.max_columns", None)
            verbosity = 2
        else:
            verbosity = 1

        if tests:
            result = run_model_tests(
                tests=tests,
                models=self._models,
                config=self.config,
                gateway=self.gateway,
                dialect=self.default_dialect,
                verbosity=verbosity,
                patterns=match_patterns,
                preserve_fixtures=preserve_fixtures,
                stream=stream,
                default_catalog=self.default_catalog,
                default_catalog_dialect=self.engine_adapter.DIALECT,
            )
        else:
            test_meta = []

            for path, config in self.configs.items():
                test_meta.extend(
                    get_all_model_tests(
                        path / c.TESTS,
                        patterns=match_patterns,
                        ignore_patterns=config.ignore_patterns,
                    )
                )

            result = run_tests(
                model_test_metadata=test_meta,
                models=self._models,
                config=self.config,
                gateway=self.gateway,
                dialect=self.default_dialect,
                verbosity=verbosity,
                preserve_fixtures=preserve_fixtures,
                stream=stream,
                default_catalog=self.default_catalog,
                default_catalog_dialect=self.engine_adapter.DIALECT,
            )

        return result

    def audit(
        self,
        start: TimeLike,
        end: TimeLike,
        *,
        models: t.Optional[t.Iterator[str]] = None,
        execution_time: t.Optional[TimeLike] = None,
    ) -> None:
        """Audit models.

        Args:
            start: The start of the interval to audit.
            end: The end of the interval to audit.
            models: The models to audit. All models will be audited if not specified.
            execution_time: The date/time reference to use for execution time. Defaults to now.
        """

        snapshots = (
            [self.get_snapshot(model, raise_if_missing=True) for model in models]
            if models
            else self.snapshots.values()
        )

        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
        self.console.log_status_update(f"Found {num_audits} audit(s).")
        errors = []
        skipped_count = 0
        for snapshot in snapshots:
            for audit_result in self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                snapshots=self.snapshots,
                raise_exception=False,
            ):
                audit_id = f"{audit_result.audit.name}"
                if audit_result.model:
                    audit_id += f" on model {audit_result.model.name}"

                if audit_result.skipped:
                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                    skipped_count += 1
                elif audit_result.count:
                    errors.append(audit_result)
                    self.console.log_status_update(
                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                    )
                else:
                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

        self.console.log_status_update(
            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
        )
        for error in errors:
            self.console.log_status_update(
                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
            )
            self.console.log_status_update(f"Got {error.count} results, expected 0.")
            if error.query:
                self.console.show_sql(
                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
                )

        self.console.log_status_update("Done.")

    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

        Args:
            sql: The sql string to rewrite.
            dialect: The dialect of the sql string, defaults to the project dialect.

        Returns:
            A SQLGlot expression with semantic references expanded.
        """
        return rewrite(
            sql,
            graph=ReferenceGraph(self.models.values()),
            metrics=self._metrics,
            dialect=dialect or self.default_dialect,
        )

    def migrate(self) -> None:
        """Migrates SQLMesh to the current running version.

        Please contact your SQLMesh administrator before doing this.
        """
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
        try:
            self._new_state_sync().migrate(
                default_catalog=self.default_catalog,
                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
            )
            raise e
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

    def rollback(self) -> None:
        """Rolls back SQLMesh to the previous migration.

        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
        """
        self._new_state_sync().rollback()

    def create_external_models(self) -> None:
        """Create a schema file with all external models.

        The schema file contains all columns and types of external models, allowing for more robust
        lineage, validation, and optimizations.
        """
        if not self._models:
            self.load(update_schemas=False)

        for path, config in self.configs.items():
            create_schema_file(
                path=path / c.SCHEMA_YAML,
                models=UniqueKeyDict(
                    "models",
                    {
                        fqn: model
                        for fqn, model in self._models.items()
                        if self.config_for_node(model) is config
                    },
                ),
                adapter=self._engine_adapter,
                state_reader=self.state_reader,
                dialect=config.model_defaults.dialect,
                max_workers=self.concurrent_tasks,
            )

    def print_info(self) -> None:
        """Prints information about connections, models, macros, etc. to the console."""
        self.console.log_status_update(f"Models: {len(self.models)}")
        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

        self._try_connection("data warehouse", self._engine_adapter)

        state_connection = self.config.get_state_connection(self.gateway)
        if state_connection:
            self._try_connection("state backend", state_connection.create_engine_adapter())

    def close(self) -> None:
        """Releases all resources allocated by this context."""
        self.snapshot_evaluator.close()
        self.state_sync.close()

    def _run(
        self,
        environment: str,
        *,
        start: t.Optional[TimeLike],
        end: t.Optional[TimeLike],
        execution_time: t.Optional[TimeLike],
        skip_janitor: bool,
        ignore_cron: bool,
    ) -> bool:
        if not skip_janitor and environment.lower() == c.PROD:
            self._run_janitor()

        env_check_attempts_num = max(
            1,
            self.config.run.environment_check_max_wait
            // self.config.run.environment_check_interval,
        )

        def _block_until_finalized() -> str:
            for _ in range(env_check_attempts_num):
                assert environment is not None  # mypy
                environment_state = self.state_sync.get_environment(environment)
                if not environment_state:
                    raise SQLMeshError(f"Environment '{environment}' was not found.")
                if environment_state.finalized_ts:
                    return environment_state.plan_id
                logger.warning(
                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
                    environment,
                    environment_state.plan_id,
                    self.config.run.environment_check_interval,
                )
                time.sleep(self.config.run.environment_check_interval)
            raise SQLMeshError(
                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
                "This means that the environment either failed to update or the update is taking longer than expected. "
                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
            )

        done = False
        while not done:
            plan_id_at_start = _block_until_finalized()

            def _has_environment_changed() -> bool:
                assert environment is not None  # mypy
                current_environment_state = self.state_sync.get_environment(environment)
                return (
                    not current_environment_state
                    or current_environment_state.plan_id != plan_id_at_start
                    or not current_environment_state.finalized_ts
                )

            try:
                success = self.scheduler(environment=environment).run(
                    environment,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    ignore_cron=ignore_cron,
                    circuit_breaker=_has_environment_changed,
                )
                done = True
            except CircuitBreakerError:
                logger.warning(
                    "Environment '%s' has been modified while running. Restarting the run...",
                    environment,
                )

        return success

    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)

    def table_name(self, model_name: str, dev: bool) -> str:
        """Returns the name of the physical table for the given model name.

        Args:
            model_name: The name of the model.
            dev: Whether to use the deployability index for the table name.

        Returns:
            The name of the physical table.
        """
        deployability_index = (
            DeployabilityIndex.create(self.snapshots.values())
            if dev
            else DeployabilityIndex.all_deployable()
        )
        snapshot = self.get_snapshot(model_name)
        if not snapshot:
            raise SQLMeshError(f"Model '{model_name}' was not found.")
        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

    def clear_caches(self) -> None:
        for path in self.configs:
            rmtree(path / c.CACHE)

    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
        test_output_io = StringIO()
        result = self.test(stream=test_output_io, verbose=verbose)
        return result, test_output_io.getvalue()

    def _run_plan_tests(
        self, skip_tests: bool = False
    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
        if not skip_tests:
            result, test_output = self._run_tests()
            if result.testsRun > 0:
                self.console.log_test_results(
                    result, test_output, self._test_connection_config._engine_adapter.DIALECT
                )
            if not result.wasSuccessful():
                raise PlanError(
                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
                )
            return result, test_output
        return None, None

    @property
    def _model_tables(self) -> t.Dict[str, str]:
        """Mapping of model name to physical table name.

        If a snapshot has not been versioned yet, its view name will be returned.
        """
        return {
            fqn: (
                snapshot.table_name()
                if snapshot.version
                else snapshot.qualified_view_name.for_environment(
                    EnvironmentNamingInfo.from_environment_catalog_mapping(
                        self.config.environment_catalog_mapping,
                        name=c.PROD,
                        suffix_target=self.config.environment_suffix_target,
                    )
                )
            )
            for fqn, snapshot in self.snapshots.items()
        }

    def _snapshots(
        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
    ) -> t.Dict[str, Snapshot]:
        prod = self.state_reader.get_environment(c.PROD)
        remote_snapshots = (
            {
                snapshot.name: snapshot
                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
            }
            if prod
            else {}
        )

        local_nodes = {**(models_override or self._models), **self._standalone_audits}
        nodes = local_nodes.copy()
        audits = self._audits.copy()
        projects = {config.project for config in self.configs.values()}

        for name, snapshot in remote_snapshots.items():
            if name not in nodes and snapshot.node.project not in projects:
                nodes[name] = snapshot.node
                if snapshot.is_model:
                    for audit in snapshot.audits:
                        if name not in audits:
                            audits[name] = audit

        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
            snapshots: t.Dict[str, Snapshot] = {}
            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}

            for node in nodes.values():
                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
                    ttl = remote_snapshots[node.fqn].ttl
                else:
                    config = self.config_for_node(node)
                    ttl = config.snapshot_ttl

                snapshot = Snapshot.from_node(
                    node,
                    nodes=nodes,
                    audits=audits,
                    cache=fingerprint_cache,
                    ttl=ttl,
                )
                snapshots[snapshot.name] = snapshot
            return snapshots

        snapshots = _nodes_to_snapshots(nodes)
        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        unrestorable_snapshots = {
            snapshot
            for snapshot in stored_snapshots.values()
            if snapshot.name in local_nodes and snapshot.unrestorable
        }
        if unrestorable_snapshots:
            for snapshot in unrestorable_snapshots:
                logger.info(
                    "Found an unrestorable snapshot %s. Restamping the model...", snapshot.name
                )
                node = local_nodes[snapshot.name]
                nodes[snapshot.name] = node.copy(
                    update={"stamp": f"revert to {snapshot.identifier}"}
                )
            snapshots = _nodes_to_snapshots(nodes)
            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        for snapshot in stored_snapshots.values():
            # Keep the original model instance to preserve the query cache.
            snapshot.node = snapshots[snapshot.name].node

        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}

    def _context_diff(
        self,
        environment: str,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        create_from: t.Optional[str] = None,
        force_no_diff: bool = False,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        environment = Environment.normalize_name(environment)
        if force_no_diff:
            return ContextDiff.create_no_diff(environment)
        return ContextDiff.create(
            environment,
            snapshots=snapshots or self.snapshots,
            create_from=create_from or c.PROD,
            state_reader=self.state_reader,
            ensure_finalized_snapshots=ensure_finalized_snapshots,
        )

    def _run_janitor(self) -> None:
        self._cleanup_environments()
        expired_snapshots = self.state_sync.delete_expired_snapshots()
        self.snapshot_evaluator.cleanup(
            expired_snapshots, on_complete=self.console.update_cleanup_progress
        )

        self.state_sync.compact_intervals()

    def _cleanup_environments(self) -> None:
        expired_environments = self.state_sync.delete_expired_environments()
        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)

    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
        connection_name = connection_name.capitalize()
        try:
            engine_adapter.fetchall("SELECT 1")
            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
        except Exception as ex:
            self.console.log_error(f"{connection_name} connection failed. {ex}")

    def _new_state_sync(self) -> StateSync:
        return self._provided_state_sync or self._scheduler.create_state_sync(self)

    def _new_selector(self) -> Selector:
        return Selector(
            self.state_reader,
            self._models,
            context_path=self.path,
            default_catalog=self.default_catalog,
            dialect=self.default_dialect,
        )

    def _register_notification_targets(self) -> None:
        event_notifications = collections.defaultdict(set)
        for target in self.notification_targets:
            if target.is_configured:
                for event in target.notify_on:
                    event_notifications[event].add(target)
        user_notification_targets = {
            user.username: set(
                target for target in user.notification_targets if target.is_configured
            )
            for user in self.users
        }
        self.notification_target_manager = NotificationTargetManager(
            event_notifications, user_notification_targets, username=self.config.username
        )
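The guard clauses and the default backfill window computed in `plan_builder` can be summarized as a small pure function. This is a simplified sketch, not SQLMesh API: `resolve_plan_window` and `PlanConfigError` are hypothetical names, `PROD` stands in for `c.PROD`, and `default_end` is passed in as a `date` instead of being read from the state sync (in `plan_builder` it comes from `greatest_common_interval_end` or `max_interval_end_for_environment` and is converted with `to_date`).

```python
from datetime import date, timedelta
from typing import Optional, Tuple

PROD = "prod"  # assumed stand-in for sqlmesh's c.PROD


class PlanConfigError(Exception):
    """Hypothetical stand-in for sqlmesh's ConfigError."""


def resolve_plan_window(
    environment: str,
    default_end: Optional[date],
    *,
    skip_backfill: bool = False,
    no_gaps: bool = False,
    run: bool = False,
) -> Tuple[bool, Optional[date], Optional[date]]:
    """Return (is_dev, default_start, default_end) following plan_builder's rules."""
    is_dev = environment != PROD

    # Production must either backfill or explicitly enforce the absence of data gaps.
    if skip_backfill and not no_gaps and not is_dev:
        raise PlanConfigError("prod plans must backfill or enforce --no-gaps")

    # Running the latest intervals as part of a plan is only allowed in prod.
    if run and is_dev:
        raise PlanConfigError("--run is only supported for the production environment")

    # A run plan is not end-bounded, so no default end is derived.
    if run:
        default_end = None

    # Dev plans default to a one-day window ending at the derived end.
    default_start = default_end - timedelta(days=1) if default_end and is_dev else None
    return is_dev, default_start, default_end


print(resolve_plan_window("dev", date(2024, 1, 10)))
# → (True, datetime.date(2024, 1, 9), datetime.date(2024, 1, 10))
```

The sketch assumes the environment name is already normalized; `plan_builder` first falls back to `default_target_environment` and calls `Environment.normalize_name` before these checks.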
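`_run` polls the target environment until the plan that last updated it is finalized, trying `max(1, environment_check_max_wait // environment_check_interval)` times before giving up. The sketch below reproduces only that wait loop. It assumes a hypothetical `get_state` callable returning `(plan_id, finalized)` or `None`; `LookupError` and `TimeoutError` stand in for `SQLMeshError`, and `sleep` is injectable so the example runs instantly. The circuit-breaker restart around the scheduler run is not modeled.

```python
import time
from typing import Callable, Optional, Tuple

# Hypothetical environment state: (plan_id, finalized), or None if missing.
EnvState = Optional[Tuple[str, bool]]


def check_attempts(max_wait: int, interval: int) -> int:
    """Number of polls, computed as in _run: always at least one."""
    return max(1, max_wait // interval)


def block_until_finalized(
    get_state: Callable[[], EnvState],
    max_wait: int,
    interval: int,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Return the plan ID once the environment is finalized."""
    for _ in range(check_attempts(max_wait, interval)):
        state = get_state()
        if state is None:
            raise LookupError("environment was not found")
        plan_id, finalized = state
        if finalized:
            return plan_id
        # Another plan is still updating the environment: wait, then poll again.
        sleep(interval)
    raise TimeoutError("exceeded the maximum wait time for the environment")


states = iter([("plan_a", False), ("plan_b", True)])
print(block_until_finalized(lambda: next(states), max_wait=60, interval=30, sleep=lambda s: None))
# → plan_b
```

Because of the `max(1, ...)` floor, a `max_wait` shorter than one interval still results in a single check rather than none.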
Encapsulates a SQLMesh environment, supplying convenient functions to perform various tasks.
Arguments:
- engine_adapter: The default engine adapter to use.
- notification_targets: Additional notification targets to use. These are combined with the notification targets defined in the config.
- paths: The directories containing SQLMesh files.
- config: A Config object, the name of a Config object in config.py, or a dictionary mapping project paths to Config objects.
- connection: The name of the connection. If not specified, the first connection that appears in the configuration is used.
- test_connection: The name of the connection to use for tests. If not specified, the first connection that appears in the configuration is used.
- concurrent_tasks: The maximum number of tasks that can use the connection concurrently. Defaults to the concurrent_tasks value of the connection configuration.
- load: Whether or not to automatically load all models and macros (default True).
- console: The console instance used to print CLI command results. Defaults to the console returned by get_console().
- users: A list of users to make known to SQLMesh.
- config_type: The type of config object to use (default Config).
    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        self.console = console or get_console()
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        self._default_catalog: t.Optional[str] = None

        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        self._test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )

        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()
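The constructor concatenates the users passed in with those from the configuration and then deduplicates them by username with a dict comprehension. The sketch below reproduces only that idiom, with a hypothetical FakeUser dataclass standing in for SQLMesh's User, to show which entry survives: the later one (from the configuration), while the username keeps the position where it first appeared.

```python
from dataclasses import dataclass


@dataclass
class FakeUser:
    """Hypothetical stand-in for SQLMesh's User model."""

    username: str
    email: str


passed_in = [FakeUser("alice", "alice@old.example"), FakeUser("bob", "bob@example")]
from_config = [FakeUser("alice", "alice@new.example")]

users = passed_in + from_config
# Same idiom as __init__: a dict keyed by username keeps the position where a
# key was first inserted but the value from its last occurrence.
deduped = list({user.username: user for user in users}.values())
print([(u.username, u.email) for u in deduped])
# [('alice', 'alice@new.example'), ('bob', 'bob@example')]
```

In other words, a user defined in the configuration overrides a user with the same username passed to the constructor.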
    def execution_context(
        self, deployability_index: t.Optional[DeployabilityIndex] = None
    ) -> ExecutionContext:
        """Returns an execution context."""
        return ExecutionContext(
            engine_adapter=self._engine_adapter,
            snapshots=self.snapshots,
            deployability_index=deployability_index,
            default_dialect=self.default_dialect,
            default_catalog=self.default_catalog,
        )
Returns an execution context.
    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
        """Update or insert a model.

        The context's models dictionary will be updated to include these changes.

        Args:
            model: Model name or instance to update.
            kwargs: The kwargs to update the model with.

        Returns:
            A new instance of the updated or inserted model.
        """
        model = self.get_model(model, raise_if_missing=True)
        path = model._path

        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
        model._path = path

        self._models.update({model.fqn: model})
        self.dag.add(model.fqn, model.depends_on)
        update_model_schemas(
            self.dag,
            self._models,
            self.path,
        )

        model.validate_definition()

        return model
Update or insert a model.
The context's models dictionary will be updated to include these changes.
Arguments:
- model: Model name or instance to update.
- kwargs: The kwargs to update the model with.
Returns:
A new instance of the updated or inserted model.
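The inline comment in upsert_model explains why the model is rebuilt from its field values instead of being copied: a copy would carry cached state along. A minimal sketch of the same idea, assuming a standard-library dataclass with functools.cached_property in place of SQLMesh's pydantic-based Model (FakeModel, its rendered property, and upsert are all hypothetical):

```python
import copy
import typing as t
from dataclasses import asdict, dataclass
from functools import cached_property


@dataclass
class FakeModel:
    """Hypothetical stand-in for a SQLMesh model."""

    name: str
    query: str

    @cached_property
    def rendered(self) -> str:
        # Stands in for expensive state derived from the fields and cached on the instance.
        return self.query.upper()


def upsert(model: FakeModel, **kwargs: t.Any) -> FakeModel:
    # Rebuild from field values merged with the overrides; cached attributes
    # live outside the dataclass fields, so they are not carried over.
    return type(model)(**{**asdict(model), **kwargs})


old = FakeModel("db.orders", "select 1")
print(old.rendered)  # SELECT 1 (now cached on `old`)

new = upsert(old, query="select 2")
print(new.rendered)  # SELECT 2

stale = copy.copy(old)  # a shallow copy duplicates the instance __dict__, cache included
stale.query = "select 2"
print(stale.rendered)  # SELECT 1: the stale cached value
```

The stale copy illustrates the failure mode the comment in upsert_model warns about; rebuilding from fields avoids it at the cost of re-running validation.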
    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
        """Returns the built-in scheduler.

        Args:
            environment: The target environment to source model snapshots from, or None
                if snapshots should be sourced from the currently loaded local state.

        Returns:
            The built-in scheduler instance.
        """
        snapshots: t.Iterable[Snapshot]
        if environment is not None:
            stored_environment = self.state_sync.get_environment(environment)
            if stored_environment is None:
                raise ConfigError(f"Environment '{environment}' was not found.")
            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
        else:
            snapshots = self.snapshots.values()

        if not snapshots:
            raise ConfigError("No models were found")

        return Scheduler(
            snapshots,
            self.snapshot_evaluator,
            self.state_sync,
            default_catalog=self.default_catalog,
            max_workers=self.concurrent_tasks,
            console=self.console,
            notification_target_manager=self.notification_target_manager,
        )
Returns the built-in scheduler.
Arguments:
- environment: The target environment to source model snapshots from, or None if snapshots should be sourced from the currently loaded local state.
Returns:
The built-in scheduler instance.
    def refresh(self) -> None:
        """Refresh all models that have been updated."""
        if self._loader.reload_needed():
            self.load()
Reload all project files if the loader reports that a reload is needed, for example because files have changed since the last load.
    def load(self, update_schemas: bool = True) -> GenericContext[C]:
        """Load all files in the context's path."""
        with sys_path(*self.configs):
            gc.disable()
            project = self._loader.load(self, update_schemas)
            self._macros = project.macros
            self._jinja_macros = project.jinja_macros
            self._models = project.models
            self._metrics = project.metrics
            self._standalone_audits.clear()
            self._audits.clear()
            for name, audit in project.audits.items():
                if isinstance(audit, StandaloneAudit):
                    self._standalone_audits[name] = audit
                else:
                    self._audits[name] = audit
            self.dag = project.dag
            gc.enable()

            duplicates = set(self._models) & set(self._standalone_audits)
            if duplicates:
                raise ConfigError(
                    f"Models and Standalone audits cannot have the same name: {duplicates}"
                )

        return self
Load all files in the context's path.
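load() pauses the cyclic garbage collector while the project is built, splits audits into standalone and model-attached ones, and rejects standalone audits whose names collide with model names. A minimal sketch of that partition and duplicate check, assuming hypothetical FakeAudit and FakeStandaloneAudit classes in place of SQLMesh's Audit and StandaloneAudit:

```python
import gc
import typing as t


class FakeAudit:
    """Hypothetical stand-in for a model-attached audit."""

    def __init__(self, name: str) -> None:
        self.name = name


class FakeStandaloneAudit(FakeAudit):
    """Hypothetical stand-in for StandaloneAudit."""


def partition_audits(
    audits: t.Dict[str, FakeAudit], model_names: t.Iterable[str]
) -> t.Tuple[t.Dict[str, FakeAudit], t.Dict[str, FakeAudit]]:
    standalone: t.Dict[str, FakeAudit] = {}
    regular: t.Dict[str, FakeAudit] = {}
    gc.disable()  # pause the cyclic collector while many objects are created
    try:
        for name, audit in audits.items():
            if isinstance(audit, FakeStandaloneAudit):
                standalone[name] = audit
            else:
                regular[name] = audit
    finally:
        gc.enable()  # re-enable even if the loop raises
    # Mirrors load(): standalone audit names must not collide with model names.
    duplicates = set(model_names) & set(standalone)
    if duplicates:
        raise ValueError(f"Models and Standalone audits cannot have the same name: {duplicates}")
    return standalone, regular


standalone, regular = partition_audits(
    {"check_orders": FakeStandaloneAudit("check_orders"), "not_null": FakeAudit("not_null")},
    ["db.orders"],
)
print(sorted(standalone), sorted(regular))  # ['check_orders'] ['not_null']
```

Unlike the listing above, the sketch wraps the work in try/finally, so garbage collection is re-enabled even when loading raises; that is a design choice of the sketch, not a description of SQLMesh's behavior.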
    def run(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        skip_janitor: bool = False,
        ignore_cron: bool = False,
    ) -> bool:
        """Run the entire dag through the scheduler.

        Args:
            environment: The target environment to source model snapshots from and virtually update. Default: prod.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            skip_janitor: Whether to skip the janitor task.
            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.

        Returns:
            True if the run was successful, False otherwise.
        """
        environment = environment or self.config.default_target_environment

        self.notification_target_manager.notify(
            NotificationEvent.RUN_START, environment=environment
        )
        success = False
        try:
            success = self._run(
                environment=environment,
                start=start,
                end=end,
                execution_time=execution_time,
                skip_janitor=skip_janitor,
                ignore_cron=ignore_cron,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, traceback.format_exc()
            )
            logger.error(f"Run Failure: {traceback.format_exc()}")
            raise e

        if success:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_END, environment=environment
            )
            self.console.log_success(f"Run finished for environment '{environment}'")
        else:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, "See console logs for details."
            )

        return success
Run the entire DAG through the scheduler.
Arguments:
- environment: The target environment to source model snapshots from and virtually update. Default: prod.
- start: The start of the interval to run.
- end: The end of the interval to run.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- skip_janitor: Whether to skip the janitor task.
- ignore_cron: Whether to ignore the models' cron schedules and run all available missing intervals.
Returns:
True if the run was successful, False otherwise.
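run() emits a start notification, then either an end notification on success or a failure notification when the run returns False or raises; in the raising case the exception is re-raised after notifying. A minimal sketch of that control flow, assuming a hypothetical RecordingNotifier in place of NotificationTargetManager and plain strings in place of NotificationEvent members:

```python
import typing as t


class RecordingNotifier:
    """Hypothetical notifier that just records event names in order."""

    def __init__(self) -> None:
        self.events: t.List[str] = []

    def notify(self, event: str, *args: t.Any, **kwargs: t.Any) -> None:
        self.events.append(event)


def run_with_notifications(notifier: RecordingNotifier, run_fn: t.Callable[[], bool]) -> bool:
    notifier.notify("RUN_START")
    try:
        success = run_fn()
    except Exception:
        # Report the failure, then let the original exception propagate.
        notifier.notify("RUN_FAILURE")
        raise
    # A run that returns False is also reported as a failure, but does not raise.
    notifier.notify("RUN_END" if success else "RUN_FAILURE")
    return success


ok, failed = RecordingNotifier(), RecordingNotifier()
run_with_notifications(ok, lambda: True)
run_with_notifications(failed, lambda: False)
print(ok.events)  # ['RUN_START', 'RUN_END']
print(failed.events)  # ['RUN_START', 'RUN_FAILURE']
```

The sketch uses a bare raise, which re-raises the active exception with its traceback intact, so callers see the original error.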
    def get_model(
        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Model]:
        """Returns a model with the given name or None if a model with such name doesn't exist.

        Args:
            model_or_snapshot: A model name, model, or snapshot.
            raise_if_missing: Raises an error if a model is not found.

        Returns:
            The expected model.
        """
        if isinstance(model_or_snapshot, str):
            normalized_name = normalize_model_name(
                model_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
            model = self._models.get(normalized_name)
        elif isinstance(model_or_snapshot, Snapshot):
            model = model_or_snapshot.model
        else:
            model = model_or_snapshot

        if raise_if_missing and not model:
            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")

        return model
Returns a model with the given name, or None if no such model exists.
Arguments:
- model_or_snapshot: A model name, model, or snapshot.
- raise_if_missing: Whether to raise an error if the model is not found.
Returns:
The matching model, or None if it is not found and raise_if_missing is False.
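get_model() dispatches on the argument's type: a string is normalized and looked up, a snapshot yields its model, and anything else is assumed to already be a model. A minimal sketch of that dispatch, assuming hypothetical FakeModel and FakeSnapshot classes and a hypothetical normalize_name rule (lowercase, plus the default catalog for schema.table names); SQLMesh's normalize_model_name is dialect-aware and behaves differently in detail:

```python
import typing as t
from dataclasses import dataclass


@dataclass
class FakeModel:
    fqn: str


@dataclass
class FakeSnapshot:
    model: FakeModel


def normalize_name(name: str, default_catalog: t.Optional[str]) -> str:
    # Hypothetical rule: lowercase, and add the default catalog to schema.table names.
    # SQLMesh's normalize_model_name is dialect-aware and also quotes identifiers.
    name = name.lower()
    if default_catalog and name.count(".") == 1:
        name = f"{default_catalog}.{name}"
    return name


def get_model(
    models: t.Dict[str, FakeModel],
    model_or_snapshot: t.Union[str, FakeModel, FakeSnapshot],
    default_catalog: t.Optional[str] = None,
    raise_if_missing: bool = False,
) -> t.Optional[FakeModel]:
    model: t.Optional[FakeModel]
    if isinstance(model_or_snapshot, str):
        # Names are normalized before the dictionary lookup.
        model = models.get(normalize_name(model_or_snapshot, default_catalog))
    elif isinstance(model_or_snapshot, FakeSnapshot):
        model = model_or_snapshot.model
    else:
        # Model instances are returned as-is, without checking the registry.
        model = model_or_snapshot
    if raise_if_missing and not model:
        raise LookupError(f"Cannot find model for '{model_or_snapshot}'")
    return model


models = {"warehouse.db.orders": FakeModel("warehouse.db.orders")}
print(get_model(models, "DB.Orders", default_catalog="warehouse"))
```

Because lookup goes through normalization, callers can pass a name in whatever case or qualification they typed, as long as it normalizes to the registered key.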
    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Snapshot]:
        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.

        Args:
            node_or_snapshot: A node name, node, or snapshot.
            raise_if_missing: Raises an error if a snapshot is not found.

        Returns:
            The expected snapshot.
        """
        if isinstance(node_or_snapshot, Snapshot):
            return node_or_snapshot
        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
            node_or_snapshot = normalize_model_name(
                node_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
        snapshot = self.snapshots.get(fqn)

        if raise_if_missing and not snapshot:
            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")

        return snapshot
Returns a snapshot with the given name, or None if no such snapshot exists.
Arguments:
- node_or_snapshot: A node name, node, or snapshot.
- raise_if_missing: Whether to raise an error if the snapshot is not found.
Returns:
The matching snapshot, or None if it is not found and raise_if_missing is False.
Returns all registered standalone audits in this context.
Generates and returns snapshots based on models registered in this context.
If one of the snapshots has been previously stored in the persisted state, the stored instance will be returned.
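The snapshot-loading listing at the start of this section ends with the lookup that implements this rule: for each local snapshot, the stored instance with the same snapshot ID is returned if there is one. A minimal sketch of that lookup, assuming a hypothetical FakeSnapshot dataclass and plain string IDs in place of SQLMesh's Snapshot and SnapshotId:

```python
import typing as t
from dataclasses import dataclass


@dataclass
class FakeSnapshot:
    name: str
    snapshot_id: str
    origin: str  # "local" or "stored"; for illustration only


def prefer_stored(
    local: t.Dict[str, FakeSnapshot], stored: t.Dict[str, FakeSnapshot]
) -> t.Dict[str, FakeSnapshot]:
    # For each locally generated snapshot, return the stored instance with the
    # same ID if one exists; otherwise fall back to the local instance.
    return {name: stored.get(s.snapshot_id, s) for name, s in local.items()}


local = {
    "db.a": FakeSnapshot("db.a", "id-a", "local"),
    "db.b": FakeSnapshot("db.b", "id-b", "local"),
}
stored = {"id-a": FakeSnapshot("db.a", "id-a", "stored")}
result = prefer_stored(local, stored)
print({name: s.origin for name, s in result.items()})  # {'db.a': 'stored', 'db.b': 'local'}
```

Returning the stored instance lets callers see persisted state such as intervals, while new snapshots that were never stored still appear in the result.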
    def render(
        self,
        model_or_snapshot: ModelOrSnapshot,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        expand: t.Union[bool, t.Iterable[str]] = False,
        **kwargs: t.Any,
    ) -> exp.Expression:
        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            expand: Whether or not to use expand materialized models, defaults to False.
                If True, all referenced models are expanded as raw queries.
                If a list, only referenced models are expanded as raw queries.

        Returns:
            The rendered expression.
        """
        execution_time = execution_time or now_ds()

        model = self.get_model(model_or_snapshot, raise_if_missing=True)

        if expand and not isinstance(expand, bool):
            expand = {
                normalize_model_name(
                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
                )
                for x in expand
            }

        expand = self.dag.upstream(model.fqn) if expand is True else expand or []

        if model.is_seed:
            df = next(
                model.render(
                    context=self.execution_context(),
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))

        return model.render_query_or_raise(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            expand=expand,
            engine_adapter=self.engine_adapter,
            **kwargs,
        )
Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
Arguments:
- model_or_snapshot: The model, model name, or snapshot to render.
- start: The start of the interval to render.
- end: The end of the interval to render.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- expand: Whether to expand referenced models into their raw queries. Defaults to False. If True, all upstream models are expanded. If an iterable of model names, only those models are expanded.
Returns:
The rendered expression.
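When expand is True, render() expands every upstream model via self.dag.upstream(model.fqn); an iterable of names is normalized and used as given, and a falsy value expands nothing. A minimal sketch of that resolution, assuming DAG.upstream returns all transitive dependencies and using a plain dict of dependency sets with a breadth-first walk in its place (the upstream and resolve_expand helpers are hypothetical, and this upstream sorts names alphabetically, which may differ from the real ordering):

```python
import typing as t
from collections import deque


def upstream(graph: t.Dict[str, t.Set[str]], node: str) -> t.List[str]:
    """Return every transitive dependency of node (hypothetical stand-in for DAG.upstream)."""
    seen: t.Set[str] = set()
    queue = deque(graph.get(node, set()))
    while queue:
        dep = queue.popleft()
        if dep in seen:
            continue
        seen.add(dep)
        # Walk further up: the dependencies of this dependency.
        queue.extend(graph.get(dep, set()))
    return sorted(seen)


def resolve_expand(
    graph: t.Dict[str, t.Set[str]], fqn: str, expand: t.Union[bool, t.Iterable[str]]
) -> t.List[str]:
    # True expands every upstream model; False or an empty iterable expands nothing;
    # any other iterable is the explicit list of names (normalization omitted here).
    if expand is True:
        return upstream(graph, fqn)
    if not expand:
        return []
    return list(expand)


graph = {"db.c": {"db.b"}, "db.b": {"db.a"}, "db.a": set()}
print(resolve_expand(graph, "db.c", True))  # ['db.a', 'db.b']
print(resolve_expand(graph, "db.c", ["db.a"]))  # ['db.a']
print(resolve_expand(graph, "db.c", False))  # []
```

The seen set keeps shared ancestors (for example, a diamond-shaped DAG) from being visited twice.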
    def evaluate(
        self,
        model_or_snapshot: ModelOrSnapshot,
        start: TimeLike,
        end: TimeLike,
        execution_time: TimeLike,
        limit: t.Optional[int] = None,
        **kwargs: t.Any,
    ) -> DF:
        """Evaluate a model or snapshot (running its query against a DB/Engine).

        This method is used to test or iterate on models without side effects.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to evaluate.
            end: The end of the interval to evaluate.
            execution_time: The date/time time reference to use for execution time.
            limit: A limit applied to the model.
        """
        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)

        df = self.snapshot_evaluator.evaluate_and_fetch(
            snapshot,
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            limit=limit or c.DEFAULT_MAX_LIMIT,
        )

        if df is None:
            raise RuntimeError(f"Error evaluating {snapshot.name}")

        return df
Evaluate a model or snapshot (running its query against a DB/Engine).
This method is used to test or iterate on models without side effects.
Arguments:
- model_or_snapshot: The model, model name, or snapshot to evaluate.
- start: The start of the interval to evaluate.
- end: The end of the interval to evaluate.
- execution_time: The date/time reference to use for execution time.
- limit: The maximum number of rows to return. If not provided (or 0), c.DEFAULT_MAX_LIMIT is used.
Returns:
The evaluation result as a DataFrame.
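evaluate() passes limit or c.DEFAULT_MAX_LIMIT to the snapshot evaluator, so any falsy limit falls back to the default. A tiny sketch of that expression; the value 1000 and the effective_limit helper are hypothetical placeholders, not necessarily the real constant or API:

```python
import typing as t

DEFAULT_MAX_LIMIT = 1000  # hypothetical value standing in for c.DEFAULT_MAX_LIMIT


def effective_limit(limit: t.Optional[int]) -> int:
    # Same expression as evaluate(): any falsy limit, including 0, uses the default.
    return limit or DEFAULT_MAX_LIMIT


print(effective_limit(None), effective_limit(0), effective_limit(25))  # 1000 1000 25
```

If limit=0 ever needed to mean "no rows", an explicit `limit if limit is not None else DEFAULT_MAX_LIMIT` check would be required instead of `or`.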
    def format(
        self,
        transpile: t.Optional[str] = None,
        append_newline: t.Optional[bool] = None,
        **kwargs: t.Any,
    ) -> None:
        """Format all SQL models."""
        for model in self._models.values():
            if not model._path.suffix == ".sql":
                continue
            with open(model._path, "r+", encoding="utf-8") as file:
                expressions = parse(
                    file.read(), default_dialect=self.config_for_node(model).dialect
                )
                if transpile:
                    for prop in expressions[0].expressions:
                        if prop.name.lower() == "dialect":
                            prop.replace(
                                exp.Property(
                                    this="dialect",
                                    value=exp.Literal.string(transpile or model.dialect),
                                )
                            )
                format = self.config_for_node(model).format
                opts = {**format.generator_options, **kwargs}
                file.seek(0)
                file.write(
                    format_model_expressions(expressions, transpile or model.dialect, **opts)
                )
                if append_newline is None:
                    append_newline = format.append_newline
                if append_newline:
                    file.write("\n")
                file.truncate()
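Format all SQL model files in place. If transpile is set, the models are rewritten in that dialect. append_newline controls whether a trailing newline is written; if it is not provided, the format configuration decides.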
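format() rewrites each SQL file through a single handle opened in r+ mode: it reads the text, seeks back to the start, writes the formatted text, and truncates whatever is left of the old contents. A minimal standard-library sketch of that read/seek/write/truncate pattern, assuming a hypothetical strip_trailing_whitespace function in place of sqlglot parsing plus format_model_expressions:

```python
import tempfile
import typing as t
from pathlib import Path


def rewrite_in_place(
    path: Path, transform: t.Callable[[str], str], append_newline: bool = True
) -> None:
    with open(path, "r+", encoding="utf-8") as file:
        new_text = transform(file.read())
        file.seek(0)  # overwrite from the beginning of the file
        file.write(new_text)
        if append_newline:
            file.write("\n")
        # Without this, leftover characters remain when the new text is shorter.
        file.truncate()


def strip_trailing_whitespace(text: str) -> str:
    """Hypothetical formatter: trims each line and any trailing blank lines."""
    return "\n".join(line.rstrip() for line in text.splitlines()).rstrip()


with tempfile.TemporaryDirectory() as tmp:
    model_path = Path(tmp) / "model.sql"
    model_path.write_text("SELECT 1   \nFROM t      \n\n\n", encoding="utf-8")
    rewrite_in_place(model_path, strip_trailing_whitespace)
    result = model_path.read_text(encoding="utf-8")

print(repr(result))  # 'SELECT 1\nFROM t\n'
```

Keeping one handle open for both read and write avoids reopening the file, but the final truncate() is essential because formatting usually shortens the text.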
    def plan(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_prompts: t.Optional[bool] = None,
        auto_apply: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        no_diff: t.Optional[bool] = None,
        run: bool = False,
    ) -> Plan:
        """Interactively creates a plan.

        This method compares the current context with the target environment. It then presents
        the differences and asks whether to backfill each modified model.

        Args:
            environment: The environment to diff and plan against.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
                if this flag is set to true and there are uncategorized changes the plan creation will
                fail. Default: False.
            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            no_diff: Hide text differences for changed models.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The populated Plan object.
        """
        plan_builder = self.plan_builder(
            environment,
            start=start,
            end=end,
            execution_time=execution_time,
            create_from=create_from,
            skip_tests=skip_tests,
            restate_models=restate_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            forward_only=forward_only,
            no_auto_categorization=no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=include_unmodified,
            select_models=select_models,
            backfill_models=backfill_models,
            categorizer_config=categorizer_config,
            enable_preview=enable_preview,
            run=run,
        )

        self.console.plan(
            plan_builder,
            auto_apply if auto_apply is not None else self.config.plan.auto_apply,
            self.default_catalog,
            no_diff=no_diff if no_diff is not None else self.config.plan.no_diff,
            no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts,
        )

        return plan_builder.build()
Interactively creates a plan.
This method compares the current context with the target environment. It then presents the differences and asks whether to backfill each modified model.
Arguments:
- environment: The environment to diff and plan against.
- start: The start date of the backfill if there is one.
- end: The end date of the backfill if there is one.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
- skip_tests: Whether to skip the unit tests, which run by default.
- restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even ones not in the current environment). Only the snapshots in this environment will be backfilled, whereas others need to be recovered on a future plan application. For development environments, only snapshots that are part of this plan will be affected.
- no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
- skip_backfill: Whether to skip the backfill step. Default: False. When targeting the production environment, skipping the backfill also requires no_gaps.
- forward_only: Whether the purpose of the plan is to make forward-only changes. If not provided, the forward_only setting of the project's plan configuration is used.
- no_prompts: Whether to disable interactive prompts for the backfill time range. Note that if this flag is set to true and there are uncategorized changes, plan creation will fail. If not provided, the no_prompts setting of the project's plan configuration is used.
- auto_apply: Whether to automatically apply the new plan after creation. If not provided, the auto_apply setting of the project's plan configuration is used.
- no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
- categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
- effective_from: The effective date from which to apply forward-only changes on production.
- include_unmodified: Indicates whether to include unmodified models in the target development environment.
- select_models: A list of model selection strings to filter the models that should be included in this plan.
- backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
- enable_preview: Indicates whether to enable preview for forward-only models in development environments.
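- no_diff: Whether to hide text differences for changed models. If not provided, the no_diff setting of the project's plan configuration is used.
- run: Whether to run latest intervals as part of the plan application. Only supported for the production environment.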
Returns:
The populated Plan object.
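Several plan() options (auto_apply, no_diff, no_prompts) are tri-state: an explicit True or False from the caller wins, and only None falls back to the project's plan configuration. A minimal sketch of that resolution, assuming a hypothetical FakePlanConfig dataclass in place of the real plan configuration object (the defaults shown are illustrative, not SQLMesh's):

```python
import typing as t
from dataclasses import dataclass


@dataclass
class FakePlanConfig:
    """Hypothetical stand-in for the project's plan configuration."""

    auto_apply: bool = False
    no_prompts: bool = True
    no_diff: bool = False


def resolve(explicit: t.Optional[bool], configured: bool) -> bool:
    # Only None falls back to the configured value; an explicit False is kept.
    return explicit if explicit is not None else configured


cfg = FakePlanConfig()
print(resolve(None, cfg.no_prompts))  # True: taken from the config
print(resolve(False, cfg.no_prompts))  # False: the caller's value wins
print(False or cfg.no_prompts)  # True: why `or` would be the wrong idiom here
```

The `is not None` test is what lets a caller override a True config setting with False; a plain `or` would silently discard the explicit False.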
    def plan_builder(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        run: bool = False,
    ) -> PlanBuilder:
        """Creates a plan builder.

        Args:
            environment: The environment to diff and plan against.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The plan builder.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )
Creates a plan builder.
Arguments:
- environment: The environment to diff and plan against.
- start: The start date of the backfill if there is one.
- end: The end date of the backfill if there is one.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
- skip_tests: Unit tests are run by default; set this to True to skip them.
- restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even ones not in the current environment). Only the snapshots in this environment will be backfilled, whereas the others must be recovered on a future plan application. For development environments, only snapshots that are part of this plan are affected.
- no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
- skip_backfill: Whether to skip the backfill step. Default: False.
- forward_only: Whether the purpose of the plan is to make forward-only changes.
- no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
- categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
- effective_from: The effective date from which to apply forward-only changes on production.
- include_unmodified: Indicates whether to include unmodified models in the target development environment.
- select_models: A list of model selection strings to filter the models that should be included in this plan.
- backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
- enable_preview: Indicates whether to enable preview for forward-only models in development environments.
- run: Whether to run latest intervals as part of the plan application.
Returns:
The plan builder.
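The source above computes the default plan window in two steps. First, it narrows the models considered for the default end, adding the parents of any newly added selected model. Second, for development plans, it sets the default start to one day before that end.

The sketch below reproduces this logic in isolation. It is a simplified model, not SQLMesh code, and rests on these assumptions:

- `interval_ends` is a hypothetical mapping from model name to the latest processed interval end in prod.
- The "greatest common interval end" is taken to be the earliest of those per-model ends.

```python
from datetime import date, timedelta
from typing import Dict, Optional, Set, Tuple


def models_for_default_end(
    backfill_models: Set[str],
    parents: Dict[str, Set[str]],
    newly_added: Set[str],
) -> Set[str]:
    """Expand the selected models with the parents of any newly added model."""
    result = set(backfill_models)
    for name in backfill_models:
        if name in newly_added:
            # A newly added model has no processed intervals yet, so its parents
            # are used to bound the default end instead.
            result |= parents.get(name, set())
    return result


def default_window(
    interval_ends: Dict[str, date], considered: Set[str], is_dev: bool
) -> Tuple[Optional[date], Optional[date]]:
    """Return (default_start, default_end) under the assumptions stated above."""
    ends = [interval_ends[m] for m in considered if m in interval_ends]
    # Assumption: the greatest common interval end is the earliest per-model end.
    default_end = min(ends) if ends else None
    # As in the source, only dev plans get a default start: one day before the end.
    default_start = default_end - timedelta(days=1) if default_end and is_dev else None
    return default_start, default_end


considered = models_for_default_end(
    {"db.new_model"}, {"db.new_model": {"db.orders"}}, newly_added={"db.new_model"}
)
start, end = default_window({"db.orders": date(2024, 1, 10)}, considered, is_dev=True)
```

In this example the new model has no intervals of its own, so its parent `db.orders` determines the end date.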
```python
def apply(
    self,
    plan: Plan,
    circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
) -> None:
    """Applies a plan by pushing snapshots and backfilling data.

    Given a plan, it pushes snapshots into the state sync and then uses the scheduler
    to backfill all models.

    Args:
        plan: The plan to apply.
        circuit_breaker: An optional handler which checks if the apply should be aborted.
    """
    if (
        not plan.context_diff.has_changes
        and not plan.requires_backfill
        and not plan.has_unmodified_unpromoted
    ):
        return
    if plan.uncategorized:
        raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
    self.notification_target_manager.notify(
        NotificationEvent.APPLY_START,
        environment=plan.environment_naming_info.name,
        plan_id=plan.plan_id,
    )
    try:
        self._apply(plan, circuit_breaker)
    except Exception as e:
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_FAILURE,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
            exc=traceback.format_exc(),
        )
        logger.error(f"Apply Failure: {traceback.format_exc()}")
        raise e
    self.notification_target_manager.notify(
        NotificationEvent.APPLY_END,
        environment=plan.environment_naming_info.name,
        plan_id=plan.plan_id,
    )
```
Applies a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state sync and then uses the scheduler to backfill all models.
Arguments:
- plan: The plan to apply.
- circuit_breaker: An optional handler which checks if the apply should be aborted.
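The source of `apply` follows a simple lifecycle:

1. Return early when there is nothing to do.
2. Refuse uncategorized plans.
3. Send a start event.
4. On failure, send a failure event carrying the traceback and re-raise.
5. Otherwise, send an end event.

The sketch below reduces that lifecycle to plain Python. `RecordingNotifier` and `apply_with_notifications` are hypothetical names, and the event names are plain strings rather than SQLMesh's `NotificationEvent` enum.

```python
import traceback
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class RecordingNotifier:
    """Hypothetical stand-in for the notification target manager: it only records events."""

    events: List[Tuple[str, str]] = field(default_factory=list)

    def notify(self, event: str, detail: str = "") -> None:
        self.events.append((event, detail))


def apply_with_notifications(
    has_work: bool,
    uncategorized: bool,
    do_apply: Callable[[], object],
    notifier: RecordingNotifier,
) -> None:
    if not has_work:
        return  # Nothing to change, backfill, or promote: no events are sent.
    if uncategorized:
        raise ValueError("Can't apply a plan with uncategorized changes.")
    notifier.notify("APPLY_START")
    try:
        do_apply()
    except Exception:
        notifier.notify("APPLY_FAILURE", traceback.format_exc())
        raise  # Re-raise so the caller still sees the original error.
    notifier.notify("APPLY_END")
```

The bare `raise` preserves the original traceback, which is what `raise e` in the source also achieves in practice.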
```python
def invalidate_environment(self, name: str, sync: bool = False) -> None:
    """Invalidates the target environment by setting its expiration timestamp to now.

    Args:
        name: The name of the environment to invalidate.
        sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
            be deleted asynchronously by the janitor process.
    """
    self.state_sync.invalidate_environment(name)
    if sync:
        self._cleanup_environments()
        self.console.log_success(f"Environment '{name}' has been deleted.")
    else:
        self.console.log_success(f"Environment '{name}' has been invalidated.")
```
Invalidates the target environment by setting its expiration timestamp to now.
Arguments:
- name: The name of the environment to invalidate.
- sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will be deleted asynchronously by the janitor process.
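Invalidation only marks an environment as expired. Deletion happens when the janitor runs, either immediately (`sync=True`) or later.

The sketch below is a hypothetical in-memory model of that two-step behavior, not SQLMesh's state sync. `EnvRecord` and `InMemoryEnvironmentStore` are illustrative names.

```python
import time
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class EnvRecord:
    name: str
    expiration_ts: Optional[float]  # None means the environment never expires.


class InMemoryEnvironmentStore:
    """Hypothetical model of environment invalidation plus a janitor sweep."""

    def __init__(self) -> None:
        self.envs: Dict[str, EnvRecord] = {}

    def invalidate(self, name: str) -> None:
        # Step 1: only set the expiration timestamp to "now"; nothing is deleted yet.
        self.envs[name].expiration_ts = time.time()

    def janitor(self, now: Optional[float] = None) -> List[str]:
        """Step 2: delete every environment whose expiration timestamp has passed."""
        now = time.time() if now is None else now
        expired = [
            name
            for name, env in self.envs.items()
            if env.expiration_ts is not None and env.expiration_ts <= now
        ]
        for name in expired:
            del self.envs[name]
        return expired


store = InMemoryEnvironmentStore()
store.envs["dev"] = EnvRecord("dev", None)
store.envs["prod"] = EnvRecord("prod", None)
store.invalidate("dev")
```

Calling `store.janitor()` right after `invalidate` corresponds to `sync=True`. Leaving it for a later sweep corresponds to the asynchronous default.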
```python
def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
    """Show a diff of the current context with a given environment.

    Args:
        environment: The environment to diff against.
        detailed: Show the actual SQL differences if True.

    Returns:
        True if there are changes, False otherwise.
    """
    environment = environment or self.config.default_target_environment
    environment = Environment.normalize_name(environment)
    context_diff = self._context_diff(environment)
    self.console.show_model_difference_summary(
        context_diff,
        EnvironmentNamingInfo.from_environment_catalog_mapping(
            self.config.environment_catalog_mapping,
            name=environment,
            suffix_target=self.config.environment_suffix_target,
        ),
        self.default_catalog,
        no_diff=not detailed,
    )
    return context_diff.has_changes
```
Show a diff of the current context with a given environment.
Arguments:
- environment: The environment to diff against.
- detailed: Show the actual SQL differences if True.
Returns:
True if there are changes, False otherwise.
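The boolean returned by `diff` means "did anything differ between the local project and the environment". The sketch below illustrates that idea with a hypothetical simplification: each side is a mapping from model name to a fingerprint string. SQLMesh's real `ContextDiff` works on snapshots and tracks more detail than this.

```python
from typing import Dict, NamedTuple, Set


class SimpleDiff(NamedTuple):
    added: Set[str]
    removed: Set[str]
    modified: Set[str]

    @property
    def has_changes(self) -> bool:
        return bool(self.added or self.removed or self.modified)


def diff_models(local: Dict[str, str], env: Dict[str, str]) -> SimpleDiff:
    """Compare model name -> fingerprint mappings (hypothetical simplification)."""
    shared = set(local) & set(env)
    return SimpleDiff(
        added=set(local) - set(env),
        removed=set(env) - set(local),
        # A shared model counts as modified when its fingerprint changed.
        modified={name for name in shared if local[name] != env[name]},
    )


result = diff_models({"db.a": "f1", "db.b": "f2"}, {"db.b": "f3", "db.c": "f4"})
```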
```python
def table_diff(
    self,
    source: str,
    target: str,
    on: t.List[str] | exp.Condition | None = None,
    model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
    where: t.Optional[str | exp.Condition] = None,
    limit: int = 20,
    show: bool = True,
    show_sample: bool = True,
) -> TableDiff:
    """Show a diff between two tables.

    Args:
        source: The source environment or table.
        target: The target environment or table.
        on: The join condition, table aliases must be "s" and "t" for source and target.
            If omitted, the table's grain will be used.
        model_or_snapshot: The model or snapshot to use when environments are passed in.
        where: An optional where statement to filter results.
        limit: The limit of the sample dataframe.
        show: Show the table diff output in the console.
        show_sample: Show the sample dataframe in the console. Requires show=True.

    Returns:
        The TableDiff object containing schema and summary differences.
    """
    source_alias, target_alias = source, target
    if model_or_snapshot:
        model = self.get_model(model_or_snapshot, raise_if_missing=True)
        source_env = self.state_reader.get_environment(source)
        target_env = self.state_reader.get_environment(target)

        if not source_env:
            raise SQLMeshError(f"Could not find environment '{source}'")
        if not target_env:
            raise SQLMeshError(f"Could not find environment '{target}'")

        source = next(
            snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
        ).table_name()
        target = next(
            snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
        ).table_name()
        source_alias = source_env.name
        target_alias = target_env.name

        if not on:
            for ref in model.all_references:
                if ref.unique:
                    on = ref.columns

    if not on:
        raise SQLMeshError(
            "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
        )

    table_diff = TableDiff(
        adapter=self._engine_adapter,
        source=source,
        target=target,
        on=on,
        where=where,
        source_alias=source_alias,
        target_alias=target_alias,
        model_name=model.name if model_or_snapshot else None,
        limit=limit,
    )
    if show:
        self.console.show_schema_diff(table_diff.schema_diff())
        self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
    return table_diff
```
Show a diff between two tables.
Arguments:
- source: The source environment or table.
- target: The target environment or table.
- on: The join condition; table aliases must be "s" and "t" for the source and target, respectively. If omitted, the table's grain is used.
- model_or_snapshot: The model or snapshot to use when environments are passed in.
- where: An optional WHERE clause used to filter the results.
- limit: The maximum number of rows in the sample DataFrame.
- show: Show the table diff output in the console.
- show_sample: Show the sample dataframe in the console. Requires show=True.
Returns:
The TableDiff object containing schema and summary differences.
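`table_diff` chooses its join columns as follows:

- It uses explicit `on` columns when given.
- Otherwise, it falls back to a unique grain of the model.
- If neither is available, it raises an error.

It then compares rows matched on those columns. The real `TableDiff` runs SQL inside the engine. The sketch below is only an in-memory illustration with plain dicts as rows. It assumes the join columns uniquely identify rows on each side, and the result keys are hypothetical names.

```python
from typing import Dict, List, Optional, Sequence, Tuple

Row = Dict[str, object]


def pick_join_columns(on: Optional[List[str]], unique_grains: List[List[str]]) -> List[str]:
    """Prefer explicit join columns, else fall back to a unique grain, else fail."""
    if on:
        return on
    if unique_grains:
        # The source loop keeps overwriting `on`, so the last unique grain wins.
        return unique_grains[-1]
    raise ValueError("Don't know how to join the two tables: specify grains or join columns.")


def row_key(row: Row, on: List[str]) -> Tuple[object, ...]:
    return tuple(row[col] for col in on)


def row_diff(source: Sequence[Row], target: Sequence[Row], on: List[str]) -> Dict[str, int]:
    """Count rows by join status; assumes `on` uniquely identifies rows on each side."""
    src = {row_key(r, on): r for r in source}
    tgt = {row_key(r, on): r for r in target}
    joined = src.keys() & tgt.keys()
    return {
        "source_only": len(src.keys() - tgt.keys()),
        "target_only": len(tgt.keys() - src.keys()),
        "joined": len(joined),
        "joined_but_different": sum(1 for k in joined if src[k] != tgt[k]),
    }
```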
```python
def get_dag(
    self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
) -> GraphHTML:
    """Gets an HTML object representation of the DAG.

    Args:
        select_models: A list of model selection strings that should be included in the dag.
    Returns:
        An html object that renders the dag.
    """
    dag = (
        self.dag.prune(*self._new_selector().expand_model_selections(select_models))
        if select_models
        else self.dag
    )

    nodes = {}
    edges: t.List[t.Dict] = []

    for node, deps in dag.graph.items():
        nodes[node] = {
            "id": node,
            "label": node.split(".")[-1],
            "title": f"<span>{node}</span>",
        }
        edges.extend({"from": d, "to": node} for d in deps)

    return GraphHTML(
        nodes,
        edges,
        options={
            "height": "100%",
            "width": "100%",
            "interaction": {},
            "layout": {
                "hierarchical": {
                    "enabled": True,
                    "nodeSpacing": 200,
                    "sortMethod": "directed",
                },
            },
            "nodes": {
                "shape": "box",
            },
            **options,
        },
    )
```
Gets an HTML object representation of the DAG.
Arguments:
- select_models: A list of model selection strings that should be included in the DAG.
Returns:
An HTML object that renders the DAG.
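Before `GraphHTML` is involved, the DAG is turned into plain node and edge records. Each node is labeled with the last part of its qualified name, and each edge points from an upstream dependency to the model that depends on it.

The standalone sketch below mirrors the loop in the source, with a plain `dict` standing in for `dag.graph`. Sorting the dependencies is an addition here to make the output deterministic.

```python
from typing import Dict, List, Set, Tuple


def dag_to_records(graph: Dict[str, Set[str]]) -> Tuple[Dict[str, dict], List[dict]]:
    """Turn a node -> upstream-dependencies mapping into node and edge records."""
    nodes: Dict[str, dict] = {}
    edges: List[dict] = []
    for node, deps in graph.items():
        nodes[node] = {
            "id": node,
            "label": node.split(".")[-1],  # Show only the last part of the qualified name.
            "title": f"<span>{node}</span>",
        }
        # Edges point from each upstream dependency to the node that depends on it.
        edges.extend({"from": d, "to": node} for d in sorted(deps))
    return nodes, edges


nodes, edges = dag_to_records({"db.orders": set(), "db.daily_orders": {"db.orders"}})
```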
```python
def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
    """Render the dag as HTML and save it to a file.

    Args:
        path: filename to save the dag html to
        select_models: A list of model selection strings that should be included in the dag.
    """
    file_path = Path(path)
    suffix = file_path.suffix
    if suffix != ".html":
        if suffix:
            logger.warning(
                f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
            )
        path = str(file_path.with_suffix(".html"))

    with open(path, "w", encoding="utf-8") as file:
        file.write(str(self.get_dag(select_models)))
```
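Render the DAG as HTML and save it to a file.
Arguments:
- path: The file name to save the DAG HTML to. If it does not end in `.html`, its extension is replaced with `.html` (a warning is logged if a different extension was given).
- select_models: A list of model selection strings that should be included in the DAG.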
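The extension handling in `render_dag` relies on `pathlib`. The sketch below isolates it as a hypothetical helper, `normalize_html_path`, that follows the same rules.

```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def normalize_html_path(path: str) -> str:
    """Force a `.html` extension, warning when a different extension is replaced."""
    file_path = Path(path)
    suffix = file_path.suffix
    if suffix != ".html":
        if suffix:
            logger.warning("The extension %s does not designate an html file.", suffix)
        # with_suffix replaces only the last extension, or appends one if there is none.
        path = str(file_path.with_suffix(".html"))
    return path
```

Because only the last suffix counts, a name like `report.v2` becomes `report.html`. Use a name without dots, or one already ending in `.html`, to keep the intended file name.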
```python
def create_test(
    self,
    model: str,
    input_queries: t.Dict[str, str],
    overwrite: bool = False,
    variables: t.Optional[t.Dict[str, str]] = None,
    path: t.Optional[str] = None,
    name: t.Optional[str] = None,
    include_ctes: bool = False,
) -> None:
    """Generate a unit test fixture for a given model.

    Args:
        model: The model to test.
        input_queries: Mapping of model names to queries. Each model included in this mapping
            will be populated in the test based on the results of the corresponding query.
        overwrite: Whether to overwrite the existing test in case of a file path collision.
            When set to False, an error will be raised if there is such a collision.
        variables: Key-value pairs that will define variables needed by the model.
        path: The file path corresponding to the fixture, relative to the test directory.
            By default, the fixture will be created under the test directory and the file name
            will be inferred from the test's name.
        name: The name of the test. This is inferred from the model name by default.
        include_ctes: When true, CTE fixtures will also be generated.
    """
    input_queries = {
        # The get_model here has two purposes: return normalized names & check for missing deps
        self.get_model(dep, raise_if_missing=True).fqn: query
        for dep, query in input_queries.items()
    }

    # Create the adapter before entering `try` so that `finally` never refers to an
    # unbound `test_adapter` if adapter creation itself fails.
    test_adapter = self._test_connection_config.create_engine_adapter(
        register_comments_override=False
    )
    try:
        generate_test(
            model=self.get_model(model, raise_if_missing=True),
            input_queries=input_queries,
            models=self._models,
            engine_adapter=self._engine_adapter,
            test_engine_adapter=test_adapter,
            project_path=self.path,
            overwrite=overwrite,
            variables=variables,
            path=path,
            name=name,
            include_ctes=include_ctes,
        )
    finally:
        test_adapter.close()
```
Generate a unit test fixture for a given model.
Arguments:
- model: The model to test.
- input_queries: Mapping of model names to queries. Each model included in this mapping will be populated in the test based on the results of the corresponding query.
- overwrite: Whether to overwrite the existing test in case of a file path collision. When set to False, an error will be raised if there is such a collision.
- variables: Key-value pairs that will define variables needed by the model.
- path: The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred from the test's name.
- name: The name of the test. This is inferred from the model name by default.
- include_ctes: When true, CTE fixtures will also be generated.
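A generated fixture pairs the model under test with input rows for each upstream model and the expected output rows. The sketch below builds that structure as a plain dict. It is a hedged sketch under these assumptions:

- The layout (`model`, `inputs.<upstream>.rows`, `outputs.query.rows`) follows SQLMesh's YAML unit-test format as commonly documented.
- The `test_<model>` naming rule is a hypothetical stand-in for how `create_test` infers a name, not the library's exact behavior.
- `build_fixture` is an illustrative helper, not a SQLMesh function.

```python
from typing import Any, Dict, List, Optional

Rows = List[Dict[str, Any]]


def build_fixture(
    model: str, inputs: Dict[str, Rows], expected: Rows, name: Optional[str] = None
) -> Dict[str, Any]:
    """Assemble a unit-test fixture dict for one model (hypothetical helper)."""
    # Hypothetical naming rule: derive the test name from the model's last name part.
    test_name = name or f"test_{model.split('.')[-1]}"
    return {
        test_name: {
            "model": model,
            # One entry per upstream model, each populated from its query results.
            "inputs": {upstream: {"rows": rows} for upstream, rows in inputs.items()},
            "outputs": {"query": {"rows": expected}},
        }
    }


fixture = build_fixture(
    "db.full_model",
    {"db.orders": [{"id": 1, "item_id": 7}]},
    [{"item_id": 7, "num_orders": 1}],
)
```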
```python
def test(
    self,
    match_patterns: t.Optional[t.List[str]] = None,
    tests: t.Optional[t.List[str]] = None,
    verbose: bool = False,
    preserve_fixtures: bool = False,
    stream: t.Optional[t.TextIO] = None,
) -> ModelTextTestResult:
    """Discover and run model tests"""
    if verbose:
        pd.set_option("display.max_columns", None)
        verbosity = 2
    else:
        verbosity = 1

    if tests:
        result = run_model_tests(
            tests=tests,
            models=self._models,
            config=self.config,
            gateway=self.gateway,
            dialect=self.default_dialect,
            verbosity=verbosity,
            patterns=match_patterns,
            preserve_fixtures=preserve_fixtures,
            stream=stream,
            default_catalog=self.default_catalog,
            default_catalog_dialect=self.engine_adapter.DIALECT,
        )
    else:
        test_meta = []

        for path, config in self.configs.items():
            test_meta.extend(
                get_all_model_tests(
                    path / c.TESTS,
                    patterns=match_patterns,
                    ignore_patterns=config.ignore_patterns,
                )
            )

        result = run_tests(
            model_test_metadata=test_meta,
            models=self._models,
            config=self.config,
            gateway=self.gateway,
            dialect=self.default_dialect,
            verbosity=verbosity,
            preserve_fixtures=preserve_fixtures,
            stream=stream,
            default_catalog=self.default_catalog,
            default_catalog_dialect=self.engine_adapter.DIALECT,
        )

    return result
```
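Discovers and runs model tests. If `tests` is provided, those tests are run directly; otherwise, tests are discovered in the tests directory of each project path. In both cases, `match_patterns` is passed through to filter which tests run, and `verbose=True` raises the output verbosity.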
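The return type, `ModelTextTestResult`, suggests that model tests run on Python's `unittest` machinery. The sketch below uses only the standard library to show two behaviors of `test`:

- the same verbosity mapping (2 when verbose, otherwise 1);
- writing results to a caller-supplied stream.

`ExampleTest` and `run_suite` are hypothetical placeholders, not SQLMesh tests or APIs.

```python
import unittest
from io import StringIO


class ExampleTest(unittest.TestCase):
    """Hypothetical stand-in for a generated model test."""

    def test_row_count(self) -> None:
        self.assertEqual(len([1, 2, 3]), 3)


def run_suite(verbose: bool = False) -> unittest.TestResult:
    # Same mapping as in `test`: verbosity 2 when verbose, otherwise 1.
    verbosity = 2 if verbose else 1
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTest)
    stream = StringIO()  # Capture runner output instead of writing to stderr.
    return unittest.TextTestRunner(stream=stream, verbosity=verbosity).run(suite)


result = run_suite(verbose=True)
```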
```python
def audit(
    self,
    start: TimeLike,
    end: TimeLike,
    *,
    models: t.Optional[t.Iterator[str]] = None,
    execution_time: t.Optional[TimeLike] = None,
) -> None:
    """Audit models.

    Args:
        start: The start of the interval to audit.
        end: The end of the interval to audit.
        models: The models to audit. All models will be audited if not specified.
        execution_time: The date/time reference to use for execution time. Defaults to now.
    """

    snapshots = (
        [self.get_snapshot(model, raise_if_missing=True) for model in models]
        if models
        else self.snapshots.values()
    )

    num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
    self.console.log_status_update(f"Found {num_audits} audit(s).")
    errors = []
    skipped_count = 0
    for snapshot in snapshots:
        for audit_result in self.snapshot_evaluator.audit(
            snapshot=snapshot,
            start=start,
            end=end,
            snapshots=self.snapshots,
            raise_exception=False,
        ):
            audit_id = f"{audit_result.audit.name}"
            if audit_result.model:
                audit_id += f" on model {audit_result.model.name}"

            if audit_result.skipped:
                self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                skipped_count += 1
            elif audit_result.count:
                errors.append(audit_result)
                self.console.log_status_update(
                    f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                )
            else:
                self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

    self.console.log_status_update(
        f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
        f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
    )
    for error in errors:
        self.console.log_status_update(
            f"\nFailure in audit {error.audit.name} ({error.audit._path})."
        )
        self.console.log_status_update(f"Got {error.count} results, expected 0.")
        if error.query:
            self.console.show_sql(
                f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
            )

    self.console.log_status_update("Done.")
```
Audit models.
Arguments:
- start: The start of the interval to audit.
- end: The end of the interval to audit.
- models: The models to audit. All models will be audited if not specified.
- execution_time: The date/time reference to use for execution time. Defaults to now.
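As the source shows, each audit result is classified in one of three ways:

- skipped;
- failed, when its query returned a non-zero number of rows ("Got N results, expected 0");
- passed.

The final summary line pluralizes its counts. The sketch below reproduces that classification and message with a hypothetical, simplified `AuditOutcome` type in place of SQLMesh's audit result objects.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class AuditOutcome:
    """Hypothetical, simplified audit result."""

    name: str
    count: int = 0  # Number of rows the audit query returned; 0 means the audit passed.
    skipped: bool = False


def summarize(results: List[AuditOutcome]) -> str:
    errors = [r for r in results if not r.skipped and r.count]
    skipped = sum(1 for r in results if r.skipped)
    return (
        f"Finished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
        f"and {skipped} audit{'' if skipped == 1 else 's'} skipped."
    )


summary = summarize(
    [AuditOutcome("not_null", count=2), AuditOutcome("unique_values"), AuditOutcome("accepted_range", skipped=True)]
)
```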
```python
def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
    """Rewrite a sql expression with semantic references into an executable query.

    https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

    Args:
        sql: The sql string to rewrite.
        dialect: The dialect of the sql string, defaults to the project dialect.

    Returns:
        A SQLGlot expression with semantic references expanded.
    """
    return rewrite(
        sql,
        graph=ReferenceGraph(self.models.values()),
        metrics=self._metrics,
        dialect=dialect or self.default_dialect,
    )
```
Rewrite a SQL expression with semantic references into an executable query.
https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
Arguments:
- sql: The SQL string to rewrite.
- dialect: The dialect of the SQL string; defaults to the project dialect.
Returns:
A SQLGlot expression with semantic references expanded.
```python
def migrate(self) -> None:
    """Migrates SQLMesh to the current running version.

    Please contact your SQLMesh administrator before doing this.
    """
    self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
    try:
        self._new_state_sync().migrate(
            default_catalog=self.default_catalog,
            promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
        )
    except Exception as e:
        self.notification_target_manager.notify(
            NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
        )
        raise e
    self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
```
Migrates SQLMesh to the current running version.
Please contact your SQLMesh administrator before doing this.
```python
def rollback(self) -> None:
    """Rolls back SQLMesh to the previous migration.

    Please contact your SQLMesh administrator before doing this. This action cannot be undone.
    """
    self._new_state_sync().rollback()
```
Rolls back SQLMesh to the previous migration.
Please contact your SQLMesh administrator before doing this. This action cannot be undone.
```python
def create_external_models(self) -> None:
    """Create a schema file with all external models.

    The schema file contains all columns and types of external models, allowing for more robust
    lineage, validation, and optimizations.
    """
    if not self._models:
        self.load(update_schemas=False)

    for path, config in self.configs.items():
        create_schema_file(
            path=path / c.SCHEMA_YAML,
            models=UniqueKeyDict(
                "models",
                {
                    fqn: model
                    for fqn, model in self._models.items()
                    if self.config_for_node(model) is config
                },
            ),
            adapter=self._engine_adapter,
            state_reader=self.state_reader,
            dialect=config.model_defaults.dialect,
            max_workers=self.concurrent_tasks,
        )
```
Create a schema file with all external models.
The schema file contains all columns and types of external models, allowing for more robust lineage, validation, and optimizations.
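In a multi-project setup, `create_external_models` writes one schema file per project path. Each file covers only the models whose config belongs to that project.

The sketch below shows just that grouping step. It uses these hypothetical simplifications:

- Each model is tagged with its project path by a plain mapping, `model_projects`.
- `SCHEMA_FILE` is a placeholder; the real file name comes from a SQLMesh constant (`c.SCHEMA_YAML`) whose value is not shown here.

```python
from pathlib import Path
from typing import Dict, List

SCHEMA_FILE = "schema.yaml"  # Hypothetical placeholder for the real constant's value.


def plan_schema_files(
    model_projects: Dict[str, str], project_paths: List[str]
) -> Dict[Path, List[str]]:
    """Map each project's schema file to the (sorted) models that belong to that project."""
    return {
        Path(project) / SCHEMA_FILE: sorted(
            fqn for fqn, owner in model_projects.items() if owner == project
        )
        for project in project_paths
    }


layout = plan_schema_files(
    {"db.orders": "proj_a", "db.items": "proj_b", "db.customers": "proj_a"},
    ["proj_a", "proj_b"],
)
```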
```python
def print_info(self) -> None:
    """Prints information about connections, models, macros, etc. to the console."""
    self.console.log_status_update(f"Models: {len(self.models)}")
    self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

    self._try_connection("data warehouse", self._engine_adapter)

    state_connection = self.config.get_state_connection(self.gateway)
    if state_connection:
        self._try_connection("state backend", state_connection.create_engine_adapter())
```
Prints information about connections, models, macros, etc. to the console.
```python
def close(self) -> None:
    """Releases all resources allocated by this context."""
    self.snapshot_evaluator.close()
    self.state_sync.close()
```
Releases all resources allocated by this context.
```python
def table_name(self, model_name: str, dev: bool) -> str:
    """Returns the name of the physical table for the given model name.

    Args:
        model_name: The name of the model.
        dev: Whether to use the deployability index for the table name.

    Returns:
        The name of the physical table.
    """
    deployability_index = (
        DeployabilityIndex.create(self.snapshots.values())
        if dev
        else DeployabilityIndex.all_deployable()
    )
    snapshot = self.get_snapshot(model_name)
    if not snapshot:
        raise SQLMeshError(f"Model '{model_name}' was not found.")
    return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
```
Returns the name of the physical table for the given model name.
Arguments:
- model_name: The name of the model.
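- dev: If True, deployability is determined from the context's current snapshots, so a non-deployable (development) table name may be returned; if False, every snapshot is treated as deployable.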
Returns:
The name of the physical table.
Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
Arguments:
- engine_adapter: The default engine adapter to use.
- notification_targets: The notification targets to use. Defaults to what is defined in the config.
- paths: The directories containing SQLMesh files.
- config: A Config object or the name of a Config object in config.py.
- connection: The name of the connection. If not specified, the first connection in the configuration is used.
- test_connection: The name of the connection to use for tests. If not specified, the first connection in the configuration is used.
- concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
- load: Whether or not to automatically load all models and macros (default True).
- console: The Rich console instance used to print CLI command results.
- users: A list of users to make known to SQLMesh.
- config_type: The type of config object to use (default Config).
Inherited Members
- GenericContext
- GenericContext
- default_dialect
- engine_adapter
- execution_context
- upsert_model
- scheduler
- refresh
- load
- run
- get_model
- get_snapshot
- config_for_path
- config_for_node
- models
- metrics
- standalone_audits
- snapshots
- render
- evaluate
- format
- plan
- plan_builder
- apply
- invalidate_environment
- diff
- table_diff
- get_dag
- render_dag
- create_test
- test
- audit
- rewrite
- migrate
- rollback
- create_external_models
- print_info
- close
- table_name
- clear_caches