# Context
A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and
load your project's models, macros, and audits. Afterwards, you can use the context to create and apply
plans, visualize your models' lineage, run your audits and model tests, and perform various other tasks.
For more information regarding what a context can do, see `sqlmesh.core.context.Context`.
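A model's lineage is a directed acyclic graph of the models it reads from. The sketch below uses only the standard library to collect every upstream model of a given model, the kind of traversal a lineage view is built from. The model names, the `DEPENDS_ON` mapping, and the `upstream` helper are hypothetical. This is not SQLMesh's own `DAG` implementation.

```python
from collections import deque
from typing import Dict, Iterable, Set

# Hypothetical project: each model maps to the models it selects from.
DEPENDS_ON: Dict[str, Set[str]] = {
    "db.raw_orders": set(),
    "db.raw_customers": set(),
    "db.orders": {"db.raw_orders"},
    "db.customer_revenue": {"db.orders", "db.raw_customers"},
}


def upstream(model: str, depends_on: Dict[str, Iterable[str]]) -> Set[str]:
    """Return every model that `model` reads from, directly or transitively."""
    seen: Set[str] = set()
    queue = deque(depends_on.get(model, ()))
    while queue:
        parent = queue.popleft()
        if parent in seen:
            continue  # already visited through another path
        seen.add(parent)
        queue.extend(depends_on.get(parent, ()))
    return seen


print(sorted(upstream("db.customer_revenue", DEPENDS_ON)))
# ['db.orders', 'db.raw_customers', 'db.raw_orders']
```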
# Examples
Creating and applying a plan against the staging environment.
```python
from sqlmesh.core.context import Context
context = Context(paths="example", config="local_config")
plan = context.plan("staging")
context.apply(plan)
```
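Conceptually, `plan` compares the project as currently loaded with what is recorded for the target environment. It then reports which models were added, removed, or modified. The standard-library sketch below models that comparison with a SHA-256 hash of each query. `fingerprint` and `diff_models` are hypothetical helpers. SQLMesh's real fingerprints and its change categorization (breaking / non-breaking) cover far more than query text.

```python
import hashlib
from typing import Dict, List, Tuple


def fingerprint(sql: str) -> str:
    """Hash a model's query text (a hypothetical stand-in for a real fingerprint)."""
    return hashlib.sha256(sql.strip().encode("utf-8")).hexdigest()


def diff_models(
    local: Dict[str, str], environment: Dict[str, str]
) -> Tuple[List[str], List[str], List[str]]:
    """Compare local model queries with the fingerprints stored for an environment.

    Returns sorted (added, removed, modified) model names.
    """
    added = sorted(name for name in local if name not in environment)
    removed = sorted(name for name in environment if name not in local)
    modified = sorted(
        name
        for name, sql in local.items()
        if name in environment and fingerprint(sql) != environment[name]
    )
    return added, removed, modified


# Fingerprints previously recorded for the target environment (hypothetical).
stored = {
    "db.orders": fingerprint("SELECT id, amount FROM db.raw_orders"),
    "db.legacy": fingerprint("SELECT 1 AS x"),
}
# Queries as they currently exist in the local project (hypothetical).
local = {
    "db.orders": "SELECT id, amount * 100 AS amount FROM db.raw_orders",
    "db.customers": "SELECT id FROM db.raw_customers",
}
print(diff_models(local, stored))
# (['db.customers'], ['db.legacy'], ['db.orders'])
```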
Running audits on your data.
```python
from sqlmesh.core.context import Context
context = Context(paths="example", config="local_config")
context.audit("yesterday", "now")
```
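A SQLMesh audit is a query that selects the rows violating some expectation. It passes when that query returns no rows, and `context.audit(start, end)` runs the audits over the given time range. The sketch below imitates a not-null audit with the standard-library `sqlite3` module. The `orders` table, its `ds` date column, and `run_not_null_audit` are hypothetical. Real SQLMesh audits are defined in the project and rendered by SQLMesh, not written like this.

```python
import sqlite3
from typing import List, Tuple


def run_not_null_audit(
    conn: sqlite3.Connection, table: str, column: str, start: str, end: str
) -> List[Tuple]:
    """Return rows in [start, end] whose `column` is NULL; an empty list means the audit passed."""
    # Identifiers cannot be bound as parameters, so they are interpolated here;
    # only pass trusted table and column names.
    query = f"SELECT * FROM {table} WHERE {column} IS NULL AND ds BETWEEN ? AND ?"
    return conn.execute(query, (start, end)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, customer_id INTEGER, ds TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 10, "2024-01-01"), (2, None, "2024-01-02"), (3, None, "2023-12-31")],
)
violations = run_not_null_audit(conn, "orders", "customer_id", "2024-01-01", "2024-01-02")
print("passed" if not violations else f"failed: {violations}")
# failed: [(2, None, '2024-01-02')]
```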
Running tests on your models.
```python
from sqlmesh.core.context import Context
context = Context(paths="example")
context.test()
```
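A SQLMesh unit test feeds fixed input rows to a model's upstream tables, runs the model's query, and compares the result with expected rows. `context.test()` runs the project's tests. The sketch below reproduces that idea with `sqlite3`. `run_model_test`, the `raw_orders` fixture, and the query are hypothetical. SQLMesh's own tests are typically declared in YAML files in the project rather than written in Python.

```python
import sqlite3
from typing import Dict, List, Tuple


def run_model_test(
    query: str,
    inputs: Dict[str, Tuple[List[str], List[Tuple]]],
    expected: List[Tuple],
) -> bool:
    """Load fixture rows into upstream tables, run the model query, and compare the output."""
    conn = sqlite3.connect(":memory:")
    try:
        for table, (columns, rows) in inputs.items():
            conn.execute(f"CREATE TABLE {table} ({', '.join(columns)})")
            placeholders = ", ".join("?" for _ in columns)
            conn.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
        actual = conn.execute(query).fetchall()
    finally:
        conn.close()
    # Compare without regard to row order, since the query has no ORDER BY.
    return sorted(actual) == sorted(expected)


MODEL_QUERY = "SELECT customer_id, SUM(amount) AS revenue FROM raw_orders GROUP BY customer_id"

passed = run_model_test(
    MODEL_QUERY,
    inputs={"raw_orders": (["customer_id", "amount"], [(1, 5), (1, 7), (2, 3)])},
    expected=[(1, 12), (2, 3)],
)
print(passed)  # True
```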
1""" 2# Context 3 4A SQLMesh context encapsulates a SQLMesh environment. When you create a new context, it will discover and 5load your project's models, macros, and audits. Afterwards, you can use the context to create and apply 6plans, visualize your model's lineage, run your audits and model tests, and perform various other tasks. 7For more information regarding what a context can do, see `sqlmesh.core.context.Context`. 8 9# Examples: 10 11Creating and applying a plan against the staging environment. 12```python 13from sqlmesh.core.context import Context 14context = Context(path="example", config="local_config") 15plan = context.plan("staging") 16context.apply(plan) 17``` 18 19Running audits on your data. 20```python 21from sqlmesh.core.context import Context 22context = Context(path="example", config="local_config") 23context.audit("yesterday", "now") 24``` 25 26Running tests on your models. 27```python 28from sqlmesh.core.context import Context 29context = Context(path="example") 30context.test() 31``` 32""" 33 34from __future__ import annotations 35 36import abc 37import collections 38import gc 39import logging 40import time 41import traceback 42import typing as t 43import unittest.result 44from datetime import timedelta 45from functools import cached_property 46from io import StringIO 47from pathlib import Path 48from shutil import rmtree 49from types import MappingProxyType 50 51import pandas as pd 52from sqlglot import exp 53from sqlglot.lineage import GraphHTML 54 55from sqlmesh.core import constants as c 56from sqlmesh.core.audit import Audit, StandaloneAudit 57from sqlmesh.core.config import CategorizerConfig, Config, load_configs 58from sqlmesh.core.config.loader import C 59from sqlmesh.core.console import Console, get_console 60from sqlmesh.core.context_diff import ContextDiff 61from sqlmesh.core.dialect import ( 62 format_model_expressions, 63 normalize_model_name, 64 pandas_to_sql, 65 parse, 66 parse_one, 67) 68from sqlmesh.core.engine_adapter 
import EngineAdapter 69from sqlmesh.core.environment import Environment, EnvironmentNamingInfo 70from sqlmesh.core.loader import Loader, update_model_schemas 71from sqlmesh.core.macros import ExecutableOrMacro, macro 72from sqlmesh.core.metric import Metric, rewrite 73from sqlmesh.core.model import Model 74from sqlmesh.core.notification_target import ( 75 NotificationEvent, 76 NotificationTarget, 77 NotificationTargetManager, 78) 79from sqlmesh.core.plan import Plan, PlanBuilder 80from sqlmesh.core.reference import ReferenceGraph 81from sqlmesh.core.scheduler import Scheduler 82from sqlmesh.core.schema_loader import create_schema_file 83from sqlmesh.core.selector import Selector 84from sqlmesh.core.snapshot import ( 85 DeployabilityIndex, 86 Snapshot, 87 SnapshotEvaluator, 88 SnapshotFingerprint, 89 to_table_mapping, 90) 91from sqlmesh.core.state_sync import ( 92 CachingStateSync, 93 StateReader, 94 StateSync, 95 cleanup_expired_views, 96) 97from sqlmesh.core.table_diff import TableDiff 98from sqlmesh.core.test import ( 99 ModelTextTestResult, 100 generate_test, 101 get_all_model_tests, 102 run_model_tests, 103 run_tests, 104) 105from sqlmesh.core.user import User 106from sqlmesh.utils import UniqueKeyDict, sys_path 107from sqlmesh.utils.dag import DAG 108from sqlmesh.utils.date import TimeLike, now_ds, to_date 109from sqlmesh.utils.errors import ( 110 CircuitBreakerError, 111 ConfigError, 112 PlanError, 113 SQLMeshError, 114 UncategorizedPlanError, 115) 116from sqlmesh.utils.jinja import JinjaMacroRegistry 117 118if t.TYPE_CHECKING: 119 from typing_extensions import Literal 120 121 from sqlmesh.core.engine_adapter._typing import DF, PySparkDataFrame, PySparkSession 122 from sqlmesh.core.snapshot import Node 123 124 ModelOrSnapshot = t.Union[str, Model, Snapshot] 125 NodeOrSnapshot = t.Union[str, Model, StandaloneAudit, Snapshot] 126 127logger = logging.getLogger(__name__) 128 129 130class BaseContext(abc.ABC): 131 """The base context which defines methods to 
execute a model.""" 132 133 @property 134 @abc.abstractmethod 135 def default_dialect(self) -> t.Optional[str]: 136 """Returns the default dialect.""" 137 138 @property 139 @abc.abstractmethod 140 def _model_tables(self) -> t.Dict[str, str]: 141 """Returns a mapping of model names to tables.""" 142 143 @property 144 @abc.abstractmethod 145 def engine_adapter(self) -> EngineAdapter: 146 """Returns an engine adapter.""" 147 148 @property 149 def spark(self) -> t.Optional[PySparkSession]: 150 """Returns the spark session if it exists.""" 151 return self.engine_adapter.spark 152 153 @property 154 def default_catalog(self) -> t.Optional[str]: 155 raise NotImplementedError 156 157 def table(self, model_name: str) -> str: 158 """Gets the physical table name for a given model. 159 160 Args: 161 model_name: The model name. 162 163 Returns: 164 The physical table name. 165 """ 166 model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect) 167 168 # We generate SQL for the default dialect because the table name may be used in a 169 # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery) 170 return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect) 171 172 def fetchdf( 173 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 174 ) -> pd.DataFrame: 175 """Fetches a dataframe given a sql string or sqlglot expression. 176 177 Args: 178 query: SQL string or sqlglot expression. 179 quote_identifiers: Whether to quote all identifiers in the query. 180 181 Returns: 182 The default dataframe is Pandas, but for Spark a PySpark dataframe is returned. 183 """ 184 return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers) 185 186 def fetch_pyspark_df( 187 self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False 188 ) -> PySparkDataFrame: 189 """Fetches a PySpark dataframe given a sql string or sqlglot expression. 
190 191 Args: 192 query: SQL string or sqlglot expression. 193 quote_identifiers: Whether to quote all identifiers in the query. 194 195 Returns: 196 A PySpark dataframe. 197 """ 198 return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers) 199 200 201class ExecutionContext(BaseContext): 202 """The minimal context needed to execute a model. 203 204 Args: 205 engine_adapter: The engine adapter to execute queries against. 206 snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations. 207 deployability_index: Determines snapshots that are deployable in the context of this evaluation. 208 """ 209 210 def __init__( 211 self, 212 engine_adapter: EngineAdapter, 213 snapshots: t.Dict[str, Snapshot], 214 deployability_index: t.Optional[DeployabilityIndex] = None, 215 default_dialect: t.Optional[str] = None, 216 default_catalog: t.Optional[str] = None, 217 variables: t.Optional[t.Dict[str, t.Any]] = None, 218 ): 219 self.snapshots = snapshots 220 self.deployability_index = deployability_index 221 self._engine_adapter = engine_adapter 222 self._default_catalog = default_catalog 223 self._default_dialect = default_dialect 224 self._variables = variables or {} 225 226 @property 227 def default_dialect(self) -> t.Optional[str]: 228 return self._default_dialect 229 230 @property 231 def engine_adapter(self) -> EngineAdapter: 232 """Returns an engine adapter.""" 233 return self._engine_adapter 234 235 @cached_property 236 def _model_tables(self) -> t.Dict[str, str]: 237 """Returns a mapping of model names to tables.""" 238 return to_table_mapping(self.snapshots.values(), self.deployability_index) 239 240 @property 241 def default_catalog(self) -> t.Optional[str]: 242 return self._default_catalog 243 244 @property 245 def gateway(self) -> t.Optional[str]: 246 """Returns the gateway name.""" 247 return self.var(c.GATEWAY) 248 249 def var(self, var_name: str, default: t.Optional[t.Any] = None) -> 
t.Optional[t.Any]: 250 """Returns a variable value.""" 251 return self._variables.get(var_name.lower(), default) 252 253 def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext: 254 """Returns a new ExecutionContext with additional variables.""" 255 return ExecutionContext( 256 self._engine_adapter, 257 self.snapshots, 258 self.deployability_index, 259 self._default_dialect, 260 self._default_catalog, 261 variables=variables, 262 ) 263 264 265class GenericContext(BaseContext, t.Generic[C]): 266 """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks. 267 268 Args: 269 engine_adapter: The default engine adapter to use. 270 notification_targets: The notification target to use. Defaults to what is defined in config. 271 paths: The directories containing SQLMesh files. 272 config: A Config object or the name of a Config object in config.py. 273 connection: The name of the connection. If not specified the first connection as it appears 274 in configuration will be used. 275 test_connection: The name of the connection to use for tests. If not specified the first 276 connection as it appears in configuration will be used. 277 concurrent_tasks: The maximum number of tasks that can use the connection concurrently. 278 load: Whether or not to automatically load all models and macros (default True). 279 console: The rich instance used for printing out CLI command results. 280 users: A list of users to make known to SQLMesh. 281 config_type: The type of config object to use (default Config). 
282 """ 283 284 CONFIG_TYPE: t.Type[C] 285 286 def __init__( 287 self, 288 engine_adapter: t.Optional[EngineAdapter] = None, 289 notification_targets: t.Optional[t.List[NotificationTarget]] = None, 290 state_sync: t.Optional[StateSync] = None, 291 paths: t.Union[str | Path, t.Iterable[str | Path]] = "", 292 config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None, 293 gateway: t.Optional[str] = None, 294 concurrent_tasks: t.Optional[int] = None, 295 loader: t.Optional[t.Type[Loader]] = None, 296 load: bool = True, 297 console: t.Optional[Console] = None, 298 users: t.Optional[t.List[User]] = None, 299 ): 300 self.console = console or get_console() 301 self.configs = ( 302 config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths) 303 ) 304 self.dag: DAG[str] = DAG() 305 self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") 306 self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits") 307 self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict( 308 "standaloneaudits" 309 ) 310 self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros") 311 self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics") 312 self._jinja_macros = JinjaMacroRegistry() 313 self._default_catalog: t.Optional[str] = None 314 315 self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items()))) 316 317 self.gateway = gateway 318 self._scheduler = self.config.get_scheduler(self.gateway) 319 self.environment_ttl = self.config.environment_ttl 320 self.pinned_environments = Environment.normalize_names(self.config.pinned_environments) 321 self.auto_categorize_changes = self.config.plan.auto_categorize_changes 322 323 self._connection_config = self.config.get_connection(self.gateway) 324 self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks 325 self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter() 326 327 
test_connection_config = self.config.get_test_connection( 328 self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT 329 ) 330 self._test_engine_adapter = test_connection_config.create_engine_adapter( 331 register_comments_override=False 332 ) 333 334 self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None 335 336 self._provided_state_sync: t.Optional[StateSync] = state_sync 337 self._state_sync: t.Optional[StateSync] = None 338 339 self._loader = (loader or self.config.loader)(**self.config.loader_kwargs) 340 341 # Should we dedupe notification_targets? If so how? 342 self.notification_targets = (notification_targets or []) + self.config.notification_targets 343 self.users = (users or []) + self.config.users 344 self.users = list({user.username: user for user in self.users}.values()) 345 self._register_notification_targets() 346 347 if ( 348 self.config.environment_catalog_mapping 349 and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported 350 ): 351 raise SQLMeshError( 352 "Environment catalog mapping is only supported for engine adapters that support multiple catalogs" 353 ) 354 355 if load: 356 self.load() 357 358 @property 359 def default_dialect(self) -> t.Optional[str]: 360 return self.config.dialect 361 362 @property 363 def engine_adapter(self) -> EngineAdapter: 364 """Returns an engine adapter.""" 365 return self._engine_adapter 366 367 @property 368 def snapshot_evaluator(self) -> SnapshotEvaluator: 369 if not self._snapshot_evaluator: 370 self._snapshot_evaluator = SnapshotEvaluator( 371 self.engine_adapter.with_log_level(logging.INFO), 372 ddl_concurrent_tasks=self.concurrent_tasks, 373 ) 374 return self._snapshot_evaluator 375 376 def execution_context( 377 self, deployability_index: t.Optional[DeployabilityIndex] = None 378 ) -> ExecutionContext: 379 """Returns an execution context.""" 380 return ExecutionContext( 381 engine_adapter=self._engine_adapter, 382 snapshots=self.snapshots, 383 
deployability_index=deployability_index, 384 default_dialect=self.default_dialect, 385 default_catalog=self.default_catalog, 386 ) 387 388 def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model: 389 """Update or insert a model. 390 391 The context's models dictionary will be updated to include these changes. 392 393 Args: 394 model: Model name or instance to update. 395 kwargs: The kwargs to update the model with. 396 397 Returns: 398 A new instance of the updated or inserted model. 399 """ 400 model = self.get_model(model, raise_if_missing=True) 401 path = model._path 402 403 # model.copy() can't be used here due to a cached state that can be a part of a model instance. 404 model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs})) 405 model._path = path 406 407 self._models.update({model.fqn: model}) 408 self.dag.add(model.fqn, model.depends_on) 409 update_model_schemas( 410 self.dag, 411 self._models, 412 self.path, 413 ) 414 415 model.validate_definition() 416 417 return model 418 419 def scheduler(self, environment: t.Optional[str] = None) -> Scheduler: 420 """Returns the built-in scheduler. 421 422 Args: 423 environment: The target environment to source model snapshots from, or None 424 if snapshots should be sourced from the currently loaded local state. 425 426 Returns: 427 The built-in scheduler instance. 
428 """ 429 snapshots: t.Iterable[Snapshot] 430 if environment is not None: 431 stored_environment = self.state_sync.get_environment(environment) 432 if stored_environment is None: 433 raise ConfigError(f"Environment '{environment}' was not found.") 434 snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values() 435 else: 436 snapshots = self.snapshots.values() 437 438 if not snapshots: 439 raise ConfigError("No models were found") 440 441 return Scheduler( 442 snapshots, 443 self.snapshot_evaluator, 444 self.state_sync, 445 default_catalog=self.default_catalog, 446 max_workers=self.concurrent_tasks, 447 console=self.console, 448 notification_target_manager=self.notification_target_manager, 449 ) 450 451 @property 452 def state_sync(self) -> StateSync: 453 if not self._state_sync: 454 self._state_sync = self._new_state_sync() 455 456 if self._state_sync.get_versions(validate=False).schema_version == 0: 457 self._state_sync.migrate(default_catalog=self.default_catalog) 458 self._state_sync.get_versions() 459 self._state_sync = CachingStateSync(self._state_sync) # type: ignore 460 return self._state_sync 461 462 @property 463 def state_reader(self) -> StateReader: 464 return self.state_sync 465 466 def refresh(self) -> None: 467 """Refresh all models that have been updated.""" 468 if self._loader.reload_needed(): 469 self.load() 470 471 def load(self, update_schemas: bool = True) -> GenericContext[C]: 472 """Load all files in the context's path.""" 473 with sys_path(*self.configs): 474 gc.disable() 475 project = self._loader.load(self, update_schemas) 476 self._macros = project.macros 477 self._jinja_macros = project.jinja_macros 478 self._models = project.models 479 self._metrics = project.metrics 480 self._standalone_audits.clear() 481 self._audits.clear() 482 for name, audit in project.audits.items(): 483 if isinstance(audit, StandaloneAudit): 484 self._standalone_audits[name] = audit 485 else: 486 self._audits[name] = audit 487 self.dag = 
project.dag 488 gc.enable() 489 490 duplicates = set(self._models) & set(self._standalone_audits) 491 if duplicates: 492 raise ConfigError( 493 f"Models and Standalone audits cannot have the same name: {duplicates}" 494 ) 495 496 return self 497 498 def run( 499 self, 500 environment: t.Optional[str] = None, 501 *, 502 start: t.Optional[TimeLike] = None, 503 end: t.Optional[TimeLike] = None, 504 execution_time: t.Optional[TimeLike] = None, 505 skip_janitor: bool = False, 506 ignore_cron: bool = False, 507 ) -> bool: 508 """Run the entire dag through the scheduler. 509 510 Args: 511 environment: The target environment to source model snapshots from and virtually update. Default: prod. 512 start: The start of the interval to render. 513 end: The end of the interval to render. 514 execution_time: The date/time time reference to use for execution time. Defaults to now. 515 skip_janitor: Whether to skip the janitor task. 516 ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals. 517 518 Returns: 519 True if the run was successful, False otherwise. 
520 """ 521 environment = environment or self.config.default_target_environment 522 523 self.notification_target_manager.notify( 524 NotificationEvent.RUN_START, environment=environment 525 ) 526 success = False 527 try: 528 success = self._run( 529 environment=environment, 530 start=start, 531 end=end, 532 execution_time=execution_time, 533 skip_janitor=skip_janitor, 534 ignore_cron=ignore_cron, 535 ) 536 except Exception as e: 537 self.notification_target_manager.notify( 538 NotificationEvent.RUN_FAILURE, traceback.format_exc() 539 ) 540 logger.error(f"Run Failure: {traceback.format_exc()}") 541 raise e 542 543 if success: 544 self.notification_target_manager.notify( 545 NotificationEvent.RUN_END, environment=environment 546 ) 547 self.console.log_success(f"Run finished for environment '{environment}'") 548 else: 549 self.notification_target_manager.notify( 550 NotificationEvent.RUN_FAILURE, "See console logs for details." 551 ) 552 553 return success 554 555 @t.overload 556 def get_model( 557 self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True 558 ) -> Model: ... 559 560 @t.overload 561 def get_model( 562 self, 563 model_or_snapshot: ModelOrSnapshot, 564 raise_if_missing: Literal[False] = False, 565 ) -> t.Optional[Model]: ... 566 567 def get_model( 568 self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False 569 ) -> t.Optional[Model]: 570 """Returns a model with the given name or None if a model with such name doesn't exist. 571 572 Args: 573 model_or_snapshot: A model name, model, or snapshot. 574 raise_if_missing: Raises an error if a model is not found. 575 576 Returns: 577 The expected model. 
578 """ 579 if isinstance(model_or_snapshot, str): 580 normalized_name = normalize_model_name( 581 model_or_snapshot, 582 dialect=self.default_dialect, 583 default_catalog=self.default_catalog, 584 ) 585 model = self._models.get(normalized_name) 586 elif isinstance(model_or_snapshot, Snapshot): 587 model = model_or_snapshot.model 588 else: 589 model = model_or_snapshot 590 591 if raise_if_missing and not model: 592 raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'") 593 594 return model 595 596 @t.overload 597 def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ... 598 599 @t.overload 600 def get_snapshot( 601 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True] 602 ) -> Snapshot: ... 603 604 @t.overload 605 def get_snapshot( 606 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False] 607 ) -> t.Optional[Snapshot]: ... 608 609 def get_snapshot( 610 self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False 611 ) -> t.Optional[Snapshot]: 612 """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist. 613 614 Args: 615 node_or_snapshot: A node name, node, or snapshot. 616 raise_if_missing: Raises an error if a snapshot is not found. 617 618 Returns: 619 The expected snapshot. 
620 """ 621 if isinstance(node_or_snapshot, Snapshot): 622 return node_or_snapshot 623 if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot): 624 node_or_snapshot = normalize_model_name( 625 node_or_snapshot, 626 dialect=self.default_dialect, 627 default_catalog=self.default_catalog, 628 ) 629 fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn 630 snapshot = self.snapshots.get(fqn) 631 632 if raise_if_missing and not snapshot: 633 raise SQLMeshError(f"Cannot find snapshot for '{fqn}'") 634 635 return snapshot 636 637 def config_for_path(self, path: Path) -> Config: 638 for config_path, config in self.configs.items(): 639 try: 640 path.relative_to(config_path) 641 return config 642 except ValueError: 643 pass 644 return self.config 645 646 def config_for_node(self, node: str | Model | StandaloneAudit) -> Config: 647 if isinstance(node, str): 648 return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path) # type: ignore 649 return self.config_for_path(node._path) # type: ignore 650 651 @property 652 def models(self) -> MappingProxyType[str, Model]: 653 """Returns all registered models in this context.""" 654 return MappingProxyType(self._models) 655 656 @property 657 def metrics(self) -> MappingProxyType[str, Metric]: 658 """Returns all registered metrics in this context.""" 659 return MappingProxyType(self._metrics) 660 661 @property 662 def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]: 663 """Returns all registered standalone audits in this context.""" 664 return MappingProxyType(self._standalone_audits) 665 666 @property 667 def snapshots(self) -> t.Dict[str, Snapshot]: 668 """Generates and returns snapshots based on models registered in this context. 669 670 If one of the snapshots has been previously stored in the persisted state, the stored 671 instance will be returned. 
672 """ 673 return self._snapshots() 674 675 @property 676 def default_catalog(self) -> t.Optional[str]: 677 if self._default_catalog is None: 678 self._default_catalog = self._scheduler.get_default_catalog(self) 679 return self._default_catalog 680 681 def render( 682 self, 683 model_or_snapshot: ModelOrSnapshot, 684 *, 685 start: t.Optional[TimeLike] = None, 686 end: t.Optional[TimeLike] = None, 687 execution_time: t.Optional[TimeLike] = None, 688 expand: t.Union[bool, t.Iterable[str]] = False, 689 **kwargs: t.Any, 690 ) -> exp.Expression: 691 """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models. 692 693 Args: 694 model_or_snapshot: The model, model name, or snapshot to render. 695 start: The start of the interval to render. 696 end: The end of the interval to render. 697 execution_time: The date/time time reference to use for execution time. Defaults to now. 698 expand: Whether or not to use expand materialized models, defaults to False. 699 If True, all referenced models are expanded as raw queries. 700 If a list, only referenced models are expanded as raw queries. 701 702 Returns: 703 The rendered expression. 
704 """ 705 execution_time = execution_time or now_ds() 706 707 model = self.get_model(model_or_snapshot, raise_if_missing=True) 708 709 if expand and not isinstance(expand, bool): 710 expand = { 711 normalize_model_name( 712 x, default_catalog=self.default_catalog, dialect=self.default_dialect 713 ) 714 for x in expand 715 } 716 717 expand = self.dag.upstream(model.fqn) if expand is True else expand or [] 718 719 if model.is_seed: 720 df = next( 721 model.render( 722 context=self.execution_context(), 723 start=start, 724 end=end, 725 execution_time=execution_time, 726 **kwargs, 727 ) 728 ) 729 return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types)) 730 731 return model.render_query_or_raise( 732 start=start, 733 end=end, 734 execution_time=execution_time, 735 snapshots=self.snapshots, 736 expand=expand, 737 engine_adapter=self.engine_adapter, 738 **kwargs, 739 ) 740 741 def evaluate( 742 self, 743 model_or_snapshot: ModelOrSnapshot, 744 start: TimeLike, 745 end: TimeLike, 746 execution_time: TimeLike, 747 limit: t.Optional[int] = None, 748 **kwargs: t.Any, 749 ) -> DF: 750 """Evaluate a model or snapshot (running its query against a DB/Engine). 751 752 This method is used to test or iterate on models without side effects. 753 754 Args: 755 model_or_snapshot: The model, model name, or snapshot to render. 756 start: The start of the interval to evaluate. 757 end: The end of the interval to evaluate. 758 execution_time: The date/time time reference to use for execution time. 759 limit: A limit applied to the model. 
760 """ 761 snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True) 762 763 df = self.snapshot_evaluator.evaluate_and_fetch( 764 snapshot, 765 start=start, 766 end=end, 767 execution_time=execution_time, 768 snapshots=self.snapshots, 769 limit=limit or c.DEFAULT_MAX_LIMIT, 770 ) 771 772 if df is None: 773 raise RuntimeError(f"Error evaluating {snapshot.name}") 774 775 return df 776 777 def format( 778 self, 779 transpile: t.Optional[str] = None, 780 append_newline: t.Optional[bool] = None, 781 **kwargs: t.Any, 782 ) -> None: 783 """Format all SQL models.""" 784 for model in self._models.values(): 785 if not model._path.suffix == ".sql": 786 continue 787 with open(model._path, "r+", encoding="utf-8") as file: 788 expressions = parse( 789 file.read(), default_dialect=self.config_for_node(model).dialect 790 ) 791 if transpile: 792 for prop in expressions[0].expressions: 793 if prop.name.lower() == "dialect": 794 prop.replace( 795 exp.Property( 796 this="dialect", 797 value=exp.Literal.string(transpile or model.dialect), 798 ) 799 ) 800 format = self.config_for_node(model).format 801 opts = {**format.generator_options, **kwargs} 802 file.seek(0) 803 file.write( 804 format_model_expressions(expressions, transpile or model.dialect, **opts) 805 ) 806 if append_newline is None: 807 append_newline = format.append_newline 808 if append_newline: 809 file.write("\n") 810 file.truncate() 811 812 def plan( 813 self, 814 environment: t.Optional[str] = None, 815 *, 816 start: t.Optional[TimeLike] = None, 817 end: t.Optional[TimeLike] = None, 818 execution_time: t.Optional[TimeLike] = None, 819 create_from: t.Optional[str] = None, 820 skip_tests: bool = False, 821 restate_models: t.Optional[t.Iterable[str]] = None, 822 no_gaps: bool = False, 823 skip_backfill: bool = False, 824 forward_only: t.Optional[bool] = None, 825 no_prompts: t.Optional[bool] = None, 826 auto_apply: t.Optional[bool] = None, 827 no_auto_categorization: t.Optional[bool] = None, 828 
effective_from: t.Optional[TimeLike] = None, 829 include_unmodified: t.Optional[bool] = None, 830 select_models: t.Optional[t.Collection[str]] = None, 831 backfill_models: t.Optional[t.Collection[str]] = None, 832 categorizer_config: t.Optional[CategorizerConfig] = None, 833 enable_preview: t.Optional[bool] = None, 834 no_diff: t.Optional[bool] = None, 835 run: bool = False, 836 ) -> Plan: 837 """Interactively creates a plan. 838 839 This method compares the current context with the target environment. It then presents 840 the differences and asks whether to backfill each modified model. 841 842 Args: 843 environment: The environment to diff and plan against. 844 start: The start date of the backfill if there is one. 845 end: The end date of the backfill if there is one. 846 execution_time: The date/time reference to use for execution time. Defaults to now. 847 create_from: The environment to create the target environment from if it 848 doesn't exist. If not specified, the "prod" environment will be used. 849 skip_tests: Unit tests are run by default so this will skip them if enabled 850 restate_models: A list of either internal or external models, or tags, that need to be restated 851 for the given plan interval. If the target environment is a production environment, 852 ALL snapshots that depended on these upstream tables will have their intervals deleted 853 (even ones not in this current environment). Only the snapshots in this environment will 854 be backfilled whereas others need to be recovered on a future plan application. For development 855 environments only snapshots that are part of this plan will be affected. 856 no_gaps: Whether to ensure that new snapshots for models that are already a 857 part of the target environment have no data gaps when compared against previous 858 snapshots for same models. 859 skip_backfill: Whether to skip the backfill step. Default: False. 860 forward_only: Whether the purpose of the plan is to make forward only changes. 
861 no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that 862 if this flag is set to true and there are uncategorized changes the plan creation will 863 fail. Default: False. 864 auto_apply: Whether to automatically apply the new plan after creation. Default: False. 865 no_auto_categorization: Indicates whether to disable automatic categorization of model 866 changes (breaking / non-breaking). If not provided, then the corresponding configuration 867 option determines the behavior. 868 categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the 869 project config by default. 870 effective_from: The effective date from which to apply forward-only changes on production. 871 include_unmodified: Indicates whether to include unmodified models in the target development environment. 872 select_models: A list of model selection strings to filter the models that should be included into this plan. 873 backfill_models: A list of model selection strings to filter the models for which the data should be backfilled. 874 enable_preview: Indicates whether to enable preview for forward-only models in development environments. 875 no_diff: Hide text differences for changed models. 876 run: Whether to run latest intervals as part of the plan application. 877 878 Returns: 879 The populated Plan object. 
880 """ 881 plan_builder = self.plan_builder( 882 environment, 883 start=start, 884 end=end, 885 execution_time=execution_time, 886 create_from=create_from, 887 skip_tests=skip_tests, 888 restate_models=restate_models, 889 no_gaps=no_gaps, 890 skip_backfill=skip_backfill, 891 forward_only=forward_only, 892 no_auto_categorization=no_auto_categorization, 893 effective_from=effective_from, 894 include_unmodified=include_unmodified, 895 select_models=select_models, 896 backfill_models=backfill_models, 897 categorizer_config=categorizer_config, 898 enable_preview=enable_preview, 899 run=run, 900 ) 901 902 self.console.plan( 903 plan_builder, 904 auto_apply if auto_apply is not None else self.config.plan.auto_apply, 905 self.default_catalog, 906 no_diff=no_diff if no_diff is not None else self.config.plan.no_diff, 907 no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts, 908 ) 909 910 return plan_builder.build() 911 912 def plan_builder( 913 self, 914 environment: t.Optional[str] = None, 915 *, 916 start: t.Optional[TimeLike] = None, 917 end: t.Optional[TimeLike] = None, 918 execution_time: t.Optional[TimeLike] = None, 919 create_from: t.Optional[str] = None, 920 skip_tests: bool = False, 921 restate_models: t.Optional[t.Iterable[str]] = None, 922 no_gaps: bool = False, 923 skip_backfill: bool = False, 924 forward_only: t.Optional[bool] = None, 925 no_auto_categorization: t.Optional[bool] = None, 926 effective_from: t.Optional[TimeLike] = None, 927 include_unmodified: t.Optional[bool] = None, 928 select_models: t.Optional[t.Collection[str]] = None, 929 backfill_models: t.Optional[t.Collection[str]] = None, 930 categorizer_config: t.Optional[CategorizerConfig] = None, 931 enable_preview: t.Optional[bool] = None, 932 run: bool = False, 933 ) -> PlanBuilder: 934 """Creates a plan builder. 935 936 Args: 937 environment: The environment to diff and plan against. 938 start: The start date of the backfill if there is one. 
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Whether to skip unit tests, which are run by default.
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depend on these upstream tables will have their intervals deleted
                (even ones not in the current environment). Only the snapshots in this environment will
                be backfilled, whereas others need to be recovered on a future plan application. For development
                environments, only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots of the same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward-only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included in this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run the latest intervals as part of the plan application.

        Returns:
            The plan builder.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

    def apply(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Applies a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
        to backfill all models.

        Args:
            plan: The plan to apply.
            circuit_breaker: An optional handler which checks if the apply should be aborted.
        """
        if (
            not plan.context_diff.has_changes
            and not plan.requires_backfill
            and not plan.has_unmodified_unpromoted
        ):
            return
        if plan.uncategorized:
            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_START,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
        try:
            self._apply(plan, circuit_breaker)
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.APPLY_FAILURE,
                environment=plan.environment_naming_info.name,
                plan_id=plan.plan_id,
                exc=traceback.format_exc(),
            )
            logger.error(f"Apply Failure: {traceback.format_exc()}")
            raise e
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_END,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )

    def invalidate_environment(self, name: str, sync: bool = False) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
                be deleted asynchronously by the janitor process.
        """
        self.state_sync.invalidate_environment(name)
        if sync:
            self._cleanup_environments()
            self.console.log_success(f"Environment '{name}' has been deleted.")
        else:
            self.console.log_success(f"Environment '{name}' has been invalidated.")

    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
        """Show a diff of the current context with a given environment.

        Args:
            environment: The environment to diff against.
            detailed: Show the actual SQL differences if True.

        Returns:
            True if there are changes, False otherwise.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        context_diff = self._context_diff(environment)
        self.console.show_model_difference_summary(
            context_diff,
            EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=environment,
                suffix_target=self.config.environment_suffix_target,
            ),
            self.default_catalog,
            no_diff=not detailed,
        )
        return context_diff.has_changes

    def table_diff(
        self,
        source: str,
        target: str,
        on: t.List[str] | exp.Condition | None = None,
        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
        where: t.Optional[str | exp.Condition] = None,
        limit: int = 20,
        show: bool = True,
        show_sample: bool = True,
    ) -> TableDiff:
        """Show a diff between two tables.

        Args:
            source: The source environment or table.
            target: The target environment or table.
            on: The join condition, table aliases must be "s" and "t" for source and target.
                If omitted, the table's grain will be used.
            model_or_snapshot: The model or snapshot to use when environments are passed in.
            where: An optional where statement to filter results.
            limit: The limit of the sample dataframe.
            show: Show the table diff output in the console.
            show_sample: Show the sample dataframe in the console. Requires show=True.

        Returns:
            The TableDiff object containing schema and summary differences.
        """
        source_alias, target_alias = source, target
        if model_or_snapshot:
            model = self.get_model(model_or_snapshot, raise_if_missing=True)
            source_env = self.state_reader.get_environment(source)
            target_env = self.state_reader.get_environment(target)

            if not source_env:
                raise SQLMeshError(f"Could not find environment '{source}'")
            if not target_env:
                raise SQLMeshError(f"Could not find environment '{target}'")

            source = next(
                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            target = next(
                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            source_alias = source_env.name
            target_alias = target_env.name

            if not on:
                for ref in model.all_references:
                    if ref.unique:
                        on = ref.columns

        if not on:
            raise SQLMeshError(
                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
            )

        table_diff = TableDiff(
            adapter=self._engine_adapter,
            source=source,
            target=target,
            on=on,
            where=where,
            source_alias=source_alias,
            target_alias=target_alias,
            model_name=model.name if model_or_snapshot else None,
            limit=limit,
        )
        if show:
            self.console.show_schema_diff(table_diff.schema_diff())
            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
        return table_diff

    def get_dag(
        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
    ) -> GraphHTML:
        """Gets an HTML object representation of the DAG.

        Args:
            select_models: A list of model selection strings that should be included in the dag.

        Returns:
            An html object that renders the dag.
        """
        dag = (
            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
            if select_models
            else self.dag
        )

        nodes = {}
        edges: t.List[t.Dict] = []

        for node, deps in dag.graph.items():
            nodes[node] = {
                "id": node,
                "label": node.split(".")[-1],
                "title": f"<span>{node}</span>",
            }
            edges.extend({"from": d, "to": node} for d in deps)

        return GraphHTML(
            nodes,
            edges,
            options={
                "height": "100%",
                "width": "100%",
                "interaction": {},
                "layout": {
                    "hierarchical": {
                        "enabled": True,
                        "nodeSpacing": 200,
                        "sortMethod": "directed",
                    },
                },
                "nodes": {
                    "shape": "box",
                },
                **options,
            },
        )

    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
        """Render the dag as HTML and save it to a file.

        Args:
            path: The file name to save the DAG HTML to.
            select_models: A list of model selection strings that should be included in the dag.
        """
        file_path = Path(path)
        suffix = file_path.suffix
        if suffix != ".html":
            if suffix:
                logger.warning(
                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
                )
            path = str(file_path.with_suffix(".html"))

        with open(path, "w", encoding="utf-8") as file:
            file.write(str(self.get_dag(select_models)))

    def create_test(
        self,
        model: str,
        input_queries: t.Dict[str, str],
        overwrite: bool = False,
        variables: t.Optional[t.Dict[str, str]] = None,
        path: t.Optional[str] = None,
        name: t.Optional[str] = None,
        include_ctes: bool = False,
    ) -> None:
        """Generate a unit test fixture for a given model.

        Args:
            model: The model to test.
            input_queries: Mapping of model names to queries. Each model included in this mapping
                will be populated in the test based on the results of the corresponding query.
            overwrite: Whether to overwrite the existing test in case of a file path collision.
                When set to False, an error will be raised if there is such a collision.
            variables: Key-value pairs that will define variables needed by the model.
            path: The file path corresponding to the fixture, relative to the test directory.
                By default, the fixture will be created under the test directory and the file name
                will be inferred from the test's name.
            name: The name of the test. This is inferred from the model name by default.
            include_ctes: When true, CTE fixtures will also be generated.
        """
        input_queries = {
            # The get_model here has two purposes: return normalized names & check for missing deps
            self.get_model(dep, raise_if_missing=True).fqn: query
            for dep, query in input_queries.items()
        }

        generate_test(
            model=self.get_model(model, raise_if_missing=True),
            input_queries=input_queries,
            models=self._models,
            engine_adapter=self._engine_adapter,
            test_engine_adapter=self._test_engine_adapter,
            project_path=self.path,
            overwrite=overwrite,
            variables=variables,
            path=path,
            name=name,
            include_ctes=include_ctes,
        )

    def test(
        self,
        match_patterns: t.Optional[t.List[str]] = None,
        tests: t.Optional[t.List[str]] = None,
        verbose: bool = False,
        preserve_fixtures: bool = False,
        stream: t.Optional[t.TextIO] = None,
    ) -> ModelTextTestResult:
        """Discover and run model tests"""
        if verbose:
            pd.set_option("display.max_columns", None)
            verbosity = 2
        else:
            verbosity = 1

        try:
            if tests:
                result = run_model_tests(
                    tests=tests,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    patterns=match_patterns,
                    preserve_fixtures=preserve_fixtures,
                    default_catalog=self.default_catalog,
                )
            else:
                test_meta = []

                for path, config in self.configs.items():
                    test_meta.extend(
                        get_all_model_tests(
                            path / c.TESTS,
                            patterns=match_patterns,
                            ignore_patterns=config.ignore_patterns,
                        )
                    )

                result = run_tests(
                    test_meta,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    preserve_fixtures=preserve_fixtures,
                    stream=stream,
                    default_catalog=self.default_catalog,
                )
        finally:
            self._test_engine_adapter.close()

        return result

    def audit(
        self,
        start: TimeLike,
        end: TimeLike,
        *,
        models: t.Optional[t.Iterator[str]] = None,
        execution_time: t.Optional[TimeLike] = None,
    ) -> None:
        """Audit models.

        Args:
            start: The start of the interval to audit.
            end: The end of the interval to audit.
            models: The models to audit. All models will be audited if not specified.
            execution_time: The date/time reference to use for execution time. Defaults to now.
        """

        snapshots = (
            [self.get_snapshot(model, raise_if_missing=True) for model in models]
            if models
            else self.snapshots.values()
        )

        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
        self.console.log_status_update(f"Found {num_audits} audit(s).")
        errors = []
        skipped_count = 0
        for snapshot in snapshots:
            for audit_result in self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                snapshots=self.snapshots,
                raise_exception=False,
            ):
                audit_id = f"{audit_result.audit.name}"
                if audit_result.model:
                    audit_id += f" on model {audit_result.model.name}"

                if audit_result.skipped:
                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                    skipped_count += 1
                elif audit_result.count:
                    errors.append(audit_result)
                    self.console.log_status_update(
                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                    )
                else:
                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

        self.console.log_status_update(
            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
        )
        for error in errors:
            self.console.log_status_update(
                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
            )
            self.console.log_status_update(f"Got {error.count} results, expected 0.")
            if error.query:
                self.console.show_sql(
                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
                )

        self.console.log_status_update("Done.")

    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

        Args:
            sql: The sql string to rewrite.
            dialect: The dialect of the sql string, defaults to the project dialect.

        Returns:
            A SQLGlot expression with semantic references expanded.
        """
        return rewrite(
            sql,
            graph=ReferenceGraph(self.models.values()),
            metrics=self._metrics,
            dialect=dialect or self.default_dialect,
        )

    def migrate(self) -> None:
        """Migrates SQLMesh to the current running version.

        Please contact your SQLMesh administrator before doing this.
        """
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
        try:
            self._new_state_sync().migrate(
                default_catalog=self.default_catalog,
                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
            )
            raise e
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

    def rollback(self) -> None:
        """Rolls back SQLMesh to the previous migration.

        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
        """
        self._new_state_sync().rollback()

    def create_external_models(self) -> None:
        """Create a schema file with all external models.

        The schema file contains all columns and types of external models, allowing for more robust
        lineage, validation, and optimizations.
        """
        if not self._models:
            self.load(update_schemas=False)

        for path, config in self.configs.items():
            create_schema_file(
                path=path / c.SCHEMA_YAML,
                models=UniqueKeyDict(
                    "models",
                    {
                        fqn: model
                        for fqn, model in self._models.items()
                        if self.config_for_node(model) is config
                    },
                ),
                adapter=self._engine_adapter,
                state_reader=self.state_reader,
                dialect=config.model_defaults.dialect,
                max_workers=self.concurrent_tasks,
            )

    def print_info(self) -> None:
        """Prints information about connections, models, macros, etc. to the console."""
        self.console.log_status_update(f"Models: {len(self.models)}")
        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

        self._try_connection("data warehouse", self._engine_adapter)

        state_connection = self.config.get_state_connection(self.gateway)
        if state_connection:
            self._try_connection("state backend", state_connection.create_engine_adapter())

        self._try_connection("test", self._test_engine_adapter)

    def close(self) -> None:
        """Releases all resources allocated by this context."""
        self.snapshot_evaluator.close()
        self.state_sync.close()

    def _run(
        self,
        environment: str,
        *,
        start: t.Optional[TimeLike],
        end: t.Optional[TimeLike],
        execution_time: t.Optional[TimeLike],
        skip_janitor: bool,
        ignore_cron: bool,
    ) -> bool:
        if not skip_janitor and environment.lower() == c.PROD:
            self._run_janitor()

        env_check_attempts_num = max(
            1,
            self.config.run.environment_check_max_wait
            // self.config.run.environment_check_interval,
        )

        def _block_until_finalized() -> str:
            for _ in range(env_check_attempts_num):
                assert environment is not None  # mypy
                environment_state = self.state_sync.get_environment(environment)
                if not environment_state:
                    raise SQLMeshError(f"Environment '{environment}' was not found.")
                if environment_state.finalized_ts:
                    return environment_state.plan_id
                logger.warning(
                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
                    environment,
                    environment_state.plan_id,
                    self.config.run.environment_check_interval,
                )
                time.sleep(self.config.run.environment_check_interval)
            raise SQLMeshError(
                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
                "This means that the environment either failed to update or the update is taking longer than expected. "
                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
            )

        done = False
        while not done:
            plan_id_at_start = _block_until_finalized()

            def _has_environment_changed() -> bool:
                assert environment is not None  # mypy
                current_environment_state = self.state_sync.get_environment(environment)
                return (
                    not current_environment_state
                    or current_environment_state.plan_id != plan_id_at_start
                    or not current_environment_state.finalized_ts
                )

            try:
                success = self.scheduler(environment=environment).run(
                    environment,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    ignore_cron=ignore_cron,
                    circuit_breaker=_has_environment_changed,
                )
                done = True
            except CircuitBreakerError:
                logger.warning(
                    "Environment '%s' has been modified while running. Restarting the run...",
                    environment,
                )

        return success

    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)

    def table_name(self, model_name: str, dev: bool) -> str:
        """Returns the name of the physical table for the given model name.

        Args:
            model_name: The name of the model.
            dev: Whether to use the deployability index for the table name.

        Returns:
            The name of the physical table.
        """
        deployability_index = (
            DeployabilityIndex.create(self.snapshots.values())
            if dev
            else DeployabilityIndex.all_deployable()
        )
        snapshot = self.get_snapshot(model_name)
        if not snapshot:
            raise SQLMeshError(f"Model '{model_name}' was not found.")
        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

    def clear_caches(self) -> None:
        for path in self.configs:
            rmtree(path / c.CACHE)

    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
        test_output_io = StringIO()
        result = self.test(stream=test_output_io, verbose=verbose)
        return result, test_output_io.getvalue()

    def _run_plan_tests(
        self, skip_tests: bool = False
    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
        if self._test_engine_adapter and not skip_tests:
            result, test_output = self._run_tests()
            if result.testsRun > 0:
                self.console.log_test_results(
                    result, test_output, self._test_engine_adapter.dialect
                )
            if not result.wasSuccessful():
                raise PlanError(
                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
                )
            return result, test_output
        return None, None

    @property
    def _model_tables(self) -> t.Dict[str, str]:
        """Mapping of model name to physical table name.

        If a snapshot has not been versioned yet, its view name will be returned.
        """
        return {
            fqn: (
                snapshot.table_name()
                if snapshot.version
                else snapshot.qualified_view_name.for_environment(
                    EnvironmentNamingInfo.from_environment_catalog_mapping(
                        self.config.environment_catalog_mapping,
                        name=c.PROD,
                        suffix_target=self.config.environment_suffix_target,
                    )
                )
            )
            for fqn, snapshot in self.snapshots.items()
        }

    def _snapshots(
        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
    ) -> t.Dict[str, Snapshot]:
        prod = self.state_reader.get_environment(c.PROD)
        remote_snapshots = (
            {
                snapshot.name: snapshot
                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
            }
            if prod
            else {}
        )

        local_nodes = {**(models_override or self._models), **self._standalone_audits}
        nodes = local_nodes.copy()
        audits = self._audits.copy()
        projects = {config.project for config in self.configs.values()}

        for name, snapshot in remote_snapshots.items():
            if name not in nodes and snapshot.node.project not in projects:
                nodes[name] = snapshot.node
                if snapshot.is_model:
                    for audit in snapshot.audits:
                        if name not in audits:
                            audits[name] = audit

        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
            snapshots: t.Dict[str, Snapshot] = {}
            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}

            for node in nodes.values():
                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
                    ttl = remote_snapshots[node.fqn].ttl
                else:
                    config = self.config_for_node(node)
                    ttl = config.snapshot_ttl

                snapshot = Snapshot.from_node(
                    node,
                    nodes=nodes,
                    audits=audits,
                    cache=fingerprint_cache,
                    ttl=ttl,
                )
                snapshots[snapshot.name] = snapshot
            return snapshots

        snapshots = _nodes_to_snapshots(nodes)
        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        unrestorable_snapshots = {
            snapshot
            for snapshot in stored_snapshots.values()
            if snapshot.name in local_nodes and snapshot.unrestorable
        }
        if unrestorable_snapshots:
            for snapshot in unrestorable_snapshots:
                logger.info(
                    "Found an unrestorable snapshot %s. Restamping the model...", snapshot.name
                )
                node = local_nodes[snapshot.name]
                nodes[snapshot.name] = node.copy(
                    update={"stamp": f"revert to {snapshot.identifier}"}
                )
            snapshots = _nodes_to_snapshots(nodes)
            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        for snapshot in stored_snapshots.values():
            # Keep the original model instance to preserve the query cache.
            snapshot.node = snapshots[snapshot.name].node

        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}

    def _context_diff(
        self,
        environment: str,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        create_from: t.Optional[str] = None,
        force_no_diff: bool = False,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        environment = Environment.normalize_name(environment)
        if force_no_diff:
            return ContextDiff.create_no_diff(environment)
        return ContextDiff.create(
            environment,
            snapshots=snapshots or self.snapshots,
            create_from=create_from or c.PROD,
            state_reader=self.state_reader,
            ensure_finalized_snapshots=ensure_finalized_snapshots,
        )

    def _run_janitor(self) -> None:
        self._cleanup_environments()
        expired_snapshots = self.state_sync.delete_expired_snapshots()
        self.snapshot_evaluator.cleanup(
            expired_snapshots, on_complete=self.console.update_cleanup_progress
        )

        self.state_sync.compact_intervals()

    def _cleanup_environments(self) -> None:
        expired_environments = self.state_sync.delete_expired_environments()
        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)

    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
        connection_name = connection_name.capitalize()
        try:
            engine_adapter.fetchall("SELECT 1")
            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
        except Exception as ex:
            self.console.log_error(f"{connection_name} connection failed. {ex}")

    def _new_state_sync(self) -> StateSync:
        return self._provided_state_sync or self._scheduler.create_state_sync(self)

    def _new_selector(self) -> Selector:
        return Selector(
            self.state_reader,
            self._models,
            context_path=self.path,
            default_catalog=self.default_catalog,
            dialect=self.default_dialect,
        )

    def _register_notification_targets(self) -> None:
        event_notifications = collections.defaultdict(set)
        for target in self.notification_targets:
            if target.is_configured:
                for event in target.notify_on:
                    event_notifications[event].add(target)
        user_notification_targets = {
            user.username: set(
                target for target in user.notification_targets if target.is_configured
            )
            for user in self.users
        }
        self.notification_target_manager = NotificationTargetManager(
            event_notifications, user_notification_targets, username=self.config.username
        )


class Context(GenericContext[Config]):
    CONFIG_TYPE = Config
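In `_run`, the state backend is polled until the environment is finalized. The number of checks is `max(1, environment_check_max_wait // environment_check_interval)`. The `_has_environment_changed` circuit breaker restarts the run if another plan touches the environment mid-run.

The following is a minimal, standalone sketch of that polling logic under stated assumptions, not SQLMesh's implementation:

- `EnvState`, `block_until_finalized` and `has_environment_changed` are hypothetical names.
- The injected `get_state` callable stands in for `state_sync.get_environment`.
- `LookupError` and `TimeoutError` stand in for `SQLMeshError`.
- The `sleep` parameter exists only so the loop can be exercised without real delays.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class EnvState:
    """Hypothetical stand-in for a stored environment record (only the fields polled)."""

    plan_id: str
    finalized_ts: Optional[int] = None


def block_until_finalized(
    get_state: Callable[[], Optional[EnvState]],
    max_wait: int,
    interval: int,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Return the plan ID of the finalized environment, polling within the wait budget."""
    # Floor division bounds the number of checks; max() guarantees at least one.
    attempts = max(1, max_wait // interval)
    for _ in range(attempts):
        state = get_state()
        if state is None:
            raise LookupError("environment was not found")
        if state.finalized_ts:
            return state.plan_id
        # Another plan is still updating the environment: wait, then check again.
        sleep(interval)
    raise TimeoutError("exceeded the maximum wait time for the environment")


def has_environment_changed(
    get_state: Callable[[], Optional[EnvState]], plan_id_at_start: str
) -> bool:
    """Circuit-breaker check: True if the environment vanished, was re-planned, or is mid-update."""
    state = get_state()
    return state is None or state.plan_id != plan_id_at_start or not state.finalized_ts
```

Because the attempt count uses floor division, a maximum wait shorter than one interval still yields one check, followed by one sleep, before the timeout error is raised.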
class BaseContext(abc.ABC):
    """The base context which defines methods to execute a model."""

    @property
    @abc.abstractmethod
    def default_dialect(self) -> t.Optional[str]:
        """Returns the default dialect."""

    @property
    @abc.abstractmethod
    def _model_tables(self) -> t.Dict[str, str]:
        """Returns a mapping of model names to tables."""

    @property
    @abc.abstractmethod
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""

    @property
    def spark(self) -> t.Optional[PySparkSession]:
        """Returns the spark session if it exists."""
        return self.engine_adapter.spark

    @property
    def default_catalog(self) -> t.Optional[str]:
        raise NotImplementedError

    def table(self, model_name: str) -> str:
        """Gets the physical table name for a given model.

        Args:
            model_name: The model name.

        Returns:
            The physical table name.
        """
        model_name = normalize_model_name(model_name, self.default_catalog, self.default_dialect)

        # We generate SQL for the default dialect because the table name may be used in a
        # fetchdf call and so the quotes need to be correct (eg. backticks for bigquery)
        return parse_one(self._model_tables[model_name]).sql(dialect=self.default_dialect)

    def fetchdf(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> pd.DataFrame:
        """Fetches a dataframe given a sql string or sqlglot expression.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            The default dataframe is Pandas, but for Spark a PySpark dataframe is returned.
        """
        return self.engine_adapter.fetchdf(query, quote_identifiers=quote_identifiers)

    def fetch_pyspark_df(
        self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False
    ) -> PySparkDataFrame:
        """Fetches a PySpark dataframe given a sql string or sqlglot expression.

        Args:
            query: SQL string or sqlglot expression.
            quote_identifiers: Whether to quote all identifiers in the query.

        Returns:
            A PySpark dataframe.
        """
        return self.engine_adapter.fetch_pyspark_df(query, quote_identifiers=quote_identifiers)
The base context which defines methods to execute a model.
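BaseContext is an abstract base class: a subclass must implement the abstract properties (default_dialect, _model_tables and engine_adapter) before it can be instantiated, while default_catalog is concrete but raises NotImplementedError unless overridden. The stdlib sketch below reproduces that contract for two of the three abstract properties; MiniBaseContext, DictContext and Incomplete are hypothetical names, and a plain dict stands in for the model-to-table mapping.

```python
import abc
import typing as t


class MiniBaseContext(abc.ABC):
    """Hypothetical reduction of BaseContext's abstract contract."""

    @property
    @abc.abstractmethod
    def default_dialect(self) -> t.Optional[str]:
        """Subclasses must return the default SQL dialect."""

    @property
    @abc.abstractmethod
    def _model_tables(self) -> t.Dict[str, str]:
        """Subclasses must map model names to physical tables."""

    @property
    def default_catalog(self) -> t.Optional[str]:
        # Mirrors BaseContext: not abstract, but unusable unless overridden.
        raise NotImplementedError


class DictContext(MiniBaseContext):
    """Hypothetical concrete subclass backed by a plain dict."""

    def __init__(self, tables: t.Dict[str, str]):
        self._tables = tables

    @property
    def default_dialect(self) -> t.Optional[str]:
        return "duckdb"

    @property
    def _model_tables(self) -> t.Dict[str, str]:
        return self._tables


class Incomplete(MiniBaseContext):
    """Hypothetical subclass that forgets _model_tables, so it cannot be instantiated."""

    @property
    def default_dialect(self) -> t.Optional[str]:
        return None
```

Instantiating Incomplete raises TypeError, while DictContext works until default_catalog is accessed.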
def table(self, model_name: str) -> str:
Gets the physical table name for a given model.
Arguments:
- model_name: The model name.
Returns:
The physical table name.
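table() first normalizes the model name and then indexes _model_tables directly, so a name that does not correspond to any known model surfaces as a KeyError rather than None. The sketch below imitates that flow under loudly simplified assumptions: its hypothetical normalize function (strip, lowercase, and prepend a default catalog to two-part names) is not the real normalize_model_name, whose rules depend on the dialect, and the sketch returns the stored name without the dialect-specific re-rendering the real method performs with sqlglot. The physical table name is made up.

```python
import typing as t


def normalize(name: str, default_catalog: t.Optional[str]) -> str:
    """Hypothetical normalizer: lowercase and prepend a default catalog to 'db.table' names."""
    name = name.strip().lower()
    if default_catalog and name.count(".") == 1:
        name = f"{default_catalog}.{name}"
    return name


def table(model_name: str, model_tables: t.Dict[str, str], default_catalog: t.Optional[str]) -> str:
    """Sketch of BaseContext.table(): normalize, then index the mapping (KeyError if unknown)."""
    return model_tables[normalize(model_name, default_catalog)]


# Made-up mapping from a normalized model name to a physical table name.
tables = {"memory.sales.orders": "memory.sqlmesh__sales.sales__orders__1234"}
physical = table("Sales.Orders", tables, default_catalog="memory")
```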
def fetchdf(self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False) -> pd.DataFrame:
Fetches a DataFrame for a given SQL string or sqlglot expression.
Arguments:
- query: SQL string or sqlglot expression.
- quote_identifiers: Whether to quote all identifiers in the query.
Returns:
A pandas DataFrame by default; for Spark, a PySpark DataFrame is returned.
def fetch_pyspark_df(self, query: t.Union[exp.Expression, str], quote_identifiers: bool = False) -> PySparkDataFrame:
Fetches a PySpark DataFrame for a given SQL string or sqlglot expression.
Arguments:
- query: SQL string or sqlglot expression.
- quote_identifiers: Whether to quote all identifiers in the query.
Returns:
A PySpark dataframe.
class ExecutionContext(BaseContext):
    """The minimal context needed to execute a model.

    Args:
        engine_adapter: The engine adapter to execute queries against.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
    """

    def __init__(
        self,
        engine_adapter: EngineAdapter,
        snapshots: t.Dict[str, Snapshot],
        deployability_index: t.Optional[DeployabilityIndex] = None,
        default_dialect: t.Optional[str] = None,
        default_catalog: t.Optional[str] = None,
        variables: t.Optional[t.Dict[str, t.Any]] = None,
    ):
        self.snapshots = snapshots
        self.deployability_index = deployability_index
        self._engine_adapter = engine_adapter
        self._default_catalog = default_catalog
        self._default_dialect = default_dialect
        self._variables = variables or {}

    @property
    def default_dialect(self) -> t.Optional[str]:
        return self._default_dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""
        return self._engine_adapter

    @cached_property
    def _model_tables(self) -> t.Dict[str, str]:
        """Returns a mapping of model names to tables."""
        return to_table_mapping(self.snapshots.values(), self.deployability_index)

    @property
    def default_catalog(self) -> t.Optional[str]:
        return self._default_catalog

    @property
    def gateway(self) -> t.Optional[str]:
        """Returns the gateway name."""
        return self.var(c.GATEWAY)

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        """Returns a variable value."""
        return self._variables.get(var_name.lower(), default)

    def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
        """Returns a new ExecutionContext with additional variables."""
        return ExecutionContext(
            self._engine_adapter,
            self.snapshots,
            self.deployability_index,
            self._default_dialect,
            self._default_catalog,
            variables=variables,
        )
The minimal context needed to execute a model.
Arguments:
- engine_adapter: The engine adapter to execute queries against.
- snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
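- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- default_dialect: The default SQL dialect.
- default_catalog: The default catalog.
- variables: Variables made available through var(). Defaults to an empty mapping.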
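In ExecutionContext, _model_tables is a functools.cached_property, so the model-to-table mapping is built from the snapshots on first access and then reused for the lifetime of the instance; changes made to snapshots afterwards are not reflected. The sketch below shows that behavior. MiniExecutionContext and its build counter are hypothetical, and the dict comprehension is only a stand-in for SQLMesh's to_table_mapping.

```python
import typing as t
from functools import cached_property


class MiniExecutionContext:
    """Hypothetical stand-in showing how a cached_property mapping behaves."""

    def __init__(self, snapshots: t.Dict[str, str]):
        self.snapshots = snapshots
        self.builds = 0  # counts how many times the mapping is computed

    @cached_property
    def _model_tables(self) -> t.Dict[str, str]:
        # Stand-in for to_table_mapping(): derive one table name per model.
        self.builds += 1
        return {name: f"{name}__{version}" for name, version in self.snapshots.items()}


ctx = MiniExecutionContext({"db.orders": "v1"})
first = ctx._model_tables
ctx.snapshots["db.customers"] = "v1"  # mutation after the first access
second = ctx._model_tables  # served from the cache, not rebuilt
```

A fresh instance (for example, one produced by with_variables) starts with an empty cache and builds its own mapping.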
def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
Returns the value of the variable var_name, or default if it is not set. var_name is lowercased before the lookup.
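var() lowercases the requested name but uses the stored keys verbatim, so lookups are case-insensitive only if the variables were stored under lowercase names. A stdlib sketch of that behavior, using a hypothetical VarStore class that copies the lookup line from ExecutionContext.var():

```python
import typing as t


class VarStore:
    """Hypothetical mirror of ExecutionContext.var()."""

    def __init__(self, variables: t.Optional[t.Dict[str, t.Any]] = None):
        self._variables = variables or {}

    def var(self, var_name: str, default: t.Optional[t.Any] = None) -> t.Optional[t.Any]:
        # Only the requested name is lowercased; stored keys are used as-is.
        return self._variables.get(var_name.lower(), default)


store = VarStore({"start_ds": "2024-01-01", "Region": "eu"})
```

Here store.var("START_DS") finds "start_ds", but the mixed-case key "Region" can never be reached, because every lookup name is lowercased to "region".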
def with_variables(self, variables: t.Dict[str, t.Any]) -> ExecutionContext:
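Returns a new ExecutionContext that shares this context's engine adapter, snapshots, deployability index, default dialect, and default catalog, but uses the given variables in place of the current ones. Existing variables are not merged into the new context.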
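Because with_variables() passes only the new variables to the constructor, the returned context does not inherit the original context's variables. Since the gateway property is itself read through var(), it is also lost unless re-supplied. Callers that want to extend the current set have to merge the dictionaries themselves. The hypothetical Ctx class below reproduces that behavior with the standard library:

```python
import typing as t


class Ctx:
    """Hypothetical stand-in for ExecutionContext's variable handling."""

    def __init__(self, variables: t.Optional[t.Dict[str, t.Any]] = None):
        self._variables = variables or {}

    def var(self, name: str, default: t.Any = None) -> t.Any:
        return self._variables.get(name.lower(), default)

    def with_variables(self, variables: t.Dict[str, t.Any]) -> "Ctx":
        # Same shape as ExecutionContext.with_variables: the new mapping replaces the old one.
        return Ctx(variables=variables)


base = Ctx({"gateway": "local"})
replaced = base.with_variables({"env": "dev"})
merged = base.with_variables({**base._variables, "env": "dev"})  # explicit merge
```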
class GenericContext(BaseContext, t.Generic[C]):
    """Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.

    Args:
        engine_adapter: The default engine adapter to use.
        notification_targets: The notification target to use. Defaults to what is defined in config.
        paths: The directories containing SQLMesh files.
        config: A Config object or the name of a Config object in config.py.
        connection: The name of the connection. If not specified the first connection as it appears
            in configuration will be used.
        test_connection: The name of the connection to use for tests. If not specified the first
            connection as it appears in configuration will be used.
        concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
        load: Whether or not to automatically load all models and macros (default True).
        console: The rich instance used for printing out CLI command results.
        users: A list of users to make known to SQLMesh.
        config_type: The type of config object to use (default Config).
    """

    CONFIG_TYPE: t.Type[C]

    def __init__(
        self,
        engine_adapter: t.Optional[EngineAdapter] = None,
        notification_targets: t.Optional[t.List[NotificationTarget]] = None,
        state_sync: t.Optional[StateSync] = None,
        paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
        config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
        gateway: t.Optional[str] = None,
        concurrent_tasks: t.Optional[int] = None,
        loader: t.Optional[t.Type[Loader]] = None,
        load: bool = True,
        console: t.Optional[Console] = None,
        users: t.Optional[t.List[User]] = None,
    ):
        self.console = console or get_console()
        self.configs = (
            config if isinstance(config, dict) else load_configs(config, self.CONFIG_TYPE, paths)
        )
        self.dag: DAG[str] = DAG()
        self._models: UniqueKeyDict[str, Model] = UniqueKeyDict("models")
        self._audits: UniqueKeyDict[str, Audit] = UniqueKeyDict("audits")
        self._standalone_audits: UniqueKeyDict[str, StandaloneAudit] = UniqueKeyDict(
            "standaloneaudits"
        )
        self._macros: UniqueKeyDict[str, ExecutableOrMacro] = UniqueKeyDict("macros")
        self._metrics: UniqueKeyDict[str, Metric] = UniqueKeyDict("metrics")
        self._jinja_macros = JinjaMacroRegistry()
        self._default_catalog: t.Optional[str] = None

        self.path, self.config = t.cast(t.Tuple[Path, C], next(iter(self.configs.items())))

        self.gateway = gateway
        self._scheduler = self.config.get_scheduler(self.gateway)
        self.environment_ttl = self.config.environment_ttl
        self.pinned_environments = Environment.normalize_names(self.config.pinned_environments)
        self.auto_categorize_changes = self.config.plan.auto_categorize_changes

        self._connection_config = self.config.get_connection(self.gateway)
        self.concurrent_tasks = concurrent_tasks or self._connection_config.concurrent_tasks
        self._engine_adapter = engine_adapter or self._connection_config.create_engine_adapter()

        test_connection_config = self.config.get_test_connection(
            self.gateway, self.default_catalog, default_catalog_dialect=self.engine_adapter.DIALECT
        )
        self._test_engine_adapter = test_connection_config.create_engine_adapter(
            register_comments_override=False
        )

        self._snapshot_evaluator: t.Optional[SnapshotEvaluator] = None

        self._provided_state_sync: t.Optional[StateSync] = state_sync
        self._state_sync: t.Optional[StateSync] = None

        self._loader = (loader or self.config.loader)(**self.config.loader_kwargs)

        # Should we dedupe notification_targets? If so how?
        self.notification_targets = (notification_targets or []) + self.config.notification_targets
        self.users = (users or []) + self.config.users
        self.users = list({user.username: user for user in self.users}.values())
        self._register_notification_targets()

        if (
            self.config.environment_catalog_mapping
            and not self.engine_adapter.CATALOG_SUPPORT.is_multi_catalog_supported
        ):
            raise SQLMeshError(
                "Environment catalog mapping is only supported for engine adapters that support multiple catalogs"
            )

        if load:
            self.load()

    @property
    def default_dialect(self) -> t.Optional[str]:
        return self.config.dialect

    @property
    def engine_adapter(self) -> EngineAdapter:
        """Returns an engine adapter."""
        return self._engine_adapter

    @property
    def snapshot_evaluator(self) -> SnapshotEvaluator:
        if not self._snapshot_evaluator:
            self._snapshot_evaluator = SnapshotEvaluator(
                self.engine_adapter.with_log_level(logging.INFO),
                ddl_concurrent_tasks=self.concurrent_tasks,
            )
        return self._snapshot_evaluator

    def execution_context(
        self, deployability_index: t.Optional[DeployabilityIndex] = None
    ) -> ExecutionContext:
        """Returns an execution context."""
        return ExecutionContext(
            engine_adapter=self._engine_adapter,
            snapshots=self.snapshots,
            deployability_index=deployability_index,
            default_dialect=self.default_dialect,
            default_catalog=self.default_catalog,
        )

    def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
        """Update or insert a model.

        The context's models dictionary will be updated to include these changes.

        Args:
            model: Model name or instance to update.
            kwargs: The kwargs to update the model with.

        Returns:
            A new instance of the updated or inserted model.
        """
        model = self.get_model(model, raise_if_missing=True)
        path = model._path

        # model.copy() can't be used here due to a cached state that can be a part of a model instance.
        model = t.cast(Model, type(model)(**{**t.cast(Model, model).dict(), **kwargs}))
        model._path = path

        self._models.update({model.fqn: model})
        self.dag.add(model.fqn, model.depends_on)
        update_model_schemas(
            self.dag,
            self._models,
            self.path,
        )

        model.validate_definition()

        return model

    def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
        """Returns the built-in scheduler.

        Args:
            environment: The target environment to source model snapshots from, or None
                if snapshots should be sourced from the currently loaded local state.

        Returns:
            The built-in scheduler instance.
        """
        snapshots: t.Iterable[Snapshot]
        if environment is not None:
            stored_environment = self.state_sync.get_environment(environment)
            if stored_environment is None:
                raise ConfigError(f"Environment '{environment}' was not found.")
            snapshots = self.state_sync.get_snapshots(stored_environment.snapshots).values()
        else:
            snapshots = self.snapshots.values()

        if not snapshots:
            raise ConfigError("No models were found")

        return Scheduler(
            snapshots,
            self.snapshot_evaluator,
            self.state_sync,
            default_catalog=self.default_catalog,
            max_workers=self.concurrent_tasks,
            console=self.console,
            notification_target_manager=self.notification_target_manager,
        )

    @property
    def state_sync(self) -> StateSync:
        if not self._state_sync:
            self._state_sync = self._new_state_sync()

            if self._state_sync.get_versions(validate=False).schema_version == 0:
                self._state_sync.migrate(default_catalog=self.default_catalog)
            self._state_sync.get_versions()
            self._state_sync = CachingStateSync(self._state_sync)  # type: ignore
        return self._state_sync

    @property
    def state_reader(self) -> StateReader:
        return self.state_sync

    def refresh(self) -> None:
        """Refresh all models that have been updated."""
        if self._loader.reload_needed():
            self.load()

    def load(self, update_schemas: bool = True) -> GenericContext[C]:
        """Load all files in the context's path."""
        with sys_path(*self.configs):
            gc.disable()
            project = self._loader.load(self, update_schemas)
            self._macros = project.macros
            self._jinja_macros = project.jinja_macros
            self._models = project.models
            self._metrics = project.metrics
            self._standalone_audits.clear()
            self._audits.clear()
            for name, audit in project.audits.items():
                if isinstance(audit, StandaloneAudit):
                    self._standalone_audits[name] = audit
                else:
                    self._audits[name] = audit
            self.dag = project.dag
            gc.enable()

        duplicates = set(self._models) & set(self._standalone_audits)
        if duplicates:
            raise ConfigError(
                f"Models and Standalone audits cannot have the same name: {duplicates}"
            )

        return self

    def run(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        skip_janitor: bool = False,
        ignore_cron: bool = False,
    ) -> bool:
        """Run the entire dag through the scheduler.

        Args:
            environment: The target environment to source model snapshots from and virtually update. Default: prod.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            skip_janitor: Whether to skip the janitor task.
            ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.

        Returns:
            True if the run was successful, False otherwise.
        """
        environment = environment or self.config.default_target_environment

        self.notification_target_manager.notify(
            NotificationEvent.RUN_START, environment=environment
        )
        success = False
        try:
            success = self._run(
                environment=environment,
                start=start,
                end=end,
                execution_time=execution_time,
                skip_janitor=skip_janitor,
                ignore_cron=ignore_cron,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, traceback.format_exc()
            )
            logger.error(f"Run Failure: {traceback.format_exc()}")
            raise e

        if success:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_END, environment=environment
            )
            self.console.log_success(f"Run finished for environment '{environment}'")
        else:
            self.notification_target_manager.notify(
                NotificationEvent.RUN_FAILURE, "See console logs for details."
            )

        return success

    @t.overload
    def get_model(
        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: Literal[True] = True
    ) -> Model: ...

    @t.overload
    def get_model(
        self,
        model_or_snapshot: ModelOrSnapshot,
        raise_if_missing: Literal[False] = False,
    ) -> t.Optional[Model]: ...

    def get_model(
        self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Model]:
        """Returns a model with the given name or None if a model with such name doesn't exist.

        Args:
            model_or_snapshot: A model name, model, or snapshot.
            raise_if_missing: Raises an error if a model is not found.

        Returns:
            The expected model.
        """
        if isinstance(model_or_snapshot, str):
            normalized_name = normalize_model_name(
                model_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
            model = self._models.get(normalized_name)
        elif isinstance(model_or_snapshot, Snapshot):
            model = model_or_snapshot.model
        else:
            model = model_or_snapshot

        if raise_if_missing and not model:
            raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")

        return model

    @t.overload
    def get_snapshot(self, node_or_snapshot: NodeOrSnapshot) -> t.Optional[Snapshot]: ...

    @t.overload
    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[True]
    ) -> Snapshot: ...

    @t.overload
    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: Literal[False]
    ) -> t.Optional[Snapshot]: ...

    def get_snapshot(
        self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
    ) -> t.Optional[Snapshot]:
        """Returns a snapshot with the given name or None if a snapshot with such name doesn't exist.

        Args:
            node_or_snapshot: A node name, node, or snapshot.
            raise_if_missing: Raises an error if a snapshot is not found.

        Returns:
            The expected snapshot.
        """
        if isinstance(node_or_snapshot, Snapshot):
            return node_or_snapshot
        if isinstance(node_or_snapshot, str) and not self.standalone_audits.get(node_or_snapshot):
            node_or_snapshot = normalize_model_name(
                node_or_snapshot,
                dialect=self.default_dialect,
                default_catalog=self.default_catalog,
            )
        fqn = node_or_snapshot if isinstance(node_or_snapshot, str) else node_or_snapshot.fqn
        snapshot = self.snapshots.get(fqn)

        if raise_if_missing and not snapshot:
            raise SQLMeshError(f"Cannot find snapshot for '{fqn}'")

        return snapshot

    def config_for_path(self, path: Path) -> Config:
        for config_path, config in self.configs.items():
            try:
                path.relative_to(config_path)
                return config
            except ValueError:
                pass
        return self.config

    def config_for_node(self, node: str | Model | StandaloneAudit) -> Config:
        if isinstance(node, str):
            return self.config_for_path(self.get_snapshot(node, raise_if_missing=True).node._path)  # type: ignore
        return self.config_for_path(node._path)  # type: ignore

    @property
    def models(self) -> MappingProxyType[str, Model]:
        """Returns all registered models in this context."""
        return MappingProxyType(self._models)

    @property
    def metrics(self) -> MappingProxyType[str, Metric]:
        """Returns all registered metrics in this context."""
        return MappingProxyType(self._metrics)

    @property
    def standalone_audits(self) -> MappingProxyType[str, StandaloneAudit]:
        """Returns all registered standalone audits in this context."""
        return MappingProxyType(self._standalone_audits)

    @property
    def snapshots(self) -> t.Dict[str, Snapshot]:
        """Generates and returns snapshots based on models registered in this context.

        If one of the snapshots has been previously stored in the persisted state, the stored
        instance will be returned.
        """
        return self._snapshots()

    @property
    def default_catalog(self) -> t.Optional[str]:
        if self._default_catalog is None:
            self._default_catalog = self._scheduler.get_default_catalog(self)
        return self._default_catalog

    def render(
        self,
        model_or_snapshot: ModelOrSnapshot,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        expand: t.Union[bool, t.Iterable[str]] = False,
        **kwargs: t.Any,
    ) -> exp.Expression:
        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to render.
            end: The end of the interval to render.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
            expand: Whether or not to use expand materialized models, defaults to False.
                If True, all referenced models are expanded as raw queries.
                If a list, only referenced models are expanded as raw queries.

        Returns:
            The rendered expression.
        """
        execution_time = execution_time or now_ds()

        model = self.get_model(model_or_snapshot, raise_if_missing=True)

        if expand and not isinstance(expand, bool):
            expand = {
                normalize_model_name(
                    x, default_catalog=self.default_catalog, dialect=self.default_dialect
                )
                for x in expand
            }

        expand = self.dag.upstream(model.fqn) if expand is True else expand or []

        if model.is_seed:
            df = next(
                model.render(
                    context=self.execution_context(),
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
            return next(pandas_to_sql(t.cast(pd.DataFrame, df), model.columns_to_types))

        return model.render_query_or_raise(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            expand=expand,
            engine_adapter=self.engine_adapter,
            **kwargs,
        )

    def evaluate(
        self,
        model_or_snapshot: ModelOrSnapshot,
        start: TimeLike,
        end: TimeLike,
        execution_time: TimeLike,
        limit: t.Optional[int] = None,
        **kwargs: t.Any,
    ) -> DF:
        """Evaluate a model or snapshot (running its query against a DB/Engine).

        This method is used to test or iterate on models without side effects.

        Args:
            model_or_snapshot: The model, model name, or snapshot to render.
            start: The start of the interval to evaluate.
            end: The end of the interval to evaluate.
            execution_time: The date/time time reference to use for execution time.
            limit: A limit applied to the model.
        """
        snapshot = self.get_snapshot(model_or_snapshot, raise_if_missing=True)

        df = self.snapshot_evaluator.evaluate_and_fetch(
            snapshot,
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=self.snapshots,
            limit=limit or c.DEFAULT_MAX_LIMIT,
        )

        if df is None:
            raise RuntimeError(f"Error evaluating {snapshot.name}")

        return df

    def format(
        self,
        transpile: t.Optional[str] = None,
        append_newline: t.Optional[bool] = None,
        **kwargs: t.Any,
    ) -> None:
        """Format all SQL models."""
        for model in self._models.values():
            if not model._path.suffix == ".sql":
                continue
            with open(model._path, "r+", encoding="utf-8") as file:
                expressions = parse(
                    file.read(), default_dialect=self.config_for_node(model).dialect
                )
                if transpile:
                    for prop in expressions[0].expressions:
                        if prop.name.lower() == "dialect":
                            prop.replace(
                                exp.Property(
                                    this="dialect",
                                    value=exp.Literal.string(transpile or model.dialect),
                                )
                            )
                format = self.config_for_node(model).format
                opts = {**format.generator_options, **kwargs}
                file.seek(0)
                file.write(
                    format_model_expressions(expressions, transpile or model.dialect, **opts)
                )
                if append_newline is None:
                    append_newline = format.append_newline
                if append_newline:
                    file.write("\n")
                file.truncate()

    def plan(
        self,
        environment: t.Optional[str] = None,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        create_from: t.Optional[str] = None,
        skip_tests: bool = False,
        restate_models: t.Optional[t.Iterable[str]] = None,
        no_gaps: bool = False,
        skip_backfill: bool = False,
        forward_only: t.Optional[bool] = None,
        no_prompts: t.Optional[bool] = None,
        auto_apply: t.Optional[bool] = None,
        no_auto_categorization: t.Optional[bool] = None,
        effective_from: t.Optional[TimeLike] = None,
        include_unmodified: t.Optional[bool] = None,
        select_models: t.Optional[t.Collection[str]] = None,
        backfill_models: t.Optional[t.Collection[str]] = None,
        categorizer_config: t.Optional[CategorizerConfig] = None,
        enable_preview: t.Optional[bool] = None,
        no_diff: t.Optional[bool] = None,
        run: bool = False,
    ) -> Plan:
        """Interactively creates a plan.

        This method compares the current context with the target environment. It then presents
        the differences and asks whether to backfill each modified model.

        Args:
            environment: The environment to diff and plan against.
            start: The start date of the backfill if there is one.
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_prompts: Whether to disable interactive prompts for the backfill time range. Please note that
                if this flag is set to true and there are uncategorized changes the plan creation will
                fail. Default: False.
            auto_apply: Whether to automatically apply the new plan after creation. Default: False.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            no_diff: Hide text differences for changed models.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The populated Plan object.
881 """ 882 plan_builder = self.plan_builder( 883 environment, 884 start=start, 885 end=end, 886 execution_time=execution_time, 887 create_from=create_from, 888 skip_tests=skip_tests, 889 restate_models=restate_models, 890 no_gaps=no_gaps, 891 skip_backfill=skip_backfill, 892 forward_only=forward_only, 893 no_auto_categorization=no_auto_categorization, 894 effective_from=effective_from, 895 include_unmodified=include_unmodified, 896 select_models=select_models, 897 backfill_models=backfill_models, 898 categorizer_config=categorizer_config, 899 enable_preview=enable_preview, 900 run=run, 901 ) 902 903 self.console.plan( 904 plan_builder, 905 auto_apply if auto_apply is not None else self.config.plan.auto_apply, 906 self.default_catalog, 907 no_diff=no_diff if no_diff is not None else self.config.plan.no_diff, 908 no_prompts=no_prompts if no_prompts is not None else self.config.plan.no_prompts, 909 ) 910 911 return plan_builder.build() 912 913 def plan_builder( 914 self, 915 environment: t.Optional[str] = None, 916 *, 917 start: t.Optional[TimeLike] = None, 918 end: t.Optional[TimeLike] = None, 919 execution_time: t.Optional[TimeLike] = None, 920 create_from: t.Optional[str] = None, 921 skip_tests: bool = False, 922 restate_models: t.Optional[t.Iterable[str]] = None, 923 no_gaps: bool = False, 924 skip_backfill: bool = False, 925 forward_only: t.Optional[bool] = None, 926 no_auto_categorization: t.Optional[bool] = None, 927 effective_from: t.Optional[TimeLike] = None, 928 include_unmodified: t.Optional[bool] = None, 929 select_models: t.Optional[t.Collection[str]] = None, 930 backfill_models: t.Optional[t.Collection[str]] = None, 931 categorizer_config: t.Optional[CategorizerConfig] = None, 932 enable_preview: t.Optional[bool] = None, 933 run: bool = False, 934 ) -> PlanBuilder: 935 """Creates a plan builder. 936 937 Args: 938 environment: The environment to diff and plan against. 939 start: The start date of the backfill if there is one. 
            end: The end date of the backfill if there is one.
            execution_time: The date/time reference to use for execution time. Defaults to now.
            create_from: The environment to create the target environment from if it
                doesn't exist. If not specified, the "prod" environment will be used.
            skip_tests: Unit tests are run by default so this will skip them if enabled
            restate_models: A list of either internal or external models, or tags, that need to be restated
                for the given plan interval. If the target environment is a production environment,
                ALL snapshots that depended on these upstream tables will have their intervals deleted
                (even ones not in this current environment). Only the snapshots in this environment will
                be backfilled whereas others need to be recovered on a future plan application. For development
                environments only snapshots that are part of this plan will be affected.
            no_gaps: Whether to ensure that new snapshots for models that are already a
                part of the target environment have no data gaps when compared against previous
                snapshots for same models.
            skip_backfill: Whether to skip the backfill step. Default: False.
            forward_only: Whether the purpose of the plan is to make forward only changes.
            no_auto_categorization: Indicates whether to disable automatic categorization of model
                changes (breaking / non-breaking). If not provided, then the corresponding configuration
                option determines the behavior.
            categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the
                project config by default.
            effective_from: The effective date from which to apply forward-only changes on production.
            include_unmodified: Indicates whether to include unmodified models in the target development environment.
            select_models: A list of model selection strings to filter the models that should be included into this plan.
            backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
            enable_preview: Indicates whether to enable preview for forward-only models in development environments.
            run: Whether to run latest intervals as part of the plan application.

        Returns:
            The plan builder.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        is_dev = environment != c.PROD

        if skip_backfill and not no_gaps and not is_dev:
            raise ConfigError(
                "When targeting the production environment either the backfill should not be skipped or the lack of data gaps should be enforced (--no-gaps flag)."
            )

        if run and is_dev:
            raise ConfigError("The '--run' flag is only supported for the production environment.")

        self._run_plan_tests(skip_tests=skip_tests)

        environment_ttl = (
            self.environment_ttl if environment not in self.pinned_environments else None
        )

        model_selector = self._new_selector()

        if backfill_models:
            backfill_models = model_selector.expand_model_selections(backfill_models)
        else:
            backfill_models = None

        models_override: t.Optional[UniqueKeyDict[str, Model]] = None
        if select_models:
            models_override = model_selector.select_models(
                select_models,
                environment,
                fallback_env_name=create_from or c.PROD,
                ensure_finalized_snapshots=self.config.plan.use_finalized_state,
            )
            if not backfill_models:
                # Only backfill selected models unless explicitly specified.
                backfill_models = model_selector.expand_model_selections(select_models)

        expanded_restate_models = None
        if restate_models is not None:
            expanded_restate_models = model_selector.expand_model_selections(restate_models)
            if not expanded_restate_models:
                self.console.log_error(
                    f"Provided restated models do not match any models. No models will be included in plan. Provided: {', '.join(restate_models)}"
                )

        snapshots = self._snapshots(models_override)
        context_diff = self._context_diff(
            environment or c.PROD,
            snapshots=snapshots,
            create_from=create_from,
            force_no_diff=(restate_models is not None and not expanded_restate_models)
            or (backfill_models is not None and not backfill_models),
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

        # If no end date is specified, use the max interval end from prod
        # to prevent unintended evaluation of the entire DAG.
        if not run:
            if backfill_models is not None:
                # Only consider selected models for the default end value.
                models_for_default_end = backfill_models.copy()
                for name in backfill_models:
                    if name not in snapshots:
                        continue
                    snapshot = snapshots[name]
                    snapshot_id = snapshot.snapshot_id
                    if (
                        snapshot_id in context_diff.added
                        and snapshot_id in context_diff.new_snapshots
                    ):
                        # If the selected model is a newly added model, then we should narrow down the intervals
                        # that should be considered for the default plan end value by including its parents.
                        models_for_default_end |= {s.name for s in snapshot.parents}
                default_end = self.state_sync.greatest_common_interval_end(
                    c.PROD,
                    models_for_default_end,
                    ensure_finalized_snapshots=self.config.plan.use_finalized_state,
                )
            else:
                default_end = self.state_sync.max_interval_end_for_environment(
                    c.PROD, ensure_finalized_snapshots=self.config.plan.use_finalized_state
                )
        else:
            default_end = None

        default_start = to_date(default_end) - timedelta(days=1) if default_end and is_dev else None

        return PlanBuilder(
            context_diff=context_diff,
            start=start,
            end=end,
            execution_time=execution_time,
            apply=self.apply,
            restate_models=expanded_restate_models,
            backfill_models=backfill_models,
            no_gaps=no_gaps,
            skip_backfill=skip_backfill,
            is_dev=is_dev,
            forward_only=(
                forward_only if forward_only is not None else self.config.plan.forward_only
            ),
            environment_ttl=environment_ttl,
            environment_suffix_target=self.config.environment_suffix_target,
            environment_catalog_mapping=self.config.environment_catalog_mapping,
            categorizer_config=categorizer_config or self.auto_categorize_changes,
            auto_categorization_enabled=not no_auto_categorization,
            effective_from=effective_from,
            include_unmodified=(
                include_unmodified
                if include_unmodified is not None
                else self.config.plan.include_unmodified
            ),
            default_start=default_start,
            default_end=default_end,
            enable_preview=(
                enable_preview if enable_preview is not None else self.config.plan.enable_preview
            ),
            end_bounded=not run,
            ensure_finalized_snapshots=self.config.plan.use_finalized_state,
        )

    def apply(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Applies a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
        to backfill all models.

        Args:
            plan: The plan to apply.
            circuit_breaker: An optional handler which checks if the apply should be aborted.
        """
        if (
            not plan.context_diff.has_changes
            and not plan.requires_backfill
            and not plan.has_unmodified_unpromoted
        ):
            return
        if plan.uncategorized:
            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_START,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
        try:
            self._apply(plan, circuit_breaker)
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.APPLY_FAILURE,
                environment=plan.environment_naming_info.name,
                plan_id=plan.plan_id,
                exc=traceback.format_exc(),
            )
            logger.error(f"Apply Failure: {traceback.format_exc()}")
            raise e
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_END,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )

    def invalidate_environment(self, name: str, sync: bool = False) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
                be deleted asynchronously by the janitor process.
        """
        self.state_sync.invalidate_environment(name)
        if sync:
            self._cleanup_environments()
            self.console.log_success(f"Environment '{name}' has been deleted.")
        else:
            self.console.log_success(f"Environment '{name}' has been invalidated.")

    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
        """Show a diff of the current context with a given environment.

        Args:
            environment: The environment to diff against.
            detailed: Show the actual SQL differences if True.

        Returns:
            True if there are changes, False otherwise.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        context_diff = self._context_diff(environment)
        self.console.show_model_difference_summary(
            context_diff,
            EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=environment,
                suffix_target=self.config.environment_suffix_target,
            ),
            self.default_catalog,
            no_diff=not detailed,
        )
        return context_diff.has_changes

    def table_diff(
        self,
        source: str,
        target: str,
        on: t.List[str] | exp.Condition | None = None,
        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
        where: t.Optional[str | exp.Condition] = None,
        limit: int = 20,
        show: bool = True,
        show_sample: bool = True,
    ) -> TableDiff:
        """Show a diff between two tables.

        Args:
            source: The source environment or table.
            target: The target environment or table.
            on: The join condition, table aliases must be "s" and "t" for source and target.
                If omitted, the table's grain will be used.
            model_or_snapshot: The model or snapshot to use when environments are passed in.
            where: An optional where statement to filter results.
            limit: The limit of the sample dataframe.
            show: Show the table diff output in the console.
            show_sample: Show the sample dataframe in the console. Requires show=True.

        Returns:
            The TableDiff object containing schema and summary differences.
        """
        source_alias, target_alias = source, target
        if model_or_snapshot:
            model = self.get_model(model_or_snapshot, raise_if_missing=True)
            source_env = self.state_reader.get_environment(source)
            target_env = self.state_reader.get_environment(target)

            if not source_env:
                raise SQLMeshError(f"Could not find environment '{source}'")
            if not target_env:
                raise SQLMeshError(f"Could not find environment '{target}'")

            source = next(
                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            target = next(
                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            source_alias = source_env.name
            target_alias = target_env.name

            if not on:
                for ref in model.all_references:
                    if ref.unique:
                        on = ref.columns

        if not on:
            raise SQLMeshError(
                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
            )

        table_diff = TableDiff(
            adapter=self._engine_adapter,
            source=source,
            target=target,
            on=on,
            where=where,
            source_alias=source_alias,
            target_alias=target_alias,
            model_name=model.name if model_or_snapshot else None,
            limit=limit,
        )
        if show:
            self.console.show_schema_diff(table_diff.schema_diff())
            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
        return table_diff

    def get_dag(
        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
    ) -> GraphHTML:
        """Gets an HTML object representation of the DAG.

        Args:
            select_models: A list of model selection strings that should be included in the dag.
        Returns:
            An html object that renders the dag.
        """
        dag = (
            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
            if select_models
            else self.dag
        )

        nodes = {}
        edges: t.List[t.Dict] = []

        for node, deps in dag.graph.items():
            nodes[node] = {
                "id": node,
                "label": node.split(".")[-1],
                "title": f"<span>{node}</span>",
            }
            edges.extend({"from": d, "to": node} for d in deps)

        return GraphHTML(
            nodes,
            edges,
            options={
                "height": "100%",
                "width": "100%",
                "interaction": {},
                "layout": {
                    "hierarchical": {
                        "enabled": True,
                        "nodeSpacing": 200,
                        "sortMethod": "directed",
                    },
                },
                "nodes": {
                    "shape": "box",
                },
                **options,
            },
        )

    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
        """Render the dag as HTML and save it to a file.

        Args:
            path: The filename to save the dag HTML to.
            select_models: A list of model selection strings that should be included in the dag.
        """
        file_path = Path(path)
        suffix = file_path.suffix
        if suffix != ".html":
            if suffix:
                logger.warning(
                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
                )
            path = str(file_path.with_suffix(".html"))

        with open(path, "w", encoding="utf-8") as file:
            file.write(str(self.get_dag(select_models)))

    def create_test(
        self,
        model: str,
        input_queries: t.Dict[str, str],
        overwrite: bool = False,
        variables: t.Optional[t.Dict[str, str]] = None,
        path: t.Optional[str] = None,
        name: t.Optional[str] = None,
        include_ctes: bool = False,
    ) -> None:
        """Generate a unit test fixture for a given model.

        Args:
            model: The model to test.
            input_queries: Mapping of model names to queries. Each model included in this mapping
                will be populated in the test based on the results of the corresponding query.
            overwrite: Whether to overwrite the existing test in case of a file path collision.
                When set to False, an error will be raised if there is such a collision.
            variables: Key-value pairs that will define variables needed by the model.
            path: The file path corresponding to the fixture, relative to the test directory.
                By default, the fixture will be created under the test directory and the file name
                will be inferred from the test's name.
            name: The name of the test. This is inferred from the model name by default.
            include_ctes: When true, CTE fixtures will also be generated.
        """
        input_queries = {
            # The get_model here has two purposes: return normalized names & check for missing deps
            self.get_model(dep, raise_if_missing=True).fqn: query
            for dep, query in input_queries.items()
        }

        generate_test(
            model=self.get_model(model, raise_if_missing=True),
            input_queries=input_queries,
            models=self._models,
            engine_adapter=self._engine_adapter,
            test_engine_adapter=self._test_engine_adapter,
            project_path=self.path,
            overwrite=overwrite,
            variables=variables,
            path=path,
            name=name,
            include_ctes=include_ctes,
        )

    def test(
        self,
        match_patterns: t.Optional[t.List[str]] = None,
        tests: t.Optional[t.List[str]] = None,
        verbose: bool = False,
        preserve_fixtures: bool = False,
        stream: t.Optional[t.TextIO] = None,
    ) -> ModelTextTestResult:
        """Discover and run model tests"""
        if verbose:
            pd.set_option("display.max_columns", None)
            verbosity = 2
        else:
            verbosity = 1

        try:
            if tests:
                result = run_model_tests(
                    tests=tests,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    patterns=match_patterns,
                    preserve_fixtures=preserve_fixtures,
                    default_catalog=self.default_catalog,
                )
            else:
                test_meta = []

                for path, config in self.configs.items():
                    test_meta.extend(
                        get_all_model_tests(
                            path / c.TESTS,
                            patterns=match_patterns,
                            ignore_patterns=config.ignore_patterns,
                        )
                    )

                result = run_tests(
                    test_meta,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    preserve_fixtures=preserve_fixtures,
                    stream=stream,
                    default_catalog=self.default_catalog,
                )
        finally:
            self._test_engine_adapter.close()

        return result

    def audit(
        self,
        start: TimeLike,
        end: TimeLike,
        *,
        models: t.Optional[t.Iterator[str]] = None,
        execution_time: t.Optional[TimeLike] = None,
    ) -> None:
        """Audit models.

        Args:
            start: The start of the interval to audit.
            end: The end of the interval to audit.
            models: The models to audit. All models will be audited if not specified.
            execution_time: The date/time reference to use for execution time. Defaults to now.
        """

        snapshots = (
            [self.get_snapshot(model, raise_if_missing=True) for model in models]
            if models
            else self.snapshots.values()
        )

        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
        self.console.log_status_update(f"Found {num_audits} audit(s).")
        errors = []
        skipped_count = 0
        for snapshot in snapshots:
            for audit_result in self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                snapshots=self.snapshots,
                raise_exception=False,
            ):
                audit_id = f"{audit_result.audit.name}"
                if audit_result.model:
                    audit_id += f" on model {audit_result.model.name}"

                if audit_result.skipped:
                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                    skipped_count += 1
                elif audit_result.count:
                    errors.append(audit_result)
                    self.console.log_status_update(
                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                    )
                else:
                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

        self.console.log_status_update(
            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
        )
        for error in errors:
            self.console.log_status_update(
                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
            )
            self.console.log_status_update(f"Got {error.count} results, expected 0.")
            if error.query:
                self.console.show_sql(
                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
                )

        self.console.log_status_update("Done.")

    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

        Args:
            sql: The sql string to rewrite.
            dialect: The dialect of the sql string, defaults to the project dialect.

        Returns:
            A SQLGlot expression with semantic references expanded.
        """
        return rewrite(
            sql,
            graph=ReferenceGraph(self.models.values()),
            metrics=self._metrics,
            dialect=dialect or self.default_dialect,
        )

    def migrate(self) -> None:
        """Migrates SQLMesh to the current running version.

        Please contact your SQLMesh administrator before doing this.
        """
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
        try:
            self._new_state_sync().migrate(
                default_catalog=self.default_catalog,
                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
            )
            raise e
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)

    def rollback(self) -> None:
        """Rolls back SQLMesh to the previous migration.

        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
        """
        self._new_state_sync().rollback()

    def create_external_models(self) -> None:
        """Create a schema file with all external models.

        The schema file contains all columns and types of external models, allowing for more robust
        lineage, validation, and optimizations.
        """
        if not self._models:
            self.load(update_schemas=False)

        for path, config in self.configs.items():
            create_schema_file(
                path=path / c.SCHEMA_YAML,
                models=UniqueKeyDict(
                    "models",
                    {
                        fqn: model
                        for fqn, model in self._models.items()
                        if self.config_for_node(model) is config
                    },
                ),
                adapter=self._engine_adapter,
                state_reader=self.state_reader,
                dialect=config.model_defaults.dialect,
                max_workers=self.concurrent_tasks,
            )

    def print_info(self) -> None:
        """Prints information about connections, models, macros, etc. to the console."""
        self.console.log_status_update(f"Models: {len(self.models)}")
        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

        self._try_connection("data warehouse", self._engine_adapter)

        state_connection = self.config.get_state_connection(self.gateway)
        if state_connection:
            self._try_connection("state backend", state_connection.create_engine_adapter())

        self._try_connection("test", self._test_engine_adapter)

    def close(self) -> None:
        """Releases all resources allocated by this context."""
        self.snapshot_evaluator.close()
        self.state_sync.close()

    def _run(
        self,
        environment: str,
        *,
        start: t.Optional[TimeLike],
        end: t.Optional[TimeLike],
        execution_time: t.Optional[TimeLike],
        skip_janitor: bool,
        ignore_cron: bool,
    ) -> bool:
        if not skip_janitor and environment.lower() == c.PROD:
            self._run_janitor()

        env_check_attempts_num = max(
            1,
            self.config.run.environment_check_max_wait
            // self.config.run.environment_check_interval,
        )

        def _block_until_finalized() -> str:
            for _ in range(env_check_attempts_num):
                assert environment is not None  # mypy
                environment_state = self.state_sync.get_environment(environment)
                if not environment_state:
                    raise SQLMeshError(f"Environment '{environment}' was not found.")
                if environment_state.finalized_ts:
                    return environment_state.plan_id
                logger.warning(
                    "Environment '%s' is being updated by plan '%s'. Retrying in %s seconds...",
                    environment,
                    environment_state.plan_id,
                    self.config.run.environment_check_interval,
                )
                time.sleep(self.config.run.environment_check_interval)
            raise SQLMeshError(
                f"Exceeded the maximum wait time for environment '{environment}' to be ready. "
                "This means that the environment either failed to update or the update is taking longer than expected. "
                "See https://sqlmesh.readthedocs.io/en/stable/reference/configuration/#run to adjust the timeout settings."
            )

        done = False
        while not done:
            plan_id_at_start = _block_until_finalized()

            def _has_environment_changed() -> bool:
                assert environment is not None  # mypy
                current_environment_state = self.state_sync.get_environment(environment)
                return (
                    not current_environment_state
                    or current_environment_state.plan_id != plan_id_at_start
                    or not current_environment_state.finalized_ts
                )

            try:
                success = self.scheduler(environment=environment).run(
                    environment,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    ignore_cron=ignore_cron,
                    circuit_breaker=_has_environment_changed,
                )
                done = True
            except CircuitBreakerError:
                logger.warning(
                    "Environment '%s' has been modified while running. Restarting the run...",
                    environment,
                )

        return success

    def _apply(self, plan: Plan, circuit_breaker: t.Optional[t.Callable[[], bool]]) -> None:
        self._scheduler.create_plan_evaluator(self).evaluate(plan, circuit_breaker=circuit_breaker)

    def table_name(self, model_name: str, dev: bool) -> str:
        """Returns the name of the physical table for the given model name.

        Args:
            model_name: The name of the model.
            dev: Whether to use the deployability index for the table name.

        Returns:
            The name of the physical table.
        """
        deployability_index = (
            DeployabilityIndex.create(self.snapshots.values())
            if dev
            else DeployabilityIndex.all_deployable()
        )
        snapshot = self.get_snapshot(model_name)
        if not snapshot:
            raise SQLMeshError(f"Model '{model_name}' was not found.")
        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))

    def clear_caches(self) -> None:
        for path in self.configs:
            rmtree(path / c.CACHE)

    def _run_tests(self, verbose: bool = False) -> t.Tuple[unittest.result.TestResult, str]:
        test_output_io = StringIO()
        result = self.test(stream=test_output_io, verbose=verbose)
        return result, test_output_io.getvalue()

    def _run_plan_tests(
        self, skip_tests: bool = False
    ) -> t.Tuple[t.Optional[unittest.result.TestResult], t.Optional[str]]:
        if self._test_engine_adapter and not skip_tests:
            result, test_output = self._run_tests()
            if result.testsRun > 0:
                self.console.log_test_results(
                    result, test_output, self._test_engine_adapter.dialect
                )
            if not result.wasSuccessful():
                raise PlanError(
                    "Cannot generate plan due to failing test(s). Fix test(s) and run again"
                )
            return result, test_output
        return None, None

    @property
    def _model_tables(self) -> t.Dict[str, str]:
        """Mapping of model name to physical table name.

        If a snapshot has not been versioned yet, its view name will be returned.
        """
        return {
            fqn: (
                snapshot.table_name()
                if snapshot.version
                else snapshot.qualified_view_name.for_environment(
                    EnvironmentNamingInfo.from_environment_catalog_mapping(
                        self.config.environment_catalog_mapping,
                        name=c.PROD,
                        suffix_target=self.config.environment_suffix_target,
                    )
                )
            )
            for fqn, snapshot in self.snapshots.items()
        }

    def _snapshots(
        self, models_override: t.Optional[UniqueKeyDict[str, Model]] = None
    ) -> t.Dict[str, Snapshot]:
        prod = self.state_reader.get_environment(c.PROD)
        remote_snapshots = (
            {
                snapshot.name: snapshot
                for snapshot in self.state_reader.get_snapshots(prod.snapshots).values()
            }
            if prod
            else {}
        )

        local_nodes = {**(models_override or self._models), **self._standalone_audits}
        nodes = local_nodes.copy()
        audits = self._audits.copy()
        projects = {config.project for config in self.configs.values()}

        for name, snapshot in remote_snapshots.items():
            if name not in nodes and snapshot.node.project not in projects:
                nodes[name] = snapshot.node
                if snapshot.is_model:
                    for audit in snapshot.audits:
                        if name not in audits:
                            audits[name] = audit

        def _nodes_to_snapshots(nodes: t.Dict[str, Node]) -> t.Dict[str, Snapshot]:
            snapshots: t.Dict[str, Snapshot] = {}
            fingerprint_cache: t.Dict[str, SnapshotFingerprint] = {}

            for node in nodes.values():
                if node.fqn not in local_nodes and node.fqn in remote_snapshots:
                    ttl = remote_snapshots[node.fqn].ttl
                else:
                    config = self.config_for_node(node)
                    ttl = config.snapshot_ttl

                snapshot = Snapshot.from_node(
                    node,
                    nodes=nodes,
                    audits=audits,
                    cache=fingerprint_cache,
                    ttl=ttl,
                )
                snapshots[snapshot.name] = snapshot
            return snapshots

        snapshots = _nodes_to_snapshots(nodes)
        stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        unrestorable_snapshots = {
            snapshot
            for snapshot in stored_snapshots.values()
            if snapshot.name in local_nodes and snapshot.unrestorable
        }
        if unrestorable_snapshots:
            for snapshot in unrestorable_snapshots:
                logger.info(
                    "Found an unrestorable snapshot %s. Restamping the model...", snapshot.name
                )
                node = local_nodes[snapshot.name]
                nodes[snapshot.name] = node.copy(
                    update={"stamp": f"revert to {snapshot.identifier}"}
                )
            snapshots = _nodes_to_snapshots(nodes)
            stored_snapshots = self.state_reader.get_snapshots(snapshots.values())

        for snapshot in stored_snapshots.values():
            # Keep the original model instance to preserve the query cache.
            snapshot.node = snapshots[snapshot.name].node

        return {name: stored_snapshots.get(s.snapshot_id, s) for name, s in snapshots.items()}

    def _context_diff(
        self,
        environment: str,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        create_from: t.Optional[str] = None,
        force_no_diff: bool = False,
        ensure_finalized_snapshots: bool = False,
    ) -> ContextDiff:
        environment = Environment.normalize_name(environment)
        if force_no_diff:
            return ContextDiff.create_no_diff(environment)
        return ContextDiff.create(
            environment,
            snapshots=snapshots or self.snapshots,
            create_from=create_from or c.PROD,
            state_reader=self.state_reader,
            ensure_finalized_snapshots=ensure_finalized_snapshots,
        )

    def _run_janitor(self) -> None:
        self._cleanup_environments()
        expired_snapshots = self.state_sync.delete_expired_snapshots()
        self.snapshot_evaluator.cleanup(
            expired_snapshots, on_complete=self.console.update_cleanup_progress
        )

        self.state_sync.compact_intervals()

    def _cleanup_environments(self) -> None:
        expired_environments = self.state_sync.delete_expired_environments()
        cleanup_expired_views(self.engine_adapter, expired_environments, console=self.console)

    def _try_connection(self, connection_name: str, engine_adapter: EngineAdapter) -> None:
        connection_name = connection_name.capitalize()
        try:
            engine_adapter.fetchall("SELECT 1")
            self.console.log_status_update(f"{connection_name} connection [green]succeeded[/green]")
        except Exception as ex:
            self.console.log_error(f"{connection_name} connection failed. {ex}")

    def _new_state_sync(self) -> StateSync:
        return self._provided_state_sync or self._scheduler.create_state_sync(self)

    def _new_selector(self) -> Selector:
        return Selector(
            self.state_reader,
            self._models,
            context_path=self.path,
            default_catalog=self.default_catalog,
            dialect=self.default_dialect,
        )

    def _register_notification_targets(self) -> None:
        event_notifications = collections.defaultdict(set)
        for target in self.notification_targets:
            if target.is_configured:
                for event in target.notify_on:
                    event_notifications[event].add(target)
        user_notification_targets = {
            user.username: set(
                target for target in user.notification_targets if target.is_configured
            )
            for user in self.users
        }
        self.notification_target_manager = NotificationTargetManager(
            event_notifications, user_notification_targets, username=self.config.username
        )
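The flag validation at the top of `plan_builder` and the default backfill window it computes near the end can be illustrated without SQLMesh. This is a simplified, hypothetical sketch: `validate_plan_flags` and `default_start_for` are illustrative names, `PROD` stands in for `c.PROD` (assumed here to be the string `"prod"`), and `default_end` is taken as a `date` rather than the timestamp that `to_date` converts in the real method.

```python
import typing as t
from datetime import date, timedelta

PROD = "prod"  # assumption: mirrors the production environment name used by plan_builder


def validate_plan_flags(environment: str, skip_backfill: bool, no_gaps: bool, run: bool) -> bool:
    """Return True for a dev target, raising on the flag combinations plan_builder rejects."""
    is_dev = environment != PROD
    # In production, skipping the backfill is only allowed when data gaps are forbidden.
    if skip_backfill and not no_gaps and not is_dev:
        raise ValueError("Production plans must either backfill or enforce no data gaps.")
    # Running the latest intervals as part of a plan is production-only.
    if run and is_dev:
        raise ValueError("The run option is only supported for the production environment.")
    return is_dev


def default_start_for(default_end: t.Optional[date], is_dev: bool) -> t.Optional[date]:
    """In dev, default to a one-day window that ends at the production interval end."""
    return default_end - timedelta(days=1) if default_end and is_dev else None
```

The one-day default keeps development plans cheap: unless a start is given, a dev environment only backfills the most recent day that production has already processed.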
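When `table_diff` receives environments plus a model and no explicit `on` columns, it falls back to the columns of the model's unique references (its grain) and raises if none exist. A minimal sketch of that resolution step, assuming a hypothetical `Reference` record with `columns` and `unique` fields in place of the real `model.all_references` entries; it ignores the environment lookup and the case where `on` is an `exp.Condition`.

```python
import typing as t
from dataclasses import dataclass


@dataclass
class Reference:
    """Hypothetical stand-in for an entry in a model's all_references."""

    columns: t.List[str]
    unique: bool


def resolve_join_columns(
    on: t.Optional[t.List[str]], references: t.Iterable[Reference]
) -> t.List[str]:
    """Use explicit join columns if given, otherwise fall back to a unique grain."""
    if not on:
        for ref in references:
            if ref.unique:
                # There is no `break`, so, as in table_diff, the last unique reference wins.
                on = ref.columns
    if not on:
        raise ValueError("Cannot join the two tables: specify a grain or explicit join columns.")
    return on
```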
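`get_dag` turns the project DAG into the node and edge payloads passed to `GraphHTML`. The sketch below reproduces just that transformation as a standalone function, assuming (as the edge direction in `get_dag` implies) that the graph maps each node to the set of its upstream dependencies. The function name is hypothetical, and `sorted` is added only to make the edge order deterministic.

```python
import typing as t


def dag_to_vis_graph(
    graph: t.Dict[str, t.Set[str]],
) -> t.Tuple[t.Dict[str, t.Dict[str, str]], t.List[t.Dict[str, str]]]:
    """Build node and edge dictionaries in the shape get_dag hands to GraphHTML."""
    nodes: t.Dict[str, t.Dict[str, str]] = {}
    edges: t.List[t.Dict[str, str]] = []
    for node, deps in graph.items():
        nodes[node] = {
            "id": node,
            # Display only the last dotted component as the label.
            "label": node.split(".")[-1],
            "title": f"<span>{node}</span>",
        }
        # Each edge points from an upstream dependency to the node that consumes it.
        edges.extend({"from": dep, "to": node} for dep in sorted(deps))
    return nodes, edges
```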
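The `audit` method classifies each result as skipped, failed (any returned rows) or passed, then prints a pluralized summary. A small standalone sketch of that classification, assuming a hypothetical `AuditOutcome` record in place of the real audit result objects and dropping the console markup and emoji; the audit names in the test are illustrative.

```python
import typing as t
from dataclasses import dataclass


@dataclass
class AuditOutcome:
    """Hypothetical stand-in for an audit result; count is the number of violating rows."""

    name: str
    count: int = 0
    skipped: bool = False


def summarize_audits(results: t.Iterable[AuditOutcome]) -> t.List[str]:
    """Classify results the way audit() does and return plain-text status lines."""
    lines: t.List[str] = []
    errors: t.List[AuditOutcome] = []
    skipped_count = 0
    for result in results:
        if result.skipped:
            skipped_count += 1
            lines.append(f"{result.name} SKIPPED")
        elif result.count:
            # Any returned rows mean the audit failed.
            errors.append(result)
            lines.append(f"{result.name} FAIL [{result.count}]")
        else:
            lines.append(f"{result.name} PASS")
    lines.append(
        f"Finished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
        f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
    )
    return lines
```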
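Before `_run` schedules anything, its nested `_block_until_finalized` polls the state until the environment is finalized, allowing at most `max(1, environment_check_max_wait // environment_check_interval)` checks. A minimal sketch of that polling loop, assuming a hypothetical `EnvState` record and an injectable `sleep` function (added so the loop can be exercised without waiting); the real method raises `SQLMeshError` rather than the built-in exceptions used here.

```python
import time
import typing as t
from dataclasses import dataclass


@dataclass
class EnvState:
    """Hypothetical stand-in for the environment record read from state."""

    plan_id: str
    finalized_ts: t.Optional[int] = None


def block_until_finalized(
    get_environment: t.Callable[[], t.Optional[EnvState]],
    max_wait: int,
    interval: int,
    sleep: t.Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the environment is finalized and return the plan ID that finalized it."""
    # Same budget as _run: at least one check, otherwise max_wait // interval checks.
    attempts = max(1, max_wait // interval)
    for _ in range(attempts):
        state = get_environment()
        if state is None:
            raise LookupError("Environment was not found.")
        if state.finalized_ts:
            return state.plan_id
        sleep(interval)
    raise TimeoutError("Exceeded the maximum wait time for the environment to be ready.")
```

In the listing, the returned plan ID is then captured as `plan_id_at_start`, and the circuit breaker restarts the run if a different plan modifies the environment while the scheduler is working.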
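`_register_notification_targets` indexes the configured notification targets by the events they subscribe to. A standalone sketch of that grouping, assuming a hypothetical, hashable `Target` record with plain string event names instead of the real `NotificationEvent` values; it returns a plain `dict` so that looking up an unknown event does not silently create an entry.

```python
import collections
import typing as t
from dataclasses import dataclass


@dataclass(frozen=True)
class Target:
    """Hypothetical, hashable stand-in for a notification target."""

    name: str
    notify_on: t.FrozenSet[str]
    is_configured: bool = True


def group_targets_by_event(targets: t.Iterable[Target]) -> t.Dict[str, t.Set[Target]]:
    """Map each event name to the configured targets subscribed to it."""
    event_notifications: t.DefaultDict[str, t.Set[Target]] = collections.defaultdict(set)
    for target in targets:
        # Targets that are not configured are skipped entirely.
        if target.is_configured:
            for event in target.notify_on:
                event_notifications[event].add(target)
    return dict(event_notifications)
```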
Encapsulates a SQLMesh environment, supplying convenient functions to perform various tasks.
Arguments:
- engine_adapter: The default engine adapter to use.
- notification_targets: Additional notification targets to use alongside those defined in the config.
- paths: The directories containing SQLMesh files.
- config: A Config object or the name of a Config object in config.py.
- connection: The name of the connection. If not specified, the first connection as it appears in the configuration will be used.
- test_connection: The name of the connection to use for tests. If not specified, the first connection as it appears in the configuration will be used.
- concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
- load: Whether or not to automatically load all models and macros (default True).
- console: The rich instance used for printing out CLI command results.
- users: A list of users to make known to SQLMesh, in addition to those defined in the config.
- config_type: The type of config object to use (default Config).
def __init__(
    self,
    engine_adapter: t.Optional[EngineAdapter] = None,
    notification_targets: t.Optional[t.List[NotificationTarget]] = None,
    state_sync: t.Optional[StateSync] = None,
    paths: t.Union[str | Path, t.Iterable[str | Path]] = "",
    config: t.Optional[t.Union[C, str, t.Dict[Path, C]]] = None,
    gateway: t.Optional[str] = None,
    concurrent_tasks: t.Optional[int] = None,
    loader: t.Optional[t.Type[Loader]] = None,
    load: bool = True,
    console: t.Optional[Console] = None,
    users: t.Optional[t.List[User]] = None,
):
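The constructor appends the users defined in the config to any users passed in explicitly. It then deduplicates them by username with a dict comprehension. A later dict assignment overwrites an earlier one, so a config user replaces an explicitly passed user with the same username, while the list keeps the position where that username first appeared. Here is a small sketch of that behavior, using a hypothetical `ToyUser` class rather than SQLMesh's `User`:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyUser:
    """Hypothetical stand-in for a SQLMesh user; only `username` matters here."""

    username: str
    email: str = ""


def merge_users(explicit: List[ToyUser], from_config: List[ToyUser]) -> List[ToyUser]:
    # Same idiom as the constructor: the last user seen for a username wins,
    # and dict insertion order keeps each username's first position.
    users = explicit + from_config
    return list({user.username: user for user in users}.values())


merged = merge_users(
    [ToyUser("alice", "alice@old.example")],
    [ToyUser("bob"), ToyUser("alice", "alice@new.example")],
)
```

Here `merged` holds the config's version of `alice` followed by `bob`.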
def execution_context(
    self, deployability_index: t.Optional[DeployabilityIndex] = None
) -> ExecutionContext:
Returns an execution context.
def upsert_model(self, model: t.Union[str, Model], **kwargs: t.Any) -> Model:
Update or insert a model.
The context's models dictionary will be updated to include these changes.
Arguments:
- model: Model name or instance to update.
- kwargs: The kwargs to update the model with.
Returns:
A new instance of the updated or inserted model.
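`upsert_model` does not call `model.copy()`. Its implementation notes that a model instance can carry cached state, so it instead constructs a fresh instance of the same class from the old field values overlaid with `kwargs`. The sketch below reproduces that idiom. It uses a hypothetical dataclass `ToyModel`, with `dataclasses.asdict` standing in for the pydantic `.dict()` call:

```python
from dataclasses import asdict, dataclass, field
from typing import Any, List


@dataclass
class ToyModel:
    """Hypothetical model; real SQLMesh models are pydantic objects."""

    name: str
    kind: str = "FULL"
    depends_on: List[str] = field(default_factory=list)


def upsert(model: ToyModel, **kwargs: Any) -> ToyModel:
    # Rebuild from field values instead of copying the object, so no cached
    # attributes from the old instance leak into the new one.
    return type(model)(**{**asdict(model), **kwargs})


original = ToyModel("db.orders", depends_on=["db.raw_orders"])
updated = upsert(original, kind="INCREMENTAL_BY_TIME_RANGE")
```

The real method also registers the new model in the DAG, refreshes model schemas, and validates the definition. Those steps are omitted here.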
def scheduler(self, environment: t.Optional[str] = None) -> Scheduler:
Returns the built-in scheduler.
Arguments:
- environment: The target environment to source model snapshots from, or None if snapshots should be sourced from the currently loaded local state.
Returns:
The built-in scheduler instance.
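Before building the `Scheduler`, `scheduler` decides where the snapshots come from. With a named environment, they come from that environment's persisted state. When `environment` is None, they come from the locally loaded project. It raises `ConfigError` if the environment is unknown or no snapshots remain. The following sketch covers just that selection logic. Snapshots are reduced to strings and the state store is replaced by a plain dict; both are hypothetical simplifications:

```python
from typing import Dict, List, Optional


class ConfigError(Exception):
    """Hypothetical stand-in for SQLMesh's ConfigError."""


def select_snapshots(
    environment: Optional[str],
    stored_environments: Dict[str, List[str]],
    local_snapshots: List[str],
) -> List[str]:
    """Pick the snapshots a scheduler would be built from."""
    if environment is not None:
        # Source snapshots from persisted state for the named environment.
        stored = stored_environments.get(environment)
        if stored is None:
            raise ConfigError(f"Environment '{environment}' was not found.")
        snapshots = stored
    else:
        # Fall back to the snapshots of the locally loaded project.
        snapshots = local_snapshots
    if not snapshots:
        raise ConfigError("No models were found")
    return snapshots
```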
def refresh(self) -> None:
Refresh all models that have been updated.
def load(self, update_schemas: bool = True) -> GenericContext[C]:
Load all files in the context's path.
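After loading, `load` sorts the project's audits into standalone audits and audits attached to models. It raises `ConfigError` if any standalone audit shares a name with a model. Here is a sketch of that partition-and-check step. It assumes audits are plain dicts with a hypothetical `standalone` flag in place of the real `isinstance(audit, StandaloneAudit)` check:

```python
from typing import Dict, Set, Tuple


class ConfigError(Exception):
    """Hypothetical stand-in for SQLMesh's ConfigError."""


def split_audits(
    audits: Dict[str, dict], model_names: Set[str]
) -> Tuple[Dict[str, dict], Dict[str, dict]]:
    """Partition audits into (standalone, attached) and reject name clashes."""
    standalone: Dict[str, dict] = {}
    attached: Dict[str, dict] = {}
    for name, audit in audits.items():
        # A boolean flag stands in for the isinstance check on StandaloneAudit.
        target = standalone if audit.get("standalone") else attached
        target[name] = audit
    duplicates = model_names & set(standalone)
    if duplicates:
        raise ConfigError(
            f"Models and Standalone audits cannot have the same name: {duplicates}"
        )
    return standalone, attached
```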
def run(
    self,
    environment: t.Optional[str] = None,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    skip_janitor: bool = False,
    ignore_cron: bool = False,
) -> bool:
Run the entire DAG through the scheduler.
Arguments:
- environment: The target environment to source model snapshots from and virtually update. Defaults to the configured default_target_environment (prod unless overridden).
- start: The start of the interval to run.
- end: The end of the interval to run.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- skip_janitor: Whether to skip the janitor task.
- ignore_cron: Whether to ignore the model's cron schedule and run all available missing intervals.
Returns:
True if the run was successful, False otherwise.
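`run` brackets the actual scheduler run with notifications. It sends `RUN_START` before the run and `RUN_END` on success. On failure it sends `RUN_FAILURE`, either with the formatted traceback (after which the exception is re-raised) or with a generic message when the run simply returns False. The following sketch shows that control flow. It records events in a list instead of calling SQLMesh's notification target manager:

```python
import traceback
from typing import Callable, List, Tuple

Event = Tuple[str, str]


def run_with_notifications(
    environment: str, do_run: Callable[[], bool], events: List[Event]
) -> bool:
    """Wrap a run with start/end/failure notifications recorded in `events`."""
    events.append(("RUN_START", environment))
    try:
        success = do_run()
    except Exception:
        # An exception is reported with its traceback and then re-raised.
        events.append(("RUN_FAILURE", traceback.format_exc()))
        raise
    if success:
        events.append(("RUN_END", environment))
    else:
        # A run that returns False is also a failure, but without a traceback.
        events.append(("RUN_FAILURE", "See console logs for details."))
    return success
```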
def get_model(
    self, model_or_snapshot: ModelOrSnapshot, raise_if_missing: bool = False
) -> t.Optional[Model]:
Returns the model with the given name, or None if no such model exists.
Arguments:
- model_or_snapshot: A model name, model, or snapshot.
- raise_if_missing: Whether to raise a SQLMeshError if the model is not found.
Returns:
The matching model, or None if it is not found and raise_if_missing is False.
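`get_model` accepts three kinds of input:

- A name, which is normalized against the default dialect and catalog before lookup.
- A snapshot, whose model is returned.
- A model, which is returned as-is.

The sketch below shows that dispatch. It assumes hypothetical `ToyModel`/`ToySnapshot` classes and a simplified `toy_normalize` in place of `normalize_model_name`; the real function is dialect-aware and does more than lowercase:

```python
from dataclasses import dataclass
from typing import Dict, Optional, Union


class SQLMeshError(Exception):
    """Hypothetical stand-in for SQLMesh's SQLMeshError."""


@dataclass
class ToyModel:
    fqn: str


@dataclass
class ToySnapshot:
    model: ToyModel


def toy_normalize(name: str, default_catalog: str = "memory") -> str:
    # Hypothetical simplification: lowercase and prepend the default catalog
    # to two-part names.
    parts = name.lower().split(".")
    if len(parts) == 2:
        parts.insert(0, default_catalog)
    return ".".join(parts)


def get_model(
    models: Dict[str, ToyModel],
    model_or_snapshot: Union[str, ToyModel, ToySnapshot],
    raise_if_missing: bool = False,
) -> Optional[ToyModel]:
    model: Optional[ToyModel]
    if isinstance(model_or_snapshot, str):
        model = models.get(toy_normalize(model_or_snapshot))  # names are looked up
    elif isinstance(model_or_snapshot, ToySnapshot):
        model = model_or_snapshot.model  # snapshots carry their model
    else:
        model = model_or_snapshot  # models are returned unchanged
    if raise_if_missing and not model:
        raise SQLMeshError(f"Cannot find model for '{model_or_snapshot}'")
    return model
```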
def get_snapshot(
    self, node_or_snapshot: NodeOrSnapshot, raise_if_missing: bool = False
) -> t.Optional[Snapshot]:
Returns the snapshot with the given name, or None if no such snapshot exists.
Arguments:
- node_or_snapshot: A node name, node, or snapshot.
- raise_if_missing: Whether to raise a SQLMeshError if the snapshot is not found.
Returns:
The matching snapshot, or None if it is not found and raise_if_missing is False.
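`get_snapshot` returns a snapshot argument unchanged. Otherwise it looks the node up by fully qualified name. String names are normalized like model names unless they match a registered standalone audit, in which case they are used verbatim. The sketch below computes that lookup key. `ToyNode` and `toy_normalize` are hypothetical simplifications:

```python
from dataclasses import dataclass
from typing import Set, Union


@dataclass
class ToyNode:
    fqn: str


def toy_normalize(name: str, default_catalog: str = "memory") -> str:
    # Hypothetical simplification of normalize_model_name.
    parts = name.lower().split(".")
    if len(parts) == 2:
        parts.insert(0, default_catalog)
    return ".".join(parts)


def snapshot_key(node: Union[str, ToyNode], standalone_audit_names: Set[str]) -> str:
    """Return the key used to look a node up in the snapshots mapping."""
    if isinstance(node, str):
        # Standalone audit names are used verbatim; model names are normalized.
        return node if node in standalone_audit_names else toy_normalize(node)
    return node.fqn
```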
Returns all registered standalone audits in this context.
Generates and returns snapshots based on models registered in this context.
If one of the snapshots has been previously stored in the persisted state, the stored instance will be returned.
def render(
    self,
    model_or_snapshot: ModelOrSnapshot,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    expand: t.Union[bool, t.Iterable[str]] = False,
    **kwargs: t.Any,
) -> exp.Expression:
Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
Arguments:
- model_or_snapshot: The model, model name, or snapshot to render.
- start: The start of the interval to render.
- end: The end of the interval to render.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- expand: Whether to expand referenced models as raw queries. Defaults to False. If True, all referenced models are expanded; if a list of model names, only those models are expanded.
Returns:
The rendered expression.
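In `render`, the `expand` argument is resolved to a collection of model names before rendering:

- `True` becomes the models upstream of the rendered one in the DAG.
- A non-empty list is normalized into a set of names.
- `False` or an empty list means nothing is expanded.

The sketch below shows that resolution. It assumes a hypothetical dict-of-parents DAG whose `upstream` helper walks parents transitively, and it uses lowercasing in place of `normalize_model_name`:

```python
from typing import Dict, Iterable, List, Set, Union


def upstream(dag: Dict[str, List[str]], node: str) -> List[str]:
    """Hypothetical helper: every transitive parent of `node` in a parents map."""
    seen: List[str] = []
    stack = list(dag.get(node, []))
    while stack:
        parent = stack.pop()
        if parent not in seen:
            seen.append(parent)
            stack.extend(dag.get(parent, []))
    return seen


def resolve_expand(
    expand: Union[bool, Iterable[str]], model: str, dag: Dict[str, List[str]]
) -> Union[List[str], Set[str]]:
    if expand and not isinstance(expand, bool):
        # A non-empty list of names; lowercasing stands in for normalize_model_name.
        expand = {name.lower() for name in expand}
    # True expands everything upstream; False or an empty list expands nothing.
    return upstream(dag, model) if expand is True else expand or []
```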
def evaluate(
    self,
    model_or_snapshot: ModelOrSnapshot,
    start: TimeLike,
    end: TimeLike,
    execution_time: TimeLike,
    limit: t.Optional[int] = None,
    **kwargs: t.Any,
) -> DF:
Evaluate a model or snapshot (running its query against a DB/Engine).
This method is used to test or iterate on models without side effects.
Arguments:
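- model_or_snapshot: The model, model name, or snapshot to evaluate.
- start: The start of the interval to evaluate.
- end: The end of the interval to evaluate.
- execution_time: The date/time reference to use for execution time.
- limit: The maximum number of rows to return. If not provided, c.DEFAULT_MAX_LIMIT is used.
Returns:
A DataFrame with the evaluation results. A RuntimeError is raised if the evaluation returns no result.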
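`evaluate` always bounds the rows it fetches. It passes `limit or c.DEFAULT_MAX_LIMIT` to the snapshot evaluator, so omitting `limit`, or passing 0, falls back to the default cap. A minimal illustration of that expression follows; the constant's value here is made up:

```python
from typing import Optional

DEFAULT_MAX_LIMIT = 1000  # hypothetical value; SQLMesh uses c.DEFAULT_MAX_LIMIT


def effective_limit(limit: Optional[int]) -> int:
    # `limit or default` treats 0 like None, so limit=0 also falls back.
    return limit or DEFAULT_MAX_LIMIT
```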
def format(
    self,
    transpile: t.Optional[str] = None,
    append_newline: t.Optional[bool] = None,
    **kwargs: t.Any,
) -> None:
Format all SQL models.
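`format` rewrites each `.sql` model file in place:

1. Opens the file in `r+` mode.
2. Reads and reformats the contents.
3. Seeks back to the start.
4. Writes the result, plus an optional trailing newline.
5. Calls `truncate()` so that leftover characters from a longer original do not remain.

The following sketch shows that in-place rewrite pattern. It uses a hypothetical whitespace-collapsing formatter in place of SQLMesh's `format_model_expressions`:

```python
from pathlib import Path
from typing import Callable


def rewrite_in_place(
    path: Path, transform: Callable[[str], str], append_newline: bool = False
) -> None:
    with open(path, "r+", encoding="utf-8") as file:
        formatted = transform(file.read())
        file.seek(0)  # overwrite from the beginning of the file
        file.write(formatted)
        if append_newline:
            file.write("\n")
        file.truncate()  # drop any tail left over from longer original content


def collapse_whitespace(text: str) -> str:
    """Hypothetical formatter: join all tokens with single spaces."""
    return " ".join(text.split())
```

Without the final `truncate()`, a shorter formatted result would leave the tail of the old contents at the end of the file.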
def plan(
    self,
    environment: t.Optional[str] = None,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    create_from: t.Optional[str] = None,
    skip_tests: bool = False,
    restate_models: t.Optional[t.Iterable[str]] = None,
    no_gaps: bool = False,
    skip_backfill: bool = False,
    forward_only: t.Optional[bool] = None,
    no_prompts: t.Optional[bool] = None,
    auto_apply: t.Optional[bool] = None,
    no_auto_categorization: t.Optional[bool] = None,
    effective_from: t.Optional[TimeLike] = None,
    include_unmodified: t.Optional[bool] = None,
    select_models: t.Optional[t.Collection[str]] = None,
    backfill_models: t.Optional[t.Collection[str]] = None,
    categorizer_config: t.Optional[CategorizerConfig] = None,
    enable_preview: t.Optional[bool] = None,
    no_diff: t.Optional[bool] = None,
    run: bool = False,
) -> Plan:
Interactively creates a plan.
This method compares the current context with the target environment. It then presents the differences and asks whether to backfill each modified model.
Arguments:
- environment: The environment to diff and plan against.
- start: The start date of the backfill if there is one.
- end: The end date of the backfill if there is one.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
- skip_tests: Whether to skip unit tests, which are run by default.
- restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even those not in the current environment). Only the snapshots in this environment will be backfilled, whereas others need to be recovered on a future plan application. For development environments, only snapshots that are part of this plan will be affected.
- no_gaps: Whether to ensure that new snapshots for models that are already a part of the target environment have no data gaps when compared against previous snapshots for the same models.
- skip_backfill: Whether to skip the backfill step. Default: False.
- forward_only: Whether the purpose of the plan is to make forward-only changes. If not provided, the corresponding plan configuration option is used.
- no_prompts: Whether to disable interactive prompts for the backfill time range. Note that if this flag is set to true and there are uncategorized changes, plan creation will fail. If not provided, the corresponding plan configuration option is used.
- auto_apply: Whether to automatically apply the new plan after creation. If not provided, the corresponding plan configuration option is used.
- no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
- categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
- effective_from: The effective date from which to apply forward-only changes on production.
- include_unmodified: Indicates whether to include unmodified models in the target development environment.
- select_models: A list of model selection strings to filter the models that should be included into this plan.
- backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
- enable_preview: Indicates whether to enable preview for forward-only models in development environments.
- no_diff: Whether to hide text differences for changed models. If not provided, the corresponding plan configuration option is used.
- run: Whether to run latest intervals as part of the plan application.
Returns:
The populated Plan object.
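`plan` delegates to `plan_builder`, which validates flag combinations before building anything:

- When targeting prod, skipping the backfill is only allowed together with `no_gaps`.
- `run` is rejected for development environments.
- For development targets, the default plan start is one day before the default end.

Separately, `plan` treats `auto_apply`, `no_diff`, and `no_prompts` as tri-state: only `None` defers to the project's plan configuration. The sketch below captures these rules. It uses simplified stand-ins (`PROD`, `ConfigError`) and plain `date` values; the real code also normalizes the environment name and converts the end with `to_date`:

```python
from datetime import date, timedelta
from typing import Optional

PROD = "prod"  # stand-in for c.PROD


class ConfigError(Exception):
    """Hypothetical stand-in for SQLMesh's ConfigError."""


def check_plan_flags(environment: str, skip_backfill: bool, no_gaps: bool, run: bool) -> bool:
    """Apply plan_builder's flag checks; return whether the target is a dev environment."""
    is_dev = environment != PROD
    if skip_backfill and not no_gaps and not is_dev:
        # Message shortened from the original.
        raise ConfigError("Skipping backfill in prod requires --no-gaps.")
    if run and is_dev:
        raise ConfigError("The '--run' flag is only supported for the production environment.")
    return is_dev


def default_start(default_end: Optional[date], is_dev: bool) -> Optional[date]:
    # Dev plans default to starting one day before the default end.
    return default_end - timedelta(days=1) if default_end and is_dev else None


def resolve_flag(explicit: Optional[bool], configured: bool) -> bool:
    # Only None falls back to config; an explicit False overrides a True setting.
    return explicit if explicit is not None else configured
```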
def plan_builder(
    self,
    environment: t.Optional[str] = None,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    create_from: t.Optional[str] = None,
    skip_tests: bool = False,
    restate_models: t.Optional[t.Iterable[str]] = None,
    no_gaps: bool = False,
    skip_backfill: bool = False,
    forward_only: t.Optional[bool] = None,
    no_auto_categorization: t.Optional[bool] = None,
    effective_from: t.Optional[TimeLike] = None,
    include_unmodified: t.Optional[bool] = None,
    select_models: t.Optional[t.Collection[str]] = None,
    backfill_models: t.Optional[t.Collection[str]] = None,
    categorizer_config: t.Optional[CategorizerConfig] = None,
    enable_preview: t.Optional[bool] = None,
    run: bool = False,
) -> PlanBuilder:
Creates a plan builder.
Arguments:
- environment: The environment to diff and plan against.
- start: The start date of the backfill if there is one.
- end: The end date of the backfill if there is one.
- execution_time: The date/time reference to use for execution time. Defaults to now.
- create_from: The environment to create the target environment from if it doesn't exist. If not specified, the "prod" environment will be used.
- skip_tests: Unit tests are run by default; set this to True to skip them.
- restate_models: A list of either internal or external models, or tags, that need to be restated for the given plan interval. If the target environment is a production environment, ALL snapshots that depend on these upstream tables will have their intervals deleted (even ones not in the current environment). Only the snapshots in this environment will be backfilled; the others must be recovered in a future plan application. For development environments, only snapshots that are part of this plan will be affected.
- no_gaps: Whether to ensure that new snapshots for models that are already part of the target environment have no data gaps when compared against previous snapshots for the same models.
- skip_backfill: Whether to skip the backfill step. Default: False.
- forward_only: Whether the purpose of the plan is to make forward-only changes.
- no_auto_categorization: Indicates whether to disable automatic categorization of model changes (breaking / non-breaking). If not provided, then the corresponding configuration option determines the behavior.
- categorizer_config: The configuration for the categorizer. Uses the categorizer configuration defined in the project config by default.
- effective_from: The effective date from which to apply forward-only changes on production.
- include_unmodified: Indicates whether to include unmodified models in the target development environment.
- select_models: A list of model selection strings to filter the models that should be included into this plan.
- backfill_models: A list of model selection strings to filter the models for which the data should be backfilled.
- enable_preview: Indicates whether to enable preview for forward-only models in development environments.
- run: Whether to run latest intervals as part of the plan application.
Returns:
The plan builder.
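When no end date is given and `run` is not set, the source above bounds the plan by prod's existing intervals, and development plans additionally default to a one-day window. The sketch below isolates that bookkeeping under simplifying assumptions: models are plain name strings, "newly added" is a precomputed set of names (the source checks `context_diff.added` and `context_diff.new_snapshots`), and dates are `datetime.date` values rather than SQLMesh timestamps. The function names are hypothetical.

```python
from datetime import date, timedelta
from typing import Dict, Optional, Set


def models_for_default_end(
    backfill_models: Set[str],
    newly_added: Set[str],
    parents: Dict[str, Set[str]],
    known_models: Set[str],
) -> Set[str]:
    """Return the models whose prod intervals bound the default plan end."""
    result = set(backfill_models)
    for name in backfill_models:
        if name not in known_models:
            # Selections without a snapshot are ignored, as in plan_builder.
            continue
        if name in newly_added:
            # Newly added models pull in their parents to narrow the default end.
            result |= parents.get(name, set())
    return result


def default_start(default_end: Optional[date], is_dev: bool) -> Optional[date]:
    """Development plans default to the day before the default end."""
    return default_end - timedelta(days=1) if default_end and is_dev else None
```

For example, selecting a new model `db.new_model` whose parent is `db.raw` yields `{"db.new_model", "db.raw"}`, so the default end becomes the greatest interval end those models have in common in prod.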
    def apply(
        self,
        plan: Plan,
        circuit_breaker: t.Optional[t.Callable[[], bool]] = None,
    ) -> None:
        """Applies a plan by pushing snapshots and backfilling data.

        Given a plan, it pushes snapshots into the state sync and then uses the scheduler
        to backfill all models.

        Args:
            plan: The plan to apply.
            circuit_breaker: An optional handler which checks if the apply should be aborted.
        """
        if (
            not plan.context_diff.has_changes
            and not plan.requires_backfill
            and not plan.has_unmodified_unpromoted
        ):
            return
        if plan.uncategorized:
            raise UncategorizedPlanError("Can't apply a plan with uncategorized changes.")
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_START,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
        try:
            self._apply(plan, circuit_breaker)
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.APPLY_FAILURE,
                environment=plan.environment_naming_info.name,
                plan_id=plan.plan_id,
                exc=traceback.format_exc(),
            )
            logger.error(f"Apply Failure: {traceback.format_exc()}")
            raise e
        self.notification_target_manager.notify(
            NotificationEvent.APPLY_END,
            environment=plan.environment_naming_info.name,
            plan_id=plan.plan_id,
        )
Applies a plan by pushing snapshots and backfilling data.
Given a plan, it pushes snapshots into the state sync and then uses the scheduler to backfill all models.
Arguments:
- plan: The plan to apply.
- circuit_breaker: An optional handler which checks if the apply should be aborted.
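The control flow of `apply` is: return early when there is nothing to do, then bracket the real work with start/end notifications and send a failure notification (with the formatted traceback) before re-raising. A minimal sketch of that pattern, assuming a hypothetical `Event` enum standing in for `NotificationEvent` and plain callables in place of the plan and the notification target manager:

```python
import traceback
from enum import Enum
from typing import Callable


class Event(Enum):
    # Hypothetical stand-ins for NotificationEvent.APPLY_START/FAILURE/END.
    APPLY_START = "apply_start"
    APPLY_FAILURE = "apply_failure"
    APPLY_END = "apply_end"


def apply_with_notifications(
    has_work: bool,
    do_apply: Callable[[], None],
    notify: Callable[[Event, str], None],
) -> None:
    """Sketch of the notification flow in Context.apply."""
    if not has_work:
        # No changes, no backfill, nothing to promote: return without notifying.
        return
    notify(Event.APPLY_START, "")
    try:
        do_apply()
    except Exception:
        # Report the formatted traceback, then re-raise the original error.
        notify(Event.APPLY_FAILURE, traceback.format_exc())
        raise
    notify(Event.APPLY_END, "")
```

Listeners therefore always see either START followed by END, or START followed by FAILURE, and nothing at all for a no-op plan.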
    def invalidate_environment(self, name: str, sync: bool = False) -> None:
        """Invalidates the target environment by setting its expiration timestamp to now.

        Args:
            name: The name of the environment to invalidate.
            sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will
                be deleted asynchronously by the janitor process.
        """
        self.state_sync.invalidate_environment(name)
        if sync:
            self._cleanup_environments()
            self.console.log_success(f"Environment '{name}' has been deleted.")
        else:
            self.console.log_success(f"Environment '{name}' has been invalidated.")
Invalidates the target environment by setting its expiration timestamp to now.
Arguments:
- name: The name of the environment to invalidate.
- sync: If True, the call blocks until the environment is deleted. Otherwise, the environment will be deleted asynchronously by the janitor process.
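Invalidation and deletion are two separate steps: invalidating only sets the expiration timestamp, and a janitor later removes anything that has expired. The sketch below models that split with a hypothetical in-memory `EnvironmentStore`; it is not the state sync's real storage, only an illustration of why `sync=False` returns immediately while `sync=True` also performs the cleanup.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class EnvironmentStore:
    """Hypothetical in-memory stand-in for the state sync's environment records."""

    # Environment name -> expiration timestamp (None means it never expires).
    expirations: Dict[str, Optional[float]] = field(default_factory=dict)

    def invalidate(self, name: str, now: float) -> None:
        # Invalidation only moves the expiration timestamp to "now".
        self.expirations[name] = now

    def janitor(self, now: float) -> List[str]:
        """Delete every environment whose expiration timestamp has passed."""
        expired = [n for n, ts in self.expirations.items() if ts is not None and ts <= now]
        for name in expired:
            del self.expirations[name]
        return expired


def invalidate_environment(store: EnvironmentStore, name: str, sync: bool = False) -> str:
    now = time.time()
    store.invalidate(name, now)
    if sync:
        # Clean up immediately instead of leaving it to a later janitor run.
        store.janitor(now)
        return f"Environment '{name}' has been deleted."
    return f"Environment '{name}' has been invalidated."
```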
    def diff(self, environment: t.Optional[str] = None, detailed: bool = False) -> bool:
        """Show a diff of the current context with a given environment.

        Args:
            environment: The environment to diff against.
            detailed: Show the actual SQL differences if True.

        Returns:
            True if there are changes, False otherwise.
        """
        environment = environment or self.config.default_target_environment
        environment = Environment.normalize_name(environment)
        context_diff = self._context_diff(environment)
        self.console.show_model_difference_summary(
            context_diff,
            EnvironmentNamingInfo.from_environment_catalog_mapping(
                self.config.environment_catalog_mapping,
                name=environment,
                suffix_target=self.config.environment_suffix_target,
            ),
            self.default_catalog,
            no_diff=not detailed,
        )
        return context_diff.has_changes
Show a diff of the current context with a given environment.
Arguments:
- environment: The environment to diff against.
- detailed: Show the actual SQL differences if True.
Returns:
True if there are changes, False otherwise.
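The boolean returned by `diff` is `context_diff.has_changes`. As a rough mental model only (the real `ContextDiff` compares snapshots and tracks more than this), the sketch compares hypothetical `model name -> fingerprint` maps for the local project and the target environment and reports a change whenever a model was added, removed, or modified:

```python
from dataclasses import dataclass
from typing import Dict, Set


@dataclass
class SimpleDiff:
    added: Set[str]
    removed: Set[str]
    modified: Set[str]

    @property
    def has_changes(self) -> bool:
        return bool(self.added or self.removed or self.modified)


def simple_diff(local: Dict[str, str], env: Dict[str, str]) -> SimpleDiff:
    """Compare model name -> fingerprint maps (a hypothetical simplification)."""
    shared = set(local) & set(env)
    return SimpleDiff(
        added=set(local) - set(env),
        removed=set(env) - set(local),
        modified={name for name in shared if local[name] != env[name]},
    )
```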
    def table_diff(
        self,
        source: str,
        target: str,
        on: t.List[str] | exp.Condition | None = None,
        model_or_snapshot: t.Optional[ModelOrSnapshot] = None,
        where: t.Optional[str | exp.Condition] = None,
        limit: int = 20,
        show: bool = True,
        show_sample: bool = True,
    ) -> TableDiff:
        """Show a diff between two tables.

        Args:
            source: The source environment or table.
            target: The target environment or table.
            on: The join condition, table aliases must be "s" and "t" for source and target.
                If omitted, the table's grain will be used.
            model_or_snapshot: The model or snapshot to use when environments are passed in.
            where: An optional where statement to filter results.
            limit: The limit of the sample dataframe.
            show: Show the table diff output in the console.
            show_sample: Show the sample dataframe in the console. Requires show=True.

        Returns:
            The TableDiff object containing schema and summary differences.
        """
        source_alias, target_alias = source, target
        if model_or_snapshot:
            model = self.get_model(model_or_snapshot, raise_if_missing=True)
            source_env = self.state_reader.get_environment(source)
            target_env = self.state_reader.get_environment(target)

            if not source_env:
                raise SQLMeshError(f"Could not find environment '{source}'")
            if not target_env:
                raise SQLMeshError(f"Could not find environment '{target}')")

            source = next(
                snapshot for snapshot in source_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            target = next(
                snapshot for snapshot in target_env.snapshots if snapshot.name == model.fqn
            ).table_name()
            source_alias = source_env.name
            target_alias = target_env.name

            if not on:
                for ref in model.all_references:
                    if ref.unique:
                        on = ref.columns

        if not on:
            raise SQLMeshError(
                "SQLMesh doesn't know how to join the two tables. Specify the `grains` in each model definition or pass join column names in separate `-o` flags."
            )

        table_diff = TableDiff(
            adapter=self._engine_adapter,
            source=source,
            target=target,
            on=on,
            where=where,
            source_alias=source_alias,
            target_alias=target_alias,
            model_name=model.name if model_or_snapshot else None,
            limit=limit,
        )
        if show:
            self.console.show_schema_diff(table_diff.schema_diff())
            self.console.show_row_diff(table_diff.row_diff(), show_sample=show_sample)
        return table_diff
Show a diff between two tables.
Arguments:
- source: The source environment or table.
- target: The target environment or table.
- on: The join condition; the table aliases must be "s" and "t" for the source and target, respectively. If omitted, the model's grain is used, and an error is raised if no grain is defined.
- model_or_snapshot: The model or snapshot to use when environments are passed in.
- where: An optional WHERE condition to filter the results.
- limit: The maximum number of rows in the sample DataFrame.
- show: Show the table diff output in the console.
- show_sample: Show the sample dataframe in the console. Requires show=True.
Returns:
The TableDiff object containing schema and summary differences.
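Conceptually, the row diff joins source and target on the `on` columns (the grain) and buckets each key by whether it appears on one side only, or on both sides with equal or differing values. `TableDiff` does this in the warehouse with SQL; the pure-Python sketch below, with a hypothetical `row_diff` helper over lists of dicts, only illustrates the bucketing and assumes the key is unique within each table, as a grain should be.

```python
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, object]


def row_diff(source: List[Row], target: List[Row], on: Sequence[str]) -> Dict[str, int]:
    """Count rows by join outcome, keyed on the `on` columns.

    Assumes `on` is a grain, i.e. unique within each table.
    """

    def key(row: Row) -> Tuple[object, ...]:
        return tuple(row[col] for col in on)

    src = {key(r): r for r in source}
    tgt = {key(r): r for r in target}
    common = src.keys() & tgt.keys()
    return {
        "source_only": len(src.keys() - tgt.keys()),
        "target_only": len(tgt.keys() - src.keys()),
        "matched": sum(1 for k in common if src[k] == tgt[k]),
        "mismatched": sum(1 for k in common if src[k] != tgt[k]),
    }
```

This is also why a grain is required: without a unique key, there is no well-defined way to pair a source row with a target row.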
    def get_dag(
        self, select_models: t.Optional[t.Collection[str]] = None, **options: t.Any
    ) -> GraphHTML:
        """Gets an HTML object representation of the DAG.

        Args:
            select_models: A list of model selection strings that should be included in the dag.
        Returns:
            An html object that renders the dag.
        """
        dag = (
            self.dag.prune(*self._new_selector().expand_model_selections(select_models))
            if select_models
            else self.dag
        )

        nodes = {}
        edges: t.List[t.Dict] = []

        for node, deps in dag.graph.items():
            nodes[node] = {
                "id": node,
                "label": node.split(".")[-1],
                "title": f"<span>{node}</span>",
            }
            edges.extend({"from": d, "to": node} for d in deps)

        return GraphHTML(
            nodes,
            edges,
            options={
                "height": "100%",
                "width": "100%",
                "interaction": {},
                "layout": {
                    "hierarchical": {
                        "enabled": True,
                        "nodeSpacing": 200,
                        "sortMethod": "directed",
                    },
                },
                "nodes": {
                    "shape": "box",
                },
                **options,
            },
        )
Gets an HTML object representation of the DAG.
Arguments:
- select_models: A list of model selection strings that should be included in the DAG.
Returns:
An HTML object that renders the DAG.
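The node and edge payloads passed to `GraphHTML` are built by a simple loop over the dependency graph. The sketch below reproduces that loop as a standalone function, assuming `dag.graph` can be treated as a plain dict mapping each model name to the set of models it depends on; `dag_to_vis` is a hypothetical name, and the dependencies are sorted only to make the output deterministic.

```python
from typing import Dict, List, Set, Tuple


def dag_to_vis(graph: Dict[str, Set[str]]) -> Tuple[Dict[str, dict], List[Dict[str, str]]]:
    """Build node and edge payloads the way get_dag does."""
    nodes: Dict[str, dict] = {}
    edges: List[Dict[str, str]] = []
    for node, deps in graph.items():
        nodes[node] = {
            "id": node,
            "label": node.split(".")[-1],  # short name without catalog/schema
            "title": f"<span>{node}</span>",  # full name, for the tooltip
        }
        # Edges point from each dependency to the model that consumes it.
        edges.extend({"from": d, "to": node} for d in sorted(deps))
    return nodes, edges
```

Because `**options` is spread last in the `options` dict, any keyword passed to `get_dag` overrides the defaults shown in the source, e.g. a different `"height"`.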
    def render_dag(self, path: str, select_models: t.Optional[t.Collection[str]] = None) -> None:
        """Render the dag as HTML and save it to a file.

        Args:
            path: filename to save the dag html to
            select_models: A list of model selection strings that should be included in the dag.
        """
        file_path = Path(path)
        suffix = file_path.suffix
        if suffix != ".html":
            if suffix:
                logger.warning(
                    f"The extension {suffix} does not designate an html file. A file with a `.html` extension will be created instead."
                )
            path = str(file_path.with_suffix(".html"))

        with open(path, "w", encoding="utf-8") as file:
            file.write(str(self.get_dag(select_models)))
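Renders the DAG as HTML and saves it to a file.
Arguments:
- path: The file name to save the DAG HTML to. If it does not end in `.html`, the extension is replaced with `.html`, and a warning is logged when a different extension was given.
- select_models: A list of model selection strings that should be included in the DAG.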
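The extension handling relies on `pathlib.Path.with_suffix`, which replaces only the last suffix. The sketch below lifts that logic into a hypothetical `normalize_html_path` helper so its behavior can be checked in isolation:

```python
import logging
from pathlib import Path

logger = logging.getLogger(__name__)


def normalize_html_path(path: str) -> str:
    """Force a `.html` extension, warning if a different one was supplied."""
    file_path = Path(path)
    suffix = file_path.suffix
    if suffix != ".html":
        if suffix:
            logger.warning("The extension %s does not designate an html file.", suffix)
        path = str(file_path.with_suffix(".html"))
    return path
```

One consequence worth knowing: everything after the last dot counts as the suffix, so a name such as `lineage.v2` becomes `lineage.html` rather than `lineage.v2.html`.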
    def create_test(
        self,
        model: str,
        input_queries: t.Dict[str, str],
        overwrite: bool = False,
        variables: t.Optional[t.Dict[str, str]] = None,
        path: t.Optional[str] = None,
        name: t.Optional[str] = None,
        include_ctes: bool = False,
    ) -> None:
        """Generate a unit test fixture for a given model.

        Args:
            model: The model to test.
            input_queries: Mapping of model names to queries. Each model included in this mapping
                will be populated in the test based on the results of the corresponding query.
            overwrite: Whether to overwrite the existing test in case of a file path collision.
                When set to False, an error will be raised if there is such a collision.
            variables: Key-value pairs that will define variables needed by the model.
            path: The file path corresponding to the fixture, relative to the test directory.
                By default, the fixture will be created under the test directory and the file name
                will be inferred from the test's name.
            name: The name of the test. This is inferred from the model name by default.
            include_ctes: When true, CTE fixtures will also be generated.
        """
        input_queries = {
            # The get_model here has two purposes: return normalized names & check for missing deps
            self.get_model(dep, raise_if_missing=True).fqn: query
            for dep, query in input_queries.items()
        }

        generate_test(
            model=self.get_model(model, raise_if_missing=True),
            input_queries=input_queries,
            models=self._models,
            engine_adapter=self._engine_adapter,
            test_engine_adapter=self._test_engine_adapter,
            project_path=self.path,
            overwrite=overwrite,
            variables=variables,
            path=path,
            name=name,
            include_ctes=include_ctes,
        )
Generate a unit test fixture for a given model.
Arguments:
- model: The model to test.
- input_queries: Mapping of model names to queries. Each model included in this mapping will be populated in the test based on the results of the corresponding query.
- overwrite: Whether to overwrite the existing test in case of a file path collision. When set to False, an error will be raised if there is such a collision.
- variables: Key-value pairs that will define variables needed by the model.
- path: The file path corresponding to the fixture, relative to the test directory. By default, the fixture will be created under the test directory and the file name will be inferred from the test's name.
- name: The name of the test. This is inferred from the model name by default.
- include_ctes: When true, CTE fixtures will also be generated.
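A generated fixture pairs the rows returned by each input query with the rows the model is expected to produce. The sketch below assembles that shape as a plain dict; the `inputs` / `outputs` / `query` / `rows` layout is assumed to follow SQLMesh's YAML unit-test format, and `build_fixture` is a hypothetical helper that takes rows directly instead of running queries against the engine adapter.

```python
from typing import Dict, List


def build_fixture(
    test_name: str,
    model: str,
    input_rows: Dict[str, List[dict]],
    output_rows: List[dict],
) -> dict:
    """Assemble a unit-test fixture as a plain dict (layout is an assumption)."""
    return {
        test_name: {
            "model": model,
            # One entry per upstream model, as keyed in `input_queries`.
            "inputs": {name: {"rows": rows} for name, rows in input_rows.items()},
            # Expected rows for the model's final query.
            "outputs": {"query": {"rows": output_rows}},
        }
    }
```

Serializing the result with a YAML library would give a file of the kind `create_test` writes under the test directory.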
    def test(
        self,
        match_patterns: t.Optional[t.List[str]] = None,
        tests: t.Optional[t.List[str]] = None,
        verbose: bool = False,
        preserve_fixtures: bool = False,
        stream: t.Optional[t.TextIO] = None,
    ) -> ModelTextTestResult:
        """Discover and run model tests"""
        if verbose:
            pd.set_option("display.max_columns", None)
            verbosity = 2
        else:
            verbosity = 1

        try:
            if tests:
                result = run_model_tests(
                    tests=tests,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    patterns=match_patterns,
                    preserve_fixtures=preserve_fixtures,
                    default_catalog=self.default_catalog,
                )
            else:
                test_meta = []

                for path, config in self.configs.items():
                    test_meta.extend(
                        get_all_model_tests(
                            path / c.TESTS,
                            patterns=match_patterns,
                            ignore_patterns=config.ignore_patterns,
                        )
                    )

                result = run_tests(
                    test_meta,
                    models=self._models,
                    engine_adapter=self._test_engine_adapter,
                    dialect=self.default_dialect,
                    verbosity=verbosity,
                    preserve_fixtures=preserve_fixtures,
                    stream=stream,
                    default_catalog=self.default_catalog,
                )
        finally:
            self._test_engine_adapter.close()

        return result
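Discovers and runs model tests. If `tests` is provided, those tests are run; otherwise, tests are discovered under each project's tests directory.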
    def audit(
        self,
        start: TimeLike,
        end: TimeLike,
        *,
        models: t.Optional[t.Iterator[str]] = None,
        execution_time: t.Optional[TimeLike] = None,
    ) -> None:
        """Audit models.

        Args:
            start: The start of the interval to audit.
            end: The end of the interval to audit.
            models: The models to audit. All models will be audited if not specified.
            execution_time: The date/time time reference to use for execution time. Defaults to now.
        """

        snapshots = (
            [self.get_snapshot(model, raise_if_missing=True) for model in models]
            if models
            else self.snapshots.values()
        )

        num_audits = sum(len(snapshot.audits_with_args) for snapshot in snapshots)
        self.console.log_status_update(f"Found {num_audits} audit(s).")
        errors = []
        skipped_count = 0
        for snapshot in snapshots:
            for audit_result in self.snapshot_evaluator.audit(
                snapshot=snapshot,
                start=start,
                end=end,
                snapshots=self.snapshots,
                raise_exception=False,
            ):
                audit_id = f"{audit_result.audit.name}"
                if audit_result.model:
                    audit_id += f" on model {audit_result.model.name}"

                if audit_result.skipped:
                    self.console.log_status_update(f"{audit_id} ⏸️ SKIPPED.")
                    skipped_count += 1
                elif audit_result.count:
                    errors.append(audit_result)
                    self.console.log_status_update(
                        f"{audit_id} ❌ [red]FAIL [{audit_result.count}][/red]."
                    )
                else:
                    self.console.log_status_update(f"{audit_id} ✅ [green]PASS[/green].")

        self.console.log_status_update(
            f"\nFinished with {len(errors)} audit error{'' if len(errors) == 1 else 's'} "
            f"and {skipped_count} audit{'' if skipped_count == 1 else 's'} skipped."
        )
        for error in errors:
            self.console.log_status_update(
                f"\nFailure in audit {error.audit.name} ({error.audit._path})."
            )
            self.console.log_status_update(f"Got {error.count} results, expected 0.")
            if error.query:
                self.console.show_sql(
                    f"{error.query.sql(dialect=self.snapshot_evaluator.adapter.dialect)}"
                )

        self.console.log_status_update("Done.")
Audit models.
Arguments:
- start: The start of the interval to audit.
- end: The end of the interval to audit.
- models: The models to audit. All models will be audited if not specified.
- execution_time: The date/time reference to use for execution time. Defaults to now.
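Each audit result is classified in a fixed order: skipped first, then failed if the audit query returned any rows (an audit is expected to return 0 rows), otherwise passed. The final summary line pluralizes its counts. The sketch below reproduces both steps with hypothetical `classify` and `summary` helpers operating on plain values instead of audit result objects (the leading newline of the original message is omitted):

```python
from typing import List, Optional


def classify(skipped: bool, count: Optional[int]) -> str:
    """Map one audit result to its status, in the order audit() checks them."""
    if skipped:
        return "SKIPPED"
    if count:
        # Any returned row is a violation.
        return "FAIL"
    return "PASS"


def summary(statuses: List[str]) -> str:
    errors = statuses.count("FAIL")
    skipped = statuses.count("SKIPPED")
    return (
        f"Finished with {errors} audit error{'' if errors == 1 else 's'} "
        f"and {skipped} audit{'' if skipped == 1 else 's'} skipped."
    )
```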
    def rewrite(self, sql: str, dialect: str = "") -> exp.Expression:
        """Rewrite a sql expression with semantic references into an executable query.

        https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/

        Args:
            sql: The sql string to rewrite.
            dialect: The dialect of the sql string, defaults to the project dialect.

        Returns:
            A SQLGlot expression with semantic references expanded.
        """
        return rewrite(
            sql,
            graph=ReferenceGraph(self.models.values()),
            metrics=self._metrics,
            dialect=dialect or self.default_dialect,
        )
Rewrites a SQL expression with semantic references into an executable query.
https://sqlmesh.readthedocs.io/en/latest/concepts/metrics/overview/
Arguments:
- sql: The SQL string to rewrite.
- dialect: The dialect of the SQL string. Defaults to the project dialect.
Returns:
A SQLGlot expression with semantic references expanded.
    def migrate(self) -> None:
        """Migrates SQLMesh to the current running version.

        Please contact your SQLMesh administrator before doing this.
        """
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_START)
        try:
            self._new_state_sync().migrate(
                default_catalog=self.default_catalog,
                promoted_snapshots_only=self.config.migration.promoted_snapshots_only,
            )
        except Exception as e:
            self.notification_target_manager.notify(
                NotificationEvent.MIGRATION_FAILURE, traceback.format_exc()
            )
            raise e
        self.notification_target_manager.notify(NotificationEvent.MIGRATION_END)
Migrates SQLMesh to the current running version.
Please contact your SQLMesh administrator before doing this.
    def rollback(self) -> None:
        """Rolls back SQLMesh to the previous migration.

        Please contact your SQLMesh administrator before doing this. This action cannot be undone.
        """
        self._new_state_sync().rollback()
Rolls back SQLMesh to the previous migration.
Please contact your SQLMesh administrator before doing this. This action cannot be undone.
    def create_external_models(self) -> None:
        """Create a schema file with all external models.

        The schema file contains all columns and types of external models, allowing for more robust
        lineage, validation, and optimizations.
        """
        if not self._models:
            self.load(update_schemas=False)

        for path, config in self.configs.items():
            create_schema_file(
                path=path / c.SCHEMA_YAML,
                models=UniqueKeyDict(
                    "models",
                    {
                        fqn: model
                        for fqn, model in self._models.items()
                        if self.config_for_node(model) is config
                    },
                ),
                adapter=self._engine_adapter,
                state_reader=self.state_reader,
                dialect=config.model_defaults.dialect,
                max_workers=self.concurrent_tasks,
            )
Create a schema file with all external models.
The schema file contains all columns and types of external models, allowing for more robust lineage, validation, and optimizations.
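In a multi-project setup, the source writes one schema file per project path and hands each call only the models whose config belongs to that project (the `self.config_for_node(model) is config` filter). The sketch below shows just that partitioning step; `plan_schema_files` is a hypothetical helper, each model is attributed to a project path directly, and the default file name is an assumption standing in for `c.SCHEMA_YAML`, whose value is not shown here. Which of those models end up recorded as external is decided inside `create_schema_file`, which this sketch does not cover.

```python
from pathlib import Path
from typing import Dict, List


def plan_schema_files(
    project_paths: List[Path],
    model_projects: Dict[str, Path],
    schema_filename: str = "schema.yaml",  # assumption: stands in for c.SCHEMA_YAML
) -> Dict[Path, List[str]]:
    """Map each schema file to the sorted model names it would be given."""
    return {
        path / schema_filename: sorted(
            # Each model is attributed to exactly one project path.
            name for name, project in model_projects.items() if project == path
        )
        for path in project_paths
    }
```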
    def print_info(self) -> None:
        """Prints information about connections, models, macros, etc. to the console."""
        self.console.log_status_update(f"Models: {len(self.models)}")
        self.console.log_status_update(f"Macros: {len(self._macros) - len(macro.get_registry())}")

        self._try_connection("data warehouse", self._engine_adapter)

        state_connection = self.config.get_state_connection(self.gateway)
        if state_connection:
            self._try_connection("state backend", state_connection.create_engine_adapter())

        self._try_connection("test", self._test_engine_adapter)
Prints information about connections, models, macros, etc. to the console.
    def close(self) -> None:
        """Releases all resources allocated by this context."""
        self.snapshot_evaluator.close()
        self.state_sync.close()
Releases all resources allocated by this context.
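Because `close()` releases the snapshot evaluator and the state sync, it should run even when the work in between fails. The standard library's `contextlib.closing` only requires an object with a `close()` method, so it can wrap a context; the sketch uses a hypothetical `FakeContext` stand-in so it runs without a SQLMesh project, but `contextlib.closing(Context(paths="example"))` would behave the same way.

```python
import contextlib


class FakeContext:
    """Hypothetical stand-in exposing the same close() hook as Context."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


# close() is called when the block exits, including on an exception.
with contextlib.closing(FakeContext()) as ctx:
    pass  # e.g. ctx.plan(...) or ctx.run(...) on a real Context
```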
    def table_name(self, model_name: str, dev: bool) -> str:
        """Returns the name of the pysical table for the given model name.

        Args:
            model_name: The name of the model.
            dev: Whether to use the deployability index for the table name.

        Returns:
            The name of the physical table.
        """
        deployability_index = (
            DeployabilityIndex.create(self.snapshots.values())
            if dev
            else DeployabilityIndex.all_deployable()
        )
        snapshot = self.get_snapshot(model_name)
        if not snapshot:
            raise SQLMeshError(f"Model '{model_name}' was not found.")
        return snapshot.table_name(is_deployable=deployability_index.is_deployable(snapshot))
Returns the name of the physical table for the given model name.
Arguments:
- model_name: The name of the model.
- dev: Whether to use the deployability index for the table name. If False, the snapshot is treated as deployable.
Returns:
The name of the physical table.
Encapsulates a SQLMesh environment supplying convenient functions to perform various tasks.
Arguments:
- engine_adapter: The default engine adapter to use.
- notification_targets: The notification targets to use. Defaults to what is defined in the config.
- paths: The directories containing SQLMesh files.
- config: A Config object or the name of a Config object in config.py.
- connection: The name of the connection. If not specified, the first connection as it appears in the configuration will be used.
- test_connection: The name of the connection to use for tests. If not specified, the first connection as it appears in the configuration will be used.
- concurrent_tasks: The maximum number of tasks that can use the connection concurrently.
- load: Whether or not to automatically load all models and macros (default True).
- console: The rich instance used for printing CLI command results.
- users: A list of users to make known to SQLMesh.
- config_type: The type of config object to use (default Config).
Inherited Members
From GenericContext:
- GenericContext
- default_dialect
- engine_adapter
- execution_context
- upsert_model
- scheduler
- refresh
- load
- run
- get_model
- get_snapshot
- config_for_path
- config_for_node
- models
- metrics
- standalone_audits
- snapshots
- render
- evaluate
- format
- plan
- plan_builder
- apply
- invalidate_environment
- diff
- table_diff
- get_dag
- render_dag
- create_test
- test
- audit
- rewrite
- migrate
- rollback
- create_external_models
- print_info
- close
- table_name
- clear_caches