sqlmesh.core.renderer
from __future__ import annotations

import logging
import typing as t
from contextlib import contextmanager
from functools import partial
from pathlib import Path

from sqlglot import exp, Dialect
from sqlglot.errors import SqlglotError
from sqlglot.helper import ensure_list
from sqlglot.optimizer.annotate_types import annotate_types
from sqlglot.optimizer.qualify import qualify
from sqlglot.optimizer.simplify import simplify

from sqlmesh.core import constants as c
from sqlmesh.core import dialect as d
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
from sqlmesh.utils.date import (
    TimeLike,
    date_dict,
    make_inclusive,
    to_datetime,
    make_ts_exclusive,
    to_tstz,
)
from sqlmesh.utils.errors import (
    ConfigError,
    ParsetimeAdapterCallError,
    SQLMeshError,
    raise_config_error,
)
from sqlmesh.utils.jinja import JinjaMacroRegistry, extract_error_details
from sqlmesh.utils.metaprogramming import Executable, prepare_env

if t.TYPE_CHECKING:
    from sqlglot._typing import E
    from sqlglot.dialects.dialect import DialectType

    from sqlmesh.core.linter.rule import Rule
    from sqlmesh.core.model.definition import _Model
    from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot


logger = logging.getLogger(__name__)


class BaseExpressionRenderer:
    """Renders a single SQL expression, expanding SQLMesh macros and Jinja templates.

    Subclasses expose public ``render`` methods built on top of ``_render``. This base
    class also provides the helpers that map table references to their physical
    locations (``_resolve_table`` / ``_resolve_tables``).
    """

    def __init__(
        self,
        expression: exp.Expr,
        dialect: DialectType,
        macro_definitions: t.List[d.MacroDef],
        path: t.Optional[Path] = None,
        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        only_execution_time: bool = False,
        schema: t.Optional[t.Dict[str, t.Any]] = None,
        default_catalog: t.Optional[str] = None,
        quote_identifiers: bool = True,
        normalize_identifiers: bool = True,
        optimize_query: t.Optional[bool] = True,
        model: t.Optional[_Model] = None,
    ):
        """
        Args:
            expression: The expression to render. A ``d.Jinja`` expression is rendered
                through Jinja and re-parsed before macros are resolved.
            dialect: The SQL dialect used for parsing, rendering and normalization.
            macro_definitions: Macro definitions evaluated before the expression is transformed.
            path: Path of the file the expression came from; used in error messages.
            jinja_macro_registry: Registry used to build the Jinja environment.
                Defaults to an empty registry.
            python_env: Python environment exposed to macros. Defaults to an empty dict.
            only_execution_time: If True, no start/end interval is computed
                (``None`` is passed to ``date_dict`` for both).
            schema: Mapping schema of upstream tables; normalized for ``dialect``.
            default_catalog: Default catalog used when normalizing and qualifying names.
            quote_identifiers: Whether identifiers are quoted when normalizing, aliasing
                and resolving tables.
            normalize_identifiers: Whether rendered expressions are normalized at all.
            optimize_query: Only an explicit ``False`` disables query optimization;
                ``None`` is treated as enabled.
            model: The model that owns the expression, if any.
        """
        self._expression = expression
        self._dialect = dialect
        self._macro_definitions = macro_definitions
        self._path = path
        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
        self._python_env = python_env or {}
        self._only_execution_time = only_execution_time
        self._default_catalog = default_catalog
        self._normalize_identifiers = normalize_identifiers
        self._quote_identifiers = quote_identifiers
        # Called through `self` so that subclasses overriding update_schema also run here.
        self.update_schema({} if schema is None else schema)
        self._cache: t.List[t.Optional[exp.Expr]] = []
        self._model_fqn = model.fqn if model else None
        self._optimize_query_flag = optimize_query is not False
        self._model = model

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Replaces the renderer's schema with ``schema`` normalized for this dialect."""
        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)

    def _render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.List[t.Optional[exp.Expr]]:
        """Renders an expression, expanding macros with provided kwargs.

        Results are cached, and later served from the cache, only when rendering at the
        LOADING runtime stage with no start, end or execution time and no truthy extra
        kwargs. Results are not cached when the schema is empty and a macro called
        ``columns_to_types``.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expressions.

        Raises:
            ConfigError: If Jinja rendering or parsing of the rendered Jinja fails. Failures
                while evaluating macro definitions or resolving macros are reported
                through ``raise_config_error``.
            ParsetimeAdapterCallError: Propagated unchanged from Jinja rendering.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._cache:
            return self._cache

        environment_naming_info = kwargs.get("environment_naming_info")
        if environment_naming_info is not None:
            kwargs["this_env"] = environment_naming_info.name
            if snapshots:
                # Expose the environment-specific schemas and view names of all
                # non-symbolic model snapshots to macros / Jinja.
                schemas, views = set(), []
                for snapshot in snapshots.values():
                    if snapshot.is_model and not snapshot.is_symbolic:
                        schemas.add(
                            snapshot.qualified_view_name.schema_for_environment(
                                environment_naming_info, dialect=self._dialect
                            )
                        )
                        views.append(
                            snapshot.display_name(
                                environment_naming_info, self._default_catalog, self._dialect
                            )
                        )
                if schemas:
                    kwargs["schemas"] = list(schemas)
                if views:
                    kwargs["views"] = views

        # An explicitly passed `this_model` wins over the one resolved from the model FQN.
        this_model = kwargs.pop("this_model", None)

        this_snapshot = (snapshots or {}).get(self._model_fqn) if self._model_fqn else None
        if not this_model and self._model_fqn:
            this_model = self._resolve_table(
                self._model_fqn,
                snapshots={self._model_fqn: this_snapshot} if this_snapshot else None,
                deployability_index=deployability_index,
                table_mapping=table_mapping,
            )
        if this_snapshot and (kind := this_snapshot.model_kind_name):
            kwargs["model_kind_name"] = kind.name

        def _resolve_table(table: str | exp.Table) -> str:
            # Exposed to macros and Jinja: normalizes the name, resolves it against the
            # current snapshots/table mapping and returns it as identifier-quoted SQL.
            return self._resolve_table(
                d.normalize_model_name(table, self._default_catalog, self._dialect),
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
            ).sql(dialect=self._dialect, identify=True, comments=False)

        macro_evaluator = MacroEvaluator(
            self._dialect,
            python_env=self._python_env,
            schema=self.schema,
            runtime_stage=runtime_stage,
            resolve_table=_resolve_table,
            resolve_tables=lambda e: self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
            ),
            snapshots=snapshots,
            default_catalog=self._default_catalog,
            path=self._path,
            environment_naming_info=environment_naming_info,
            model_fqn=self._model_fqn,
        )

        start_time, end_time = (
            make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect)
            if not self._only_execution_time
            else (None, None)
        )

        render_kwargs = {
            **date_dict(
                to_datetime(execution_time or c.EPOCH),
                start_time,
                end_time,
            ),
            **kwargs,
        }

        if this_model:
            render_kwargs["this_model"] = this_model

        macro_evaluator.locals.update(render_kwargs)

        # NOTE: `variables` is popped only after render_kwargs was built, so it also
        # remains available there under the "variables" key.
        variables = kwargs.pop("variables", {})
        if variables:
            macro_evaluator.locals.setdefault(c.SQLMESH_VARS, {}).update(variables)

        expressions: t.List[exp.Expr] = [self._expression]
        if isinstance(self._expression, d.Jinja):
            try:
                jinja_env_kwargs = {
                    **{
                        **render_kwargs,
                        **_prepare_python_env_for_jinja(macro_evaluator, self._python_env),
                        **variables,
                    },
                    "snapshots": snapshots or {},
                    "table_mapping": table_mapping,
                    "deployability_index": deployability_index,
                    "default_catalog": self._default_catalog,
                    "runtime_stage": runtime_stage.value,
                    "resolve_table": _resolve_table,
                    "model_instance": self._model,
                }

                if this_model:
                    jinja_env_kwargs["this_model"] = this_model.sql(
                        dialect=self._dialect, identify=True, comments=False
                    )

                if self._model and self._model.kind.is_incremental_by_time_range:
                    # Updates, in place, the event_time_filter of every source/ref object
                    # registered in the Jinja registry with this render's interval.
                    all_refs = list(
                        self._jinja_macro_registry.global_objs.get("sources", {}).values()  # type: ignore
                    ) + list(
                        self._jinja_macro_registry.global_objs.get("refs", {}).values()  # type: ignore
                    )
                    for ref in all_refs:
                        if ref.event_time_filter:
                            ref.event_time_filter["start"] = render_kwargs["start_tstz"]
                            ref.event_time_filter["end"] = to_tstz(
                                make_ts_exclusive(render_kwargs["end_tstz"], dialect=self._dialect)
                            )

                jinja_env = self._jinja_macro_registry.build_environment(**jinja_env_kwargs)

                expressions = []
                rendered_expression = jinja_env.from_string(self._expression.name).render()
                # Lazy %-formatting: the rendered SQL is only formatted when DEBUG is enabled.
                logger.debug(
                    "Rendered Jinja expression for model '%s' at '%s': '%s'",
                    self._model_fqn,
                    self._path,
                    rendered_expression,
                )
            except ParsetimeAdapterCallError:
                raise
            except Exception as ex:
                raise ConfigError(
                    f"Could not render jinja for '{self._path}'.\n" + extract_error_details(ex)
                ) from ex

            if rendered_expression.strip():
                # ensure there is actual SQL and not just comments and non-SQL jinja
                dialect = Dialect.get_or_raise(self._dialect)
                tokens = dialect.tokenize(rendered_expression)

                if tokens:
                    try:
                        expressions = [
                            e for e in dialect.parser().parse(tokens, rendered_expression) if e
                        ]

                        if not expressions:
                            raise ConfigError(
                                f"Failed to parse an expression:\n{rendered_expression}"
                            )
                    except Exception as ex:
                        raise ConfigError(
                            f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
                        ) from ex

        for definition in self._macro_definitions:
            try:
                macro_evaluator.evaluate(definition)
            except Exception as ex:
                raise_config_error(
                    f"Failed to evaluate macro '{definition}'.\n\n{ex}\n", self._path
                )

        resolved_expressions: t.List[t.Optional[exp.Expr]] = []

        for expression in expressions:
            try:
                transformed_expressions = ensure_list(macro_evaluator.transform(expression))
            except Exception as ex:
                raise_config_error(
                    f"Failed to resolve macros for\n\n{expression.sql(dialect=self._dialect, pretty=True)}\n\n{ex}\n",
                    self._path,
                )

            for transformed in t.cast(t.List[exp.Expr], transformed_expressions):
                with self._normalize_and_quote(transformed) as transformed:
                    if hasattr(transformed, "selects"):
                        # Give every named, un-aliased projection an explicit alias so its
                        # output name is preserved; comments move onto the alias.
                        for select in transformed.selects:
                            if not isinstance(select, exp.Alias) and select.output_name not in (
                                "*",
                                "",
                            ):
                                alias = exp.alias_(
                                    select, select.output_name, quoted=self._quote_identifiers
                                )
                                comments = alias.this.comments
                                if comments:
                                    alias.add_comments(comments)
                                    comments.clear()

                                select.replace(alias)
                resolved_expressions.append(transformed)

        # We don't cache here if columns_to_types was called in a macro.
        # This allows the model's query to be re-rendered so that the
        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
            self._cache = resolved_expressions
        return resolved_expressions

    def update_cache(self, expression: t.Optional[exp.Expr]) -> None:
        """Replaces the rendered-expressions cache with the single ``expression``."""
        self._cache = [expression]

    def _resolve_table(
        self,
        table_name: str | exp.Expr,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> exp.Table:
        """Maps a single table reference to its physical location.

        Snapshot-derived mappings are applied first; entries in ``table_mapping`` take
        precedence over them. The result is identifier-quoted when quoting is enabled.
        """
        table = exp.replace_tables(
            exp.maybe_parse(table_name, into=exp.Table, dialect=self._dialect),
            {
                **self._to_table_mapping((snapshots or {}).values(), deployability_index),
                **(table_mapping or {}),
            },
            dialect=self._dialect,
            copy=False,
        )
        # We quote the table here to mimic the behavior of _resolve_tables, otherwise we may end
        # up normalizing twice, because _to_table_mapping returns the mapped names unquoted.
        return (
            d.quote_identifiers(table, dialect=self._dialect) if self._quote_identifiers else table
        )

    def _resolve_tables(
        self,
        expression: E,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        expand: t.Iterable[str] = tuple(),
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> E:
        """Replaces table references in ``expression`` with physical locations.

        Tables named in ``expand``, plus any embedded snapshots, are inlined as subqueries
        of the referenced model's rendered query instead of being mapped. Seed and
        external models are never inlined. The remaining ``kwargs`` are forwarded to
        ``render_query`` for inlined models.

        Returns:
            A transformed copy of ``expression``, or ``expression`` itself (not a copy)
            when there are no snapshots, table mapping or tables to expand.
        """
        if not snapshots and not table_mapping and not expand:
            return expression

        expression = expression.copy()
        with self._normalize_and_quote(expression) as expression:
            snapshots = snapshots or {}
            table_mapping = table_mapping or {}
            mapping = {
                **self._to_table_mapping(snapshots.values(), deployability_index),
                **table_mapping,
            }
            expand = set(expand) | {
                name for name, snapshot in snapshots.items() if snapshot.is_embedded
            }

            if expand:
                model_mapping = {
                    name: snapshot.model
                    for name, snapshot in snapshots.items()
                    if snapshot.is_model
                }

                def _expand(node: exp.Expr) -> exp.Expr:
                    # Swap an expandable table reference for its model's rendered query.
                    if isinstance(node, exp.Table) and snapshots:
                        name = exp.table_name(node, identify=True)
                        model = model_mapping.get(name)
                        if (
                            name in expand
                            and model
                            and not model.is_seed
                            and not model.kind.is_external
                        ):
                            nested_query = model.render_query(
                                start=start,
                                end=end,
                                execution_time=execution_time,
                                snapshots=snapshots,
                                table_mapping=table_mapping,
                                expand=expand,
                                deployability_index=deployability_index,
                                **kwargs,
                            )
                            if nested_query is not None:
                                return nested_query.subquery(
                                    alias=node.alias or model.view_name,
                                    copy=False,
                                )

                            logger.warning("Failed to expand the nested model '%s'", name)

                    return node

                expression = expression.transform(_expand, copy=False)  # type: ignore

            if mapping:
                expression = exp.replace_tables(
                    expression, mapping, dialect=self._dialect, copy=False
                )

        return expression

    @contextmanager
    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
        """Yields ``query`` wrapped in ``d.normalize_and_quote`` when normalization is enabled.

        When identifier normalization is disabled the query is yielded unchanged.
        """
        if self._normalize_identifiers:
            with d.normalize_and_quote(
                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
            ) as query:
                yield query
        else:
            yield query

    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
        """Returns True only at the LOADING stage when every value in ``args`` is falsy."""
        return runtime_stage == RuntimeStage.LOADING and not any(args)

    def _to_table_mapping(
        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
    ) -> t.Dict[str, str]:
        """Builds a name -> physical table mapping from ``snapshots``."""
        # Imported lazily; presumably to avoid an import cycle with sqlmesh.core.snapshot.
        from sqlmesh.core.snapshot import to_table_mapping

        return to_table_mapping(snapshots, deployability_index)


class ExpressionRenderer(BaseExpressionRenderer):
    """Renders a generic (non-query) expression such as a pre/post statement."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expr]]:
        """Renders the expression and resolves table references in every result.

        Falsy results and bare semicolons are dropped.

        Returns:
            The rendered expressions, or ``None`` if rendering raised
            ``ParsetimeAdapterCallError``.
        """
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                table_mapping=table_mapping,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        return [
            self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                **kwargs,
            )
            for e in expressions
            if e and not isinstance(e, exp.Semicolon)
        ]


def render_statements(
    statements: t.List[str],
    dialect: str,
    default_catalog: t.Optional[str] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    **render_kwargs: t.Any,
) -> t.List[str]:
    """Parses and renders SQL statement strings, returning the rendered SQL.

    Identifiers are neither normalized nor quoted. A parsed expression that renders to
    nothing is skipped with a warning rather than raising.

    Args:
        statements: Raw SQL statement strings; each may contain several expressions.
        dialect: Dialect used for parsing and generating SQL.
        default_catalog: Default catalog passed to the renderer.
        python_env: Python environment exposed to macros.
        jinja_macros: Jinja macro registry used for Jinja statements.
        render_kwargs: Forwarded to ``ExpressionRenderer.render``.

    Returns:
        The rendered SQL strings, in order.
    """
    rendered_statements: t.List[str] = []
    for statement in statements:
        for expression in d.parse(statement, default_dialect=dialect):
            if expression:
                rendered = ExpressionRenderer(
                    expression,
                    dialect,
                    [],
                    jinja_macro_registry=jinja_macros,
                    python_env=python_env,
                    default_catalog=default_catalog,
                    quote_identifiers=False,
                    normalize_identifiers=False,
                ).render(**render_kwargs)

                if not rendered:
                    # Warning instead of raising for cases where a statement is conditionally executed
                    logger.warning(
                        "Rendering `%s` did not return an expression",
                        expression.sql(dialect=dialect),
                    )
                else:
                    rendered_statements.extend(expr.sql(dialect=dialect) for expr in rendered)

    return rendered_statements


class QueryRenderer(BaseExpressionRenderer):
    """Renders a model query, optionally optimizing it and caching the optimized result."""

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        self._optimized_cache: t.Optional[exp.Query] = None
        # Linter rule class -> details of the violation detected while optimizing.
        self._violated_rules: t.Dict[type[Rule], t.Any] = {}

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Updates the schema and drops the optimized query, which depended on the old one."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        needs_optimization: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            needs_optimization: Whether or not an optimization should be attempted
                (if passing False, it still may return a cached optimized query).
                Table references are only resolved when this is True.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expression, or ``None`` if rendering raised
            ``ParsetimeAdapterCallError`` or a Jinja query rendered to nothing.

        Raises:
            ConfigError: If the query renders to nothing (non-Jinja) or to more than one statement.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

            if not expressions:
                # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
                # and we thus treat it similar to models with adapter calls to match dbt's behavior.
                if isinstance(self._expression, d.JinjaQuery):
                    return None

                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(
                    f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
                )
                # Fallback only: raise_config_error is expected to raise; this keeps type
                # checkers from treating `query` as a non-Query below.
                raise

            if needs_optimization and self._optimize_query_flag:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        if needs_optimization:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(
        self,
        expression: t.Optional[exp.Expr],
        violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
        optimized: bool = False,
    ) -> None:
        """Seeds a cache with a precomputed expression.

        Args:
            expression: The expression to cache.
            violated_rules: Linter violations associated with the expression; replaces
                the current ones (an empty dict when omitted).
            optimized: If True, ``expression`` is stored as the optimized query and must be
                an ``exp.Query``; otherwise the base rendered-expressions cache is updated.

        Raises:
            SQLMeshError: If ``optimized`` is True and ``expression`` is not an ``exp.Query``.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

        self._violated_rules = violated_rules or {}

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualifies, type-annotates and simplifies ``query`` when the schema allows it.

        Optimization is attempted only when every dependency other than the model itself
        is found in the schema. Otherwise ``InvalidSelectStarExpansion`` is recorded if
        the query selects ``*``. On a sqlglot error the original query is kept and
        ``AmbiguousOrInvalidColumn`` is recorded. Other errors go to ``raise_config_error``.
        """
        from sqlmesh.core.linter.rules.builtin import (
            AmbiguousOrInvalidColumn,
            InvalidSelectStarExpansion,
        )

        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps = set()
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))
            self._violated_rules[InvalidSelectStarExpansion] = deps

        try:
            if should_optimize:
                # Optimize a copy so `original` stays intact if optimization fails.
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                        dialect=self._dialect,
                    ),
                    dialect=self._dialect,
                )
        except SqlglotError as ex:
            self._violated_rules[AmbiguousOrInvalidColumn] = ex

            query = original

        except Exception as ex:
            raise_config_error(
                f"Failed to optimize query, please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new. {ex}",
                self._path,
            )

        if not query.type:
            for select in query.expressions:
                annotate_types(select)

        return query


def _prepare_python_env_for_jinja(
    evaluator: MacroEvaluator,
    python_env: t.Dict[str, Executable],
) -> t.Dict[str, t.Any]:
    """Materializes ``python_env`` for Jinja, binding ``evaluator`` as the first argument of callables."""
    prepared_env = prepare_env(python_env)
    # Pass the evaluator to all macro functions
    return {
        key: partial(value, evaluator) if callable(value) else value
        for key, value in prepared_env.items()
    }
logger = <Logger sqlmesh.core.renderer (WARNING)>
class BaseExpressionRenderer:
class BaseExpressionRenderer:
    """Renders a single SQL expression, expanding SQLMesh macros and Jinja templates.

    Subclasses expose public ``render`` methods built on top of ``_render``. This base
    class also provides the helpers that map table references to their physical
    locations (``_resolve_table`` / ``_resolve_tables``).
    """

    def __init__(
        self,
        expression: exp.Expr,
        dialect: DialectType,
        macro_definitions: t.List[d.MacroDef],
        path: t.Optional[Path] = None,
        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        only_execution_time: bool = False,
        schema: t.Optional[t.Dict[str, t.Any]] = None,
        default_catalog: t.Optional[str] = None,
        quote_identifiers: bool = True,
        normalize_identifiers: bool = True,
        optimize_query: t.Optional[bool] = True,
        model: t.Optional[_Model] = None,
    ):
        """
        Args:
            expression: The expression to render. A ``d.Jinja`` expression is rendered
                through Jinja and re-parsed before macros are resolved.
            dialect: The SQL dialect used for parsing, rendering and normalization.
            macro_definitions: Macro definitions evaluated before the expression is transformed.
            path: Path of the file the expression came from; used in error messages.
            jinja_macro_registry: Registry used to build the Jinja environment.
                Defaults to an empty registry.
            python_env: Python environment exposed to macros. Defaults to an empty dict.
            only_execution_time: If True, no start/end interval is computed
                (``None`` is passed to ``date_dict`` for both).
            schema: Mapping schema of upstream tables; normalized for ``dialect``.
            default_catalog: Default catalog used when normalizing and qualifying names.
            quote_identifiers: Whether identifiers are quoted when normalizing, aliasing
                and resolving tables.
            normalize_identifiers: Whether rendered expressions are normalized at all.
            optimize_query: Only an explicit ``False`` disables query optimization;
                ``None`` is treated as enabled.
            model: The model that owns the expression, if any.
        """
        self._expression = expression
        self._dialect = dialect
        self._macro_definitions = macro_definitions
        self._path = path
        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
        self._python_env = python_env or {}
        self._only_execution_time = only_execution_time
        self._default_catalog = default_catalog
        self._normalize_identifiers = normalize_identifiers
        self._quote_identifiers = quote_identifiers
        # Called through `self` so that subclasses overriding update_schema also run here.
        self.update_schema({} if schema is None else schema)
        self._cache: t.List[t.Optional[exp.Expr]] = []
        self._model_fqn = model.fqn if model else None
        self._optimize_query_flag = optimize_query is not False
        self._model = model

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Replaces the renderer's schema with ``schema`` normalized for this dialect."""
        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)

    def _render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.List[t.Optional[exp.Expr]]:
        """Renders an expression, expanding macros with provided kwargs.

        Results are cached, and later served from the cache, only when rendering at the
        LOADING runtime stage with no start, end or execution time and no truthy extra
        kwargs. Results are not cached when the schema is empty and a macro called
        ``columns_to_types``.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expressions.

        Raises:
            ConfigError: If Jinja rendering or parsing of the rendered Jinja fails. Failures
                while evaluating macro definitions or resolving macros are reported
                through ``raise_config_error``.
            ParsetimeAdapterCallError: Propagated unchanged from Jinja rendering.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._cache:
            return self._cache

        environment_naming_info = kwargs.get("environment_naming_info")
        if environment_naming_info is not None:
            kwargs["this_env"] = environment_naming_info.name
            if snapshots:
                # Expose the environment-specific schemas and view names of all
                # non-symbolic model snapshots to macros / Jinja.
                schemas, views = set(), []
                for snapshot in snapshots.values():
                    if snapshot.is_model and not snapshot.is_symbolic:
                        schemas.add(
                            snapshot.qualified_view_name.schema_for_environment(
                                environment_naming_info, dialect=self._dialect
                            )
                        )
                        views.append(
                            snapshot.display_name(
                                environment_naming_info, self._default_catalog, self._dialect
                            )
                        )
                if schemas:
                    kwargs["schemas"] = list(schemas)
                if views:
                    kwargs["views"] = views

        # An explicitly passed `this_model` wins over the one resolved from the model FQN.
        this_model = kwargs.pop("this_model", None)

        this_snapshot = (snapshots or {}).get(self._model_fqn) if self._model_fqn else None
        if not this_model and self._model_fqn:
            this_model = self._resolve_table(
                self._model_fqn,
                snapshots={self._model_fqn: this_snapshot} if this_snapshot else None,
                deployability_index=deployability_index,
                table_mapping=table_mapping,
            )
        if this_snapshot and (kind := this_snapshot.model_kind_name):
            kwargs["model_kind_name"] = kind.name

        def _resolve_table(table: str | exp.Table) -> str:
            # Exposed to macros and Jinja: normalizes the name, resolves it against the
            # current snapshots/table mapping and returns it as identifier-quoted SQL.
            return self._resolve_table(
                d.normalize_model_name(table, self._default_catalog, self._dialect),
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
            ).sql(dialect=self._dialect, identify=True, comments=False)

        macro_evaluator = MacroEvaluator(
            self._dialect,
            python_env=self._python_env,
            schema=self.schema,
            runtime_stage=runtime_stage,
            resolve_table=_resolve_table,
            resolve_tables=lambda e: self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
            ),
            snapshots=snapshots,
            default_catalog=self._default_catalog,
            path=self._path,
            environment_naming_info=environment_naming_info,
            model_fqn=self._model_fqn,
        )

        start_time, end_time = (
            make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect)
            if not self._only_execution_time
            else (None, None)
        )

        render_kwargs = {
            **date_dict(
                to_datetime(execution_time or c.EPOCH),
                start_time,
                end_time,
            ),
            **kwargs,
        }

        if this_model:
            render_kwargs["this_model"] = this_model

        macro_evaluator.locals.update(render_kwargs)

        # NOTE: `variables` is popped only after render_kwargs was built, so it also
        # remains available there under the "variables" key.
        variables = kwargs.pop("variables", {})
        if variables:
            macro_evaluator.locals.setdefault(c.SQLMESH_VARS, {}).update(variables)

        expressions: t.List[exp.Expr] = [self._expression]
        if isinstance(self._expression, d.Jinja):
            try:
                jinja_env_kwargs = {
                    **{
                        **render_kwargs,
                        **_prepare_python_env_for_jinja(macro_evaluator, self._python_env),
                        **variables,
                    },
                    "snapshots": snapshots or {},
                    "table_mapping": table_mapping,
                    "deployability_index": deployability_index,
                    "default_catalog": self._default_catalog,
                    "runtime_stage": runtime_stage.value,
                    "resolve_table": _resolve_table,
                    "model_instance": self._model,
                }

                if this_model:
                    jinja_env_kwargs["this_model"] = this_model.sql(
                        dialect=self._dialect, identify=True, comments=False
                    )

                if self._model and self._model.kind.is_incremental_by_time_range:
                    # Updates, in place, the event_time_filter of every source/ref object
                    # registered in the Jinja registry with this render's interval.
                    all_refs = list(
                        self._jinja_macro_registry.global_objs.get("sources", {}).values()  # type: ignore
                    ) + list(
                        self._jinja_macro_registry.global_objs.get("refs", {}).values()  # type: ignore
                    )
                    for ref in all_refs:
                        if ref.event_time_filter:
                            ref.event_time_filter["start"] = render_kwargs["start_tstz"]
                            ref.event_time_filter["end"] = to_tstz(
                                make_ts_exclusive(render_kwargs["end_tstz"], dialect=self._dialect)
                            )

                jinja_env = self._jinja_macro_registry.build_environment(**jinja_env_kwargs)

                expressions = []
                rendered_expression = jinja_env.from_string(self._expression.name).render()
                # Lazy %-formatting: the rendered SQL is only formatted when DEBUG is enabled.
                logger.debug(
                    "Rendered Jinja expression for model '%s' at '%s': '%s'",
                    self._model_fqn,
                    self._path,
                    rendered_expression,
                )
            except ParsetimeAdapterCallError:
                raise
            except Exception as ex:
                raise ConfigError(
                    f"Could not render jinja for '{self._path}'.\n" + extract_error_details(ex)
                ) from ex

            if rendered_expression.strip():
                # ensure there is actual SQL and not just comments and non-SQL jinja
                dialect = Dialect.get_or_raise(self._dialect)
                tokens = dialect.tokenize(rendered_expression)

                if tokens:
                    try:
                        expressions = [
                            e for e in dialect.parser().parse(tokens, rendered_expression) if e
                        ]

                        if not expressions:
                            raise ConfigError(
                                f"Failed to parse an expression:\n{rendered_expression}"
                            )
                    except Exception as ex:
                        raise ConfigError(
                            f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
                        ) from ex

        for definition in self._macro_definitions:
            try:
                macro_evaluator.evaluate(definition)
            except Exception as ex:
                raise_config_error(
                    f"Failed to evaluate macro '{definition}'.\n\n{ex}\n", self._path
                )

        resolved_expressions: t.List[t.Optional[exp.Expr]] = []

        for expression in expressions:
            try:
                transformed_expressions = ensure_list(macro_evaluator.transform(expression))
            except Exception as ex:
                raise_config_error(
                    f"Failed to resolve macros for\n\n{expression.sql(dialect=self._dialect, pretty=True)}\n\n{ex}\n",
                    self._path,
                )

            for transformed in t.cast(t.List[exp.Expr], transformed_expressions):
                with self._normalize_and_quote(transformed) as transformed:
                    if hasattr(transformed, "selects"):
                        # Give every named, un-aliased projection an explicit alias so its
                        # output name is preserved; comments move onto the alias.
                        for select in transformed.selects:
                            if not isinstance(select, exp.Alias) and select.output_name not in (
                                "*",
                                "",
                            ):
                                alias = exp.alias_(
                                    select, select.output_name, quoted=self._quote_identifiers
                                )
                                comments = alias.this.comments
                                if comments:
                                    alias.add_comments(comments)
                                    comments.clear()

                                select.replace(alias)
                resolved_expressions.append(transformed)

        # We don't cache here if columns_to_type was called in a macro.
        # This allows the model's query to be re-rendered so that the
        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
            self._cache = resolved_expressions
        return resolved_expressions

    def update_cache(self, expression: t.Optional[exp.Expr]) -> None:
        """Replaces the rendered-expressions cache with the single ``expression``."""
        self._cache = [expression]

    def _resolve_table(
        self,
        table_name: str | exp.Expr,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
    ) -> exp.Table:
        """Maps a single table reference to its physical location.

        Snapshot-derived mappings are applied first; entries in ``table_mapping`` take
        precedence over them. The result is identifier-quoted when quoting is enabled.
        """
        table = exp.replace_tables(
            exp.maybe_parse(table_name, into=exp.Table, dialect=self._dialect),
            {
                **self._to_table_mapping((snapshots or {}).values(), deployability_index),
                **(table_mapping or {}),
            },
            dialect=self._dialect,
            copy=False,
        )
        # We quote the table here to mimic the behavior of _resolve_tables, otherwise we may end
        # up normalizing twice, because _to_table_mapping returns the mapped names unquoted.
        return (
            d.quote_identifiers(table, dialect=self._dialect) if self._quote_identifiers else table
        )

    def _resolve_tables(
        self,
        expression: E,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        expand: t.Iterable[str] = tuple(),
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> E:
        """Replaces table references in ``expression`` with physical locations.

        Tables named in ``expand``, plus any embedded snapshots, are inlined as subqueries
        of the referenced model's rendered query instead of being mapped. Seed and
        external models are never inlined. The remaining ``kwargs`` are forwarded to
        ``render_query`` for inlined models.

        Returns:
            A transformed copy of ``expression``, or ``expression`` itself (not a copy)
            when there are no snapshots, table mapping or tables to expand.
        """
        if not snapshots and not table_mapping and not expand:
            return expression

        expression = expression.copy()
        with self._normalize_and_quote(expression) as expression:
            snapshots = snapshots or {}
            table_mapping = table_mapping or {}
            mapping = {
                **self._to_table_mapping(snapshots.values(), deployability_index),
                **table_mapping,
            }
            expand = set(expand) | {
                name for name, snapshot in snapshots.items() if snapshot.is_embedded
            }

            if expand:
                model_mapping = {
                    name: snapshot.model
                    for name, snapshot in snapshots.items()
                    if snapshot.is_model
                }

                def _expand(node: exp.Expr) -> exp.Expr:
                    # Swap an expandable table reference for its model's rendered query.
                    if isinstance(node, exp.Table) and snapshots:
                        name = exp.table_name(node, identify=True)
                        model = model_mapping.get(name)
                        if (
                            name in expand
                            and model
                            and not model.is_seed
                            and not model.kind.is_external
                        ):
                            nested_query = model.render_query(
                                start=start,
                                end=end,
                                execution_time=execution_time,
                                snapshots=snapshots,
                                table_mapping=table_mapping,
                                expand=expand,
                                deployability_index=deployability_index,
                                **kwargs,
                            )
                            if nested_query is not None:
                                return nested_query.subquery(
                                    alias=node.alias or model.view_name,
                                    copy=False,
                                )

                            logger.warning("Failed to expand the nested model '%s'", name)

                    return node

                expression = expression.transform(_expand, copy=False)  # type: ignore

            if mapping:
                expression = exp.replace_tables(
                    expression, mapping, dialect=self._dialect, copy=False
                )

        return expression

    @contextmanager
    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
        """Yields ``query`` wrapped in ``d.normalize_and_quote`` when normalization is enabled.

        When identifier normalization is disabled the query is yielded unchanged.
        """
        if self._normalize_identifiers:
            with d.normalize_and_quote(
                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
            ) as query:
                yield query
        else:
            yield query

    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
        """Returns True only at the LOADING stage when every value in ``args`` is falsy."""
        return runtime_stage == RuntimeStage.LOADING and not any(args)

    def _to_table_mapping(
        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
    ) -> t.Dict[str, str]:
        """Builds a name -> physical table mapping from ``snapshots``."""
        # Imported lazily; presumably to avoid an import cycle with sqlmesh.core.snapshot.
        from sqlmesh.core.snapshot import to_table_mapping

        return to_table_mapping(snapshots, deployability_index)
BaseExpressionRenderer( expression: sqlglot.expressions.core.Expr, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], macro_definitions: List[sqlmesh.core.dialect.MacroDef], path: Optional[pathlib.Path] = None, jinja_macro_registry: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, only_execution_time: bool = False, schema: Optional[Dict[str, Any]] = None, default_catalog: Optional[str] = None, quote_identifiers: bool = True, normalize_identifiers: bool = True, optimize_query: Optional[bool] = True, model: Optional[sqlmesh.core.model.definition._Model] = None)
def __init__(
    self,
    expression: exp.Expr,
    dialect: DialectType,
    macro_definitions: t.List[d.MacroDef],
    path: t.Optional[Path] = None,
    jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    only_execution_time: bool = False,
    schema: t.Optional[t.Dict[str, t.Any]] = None,
    default_catalog: t.Optional[str] = None,
    quote_identifiers: bool = True,
    normalize_identifiers: bool = True,
    optimize_query: t.Optional[bool] = True,
    model: t.Optional[_Model] = None,
):
    """Store the expression to render together with the rendering configuration.

    Args:
        expression: The expression this renderer renders.
        dialect: SQL dialect used to parse and generate SQL.
        macro_definitions: Macro definitions evaluated before the expression's macros are resolved.
        path: Path of the source file; used in error messages.
        jinja_macro_registry: Registry used to build the Jinja environment. An empty
            `JinjaMacroRegistry` is used when omitted.
        python_env: Python executables for the rendering environment; defaults to an empty dict.
        only_execution_time: Stored on the instance; consumed elsewhere in the renderer
            (NOTE(review): presumably by `_render` — confirm).
        schema: Schema mapping handed to `update_schema`; `None` is treated as an empty mapping.
        default_catalog: Default catalog used when normalizing identifiers and resolving tables.
        quote_identifiers: Whether identifiers are quoted.
        normalize_identifiers: Whether identifiers are normalized.
        optimize_query: Any value other than `False` (including `None`) enables query optimization.
        model: The model owning this expression, if any. Its fully qualified name is used in
            log messages and when computing query dependencies.
    """
    self._expression = expression
    self._dialect = dialect
    self._macro_definitions = macro_definitions
    self._path = path
    self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
    self._python_env = python_env or {}
    self._only_execution_time = only_execution_time
    self._default_catalog = default_catalog
    self._normalize_identifiers = normalize_identifiers
    self._quote_identifiers = quote_identifiers
    # Goes through the (overridable) update_schema hook, so subclasses such as QueryRenderer
    # can react to the initial schema; keep this after the attributes set above.
    self.update_schema({} if schema is None else schema)
    # Cache of rendered expressions, reused when rendering is cacheable.
    self._cache: t.List[t.Optional[exp.Expr]] = []
    self._model_fqn = model.fqn if model else None
    # Only an explicit False disables optimization.
    self._optimize_query_flag = optimize_query is not False
    self._model = model
class ExpressionRenderer(BaseExpressionRenderer):
    """Renders a standalone SQL expression, such as the statements handled by `render_statements`."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expr]]:
        """Render the expression and resolve the tables it references.

        Returns:
            The rendered expressions with tables resolved. Empty results and semicolons are
            dropped. Returns None when rendering hit a parse-time adapter call.
        """
        try:
            rendered = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                table_mapping=table_mapping,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            # A parse-time adapter call was encountered: report that nothing was rendered.
            return None

        resolved: t.List[exp.Expr] = []
        for candidate in rendered:
            # Skip empty results and bare statement separators.
            if not candidate or isinstance(candidate, exp.Semicolon):
                continue
            resolved.append(
                self._resolve_tables(
                    candidate,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    expand=expand,
                    deployability_index=deployability_index,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
        return resolved
def
render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, expand: Iterable[str] = (), **kwargs: Any) -> Optional[List[sqlglot.expressions.core.Expr]]:
def render(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    expand: t.Iterable[str] = tuple(),
    **kwargs: t.Any,
) -> t.Optional[t.List[exp.Expr]]:
    """Render the expression and resolve the tables it references.

    Returns:
        The rendered expressions with tables resolved. Empty results and semicolons are
        dropped. Returns None when rendering hit a parse-time adapter call.
    """
    try:
        rendered = super()._render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            table_mapping=table_mapping,
            **kwargs,
        )
    except ParsetimeAdapterCallError:
        # A parse-time adapter call was encountered: report that nothing was rendered.
        return None

    resolved: t.List[exp.Expr] = []
    for candidate in rendered:
        # Skip empty results and bare statement separators.
        if not candidate or isinstance(candidate, exp.Semicolon):
            continue
        resolved.append(
            self._resolve_tables(
                candidate,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                **kwargs,
            )
        )
    return resolved
Inherited Members
def
render_statements( statements: List[str], dialect: str, default_catalog: Optional[str] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, **render_kwargs: Any) -> List[str]:
def render_statements(
    statements: t.List[str],
    dialect: str,
    default_catalog: t.Optional[str] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    **render_kwargs: t.Any,
) -> t.List[str]:
    """Render raw SQL statement strings and return the resulting SQL text.

    Each statement is parsed with `dialect`. Every non-empty parsed expression is rendered by an
    `ExpressionRenderer` with identifier quoting and normalization disabled. Expressions that
    render to nothing are logged as warnings and skipped rather than raising, because a
    statement may be conditionally executed.

    Args:
        statements: SQL statement strings to render.
        dialect: Dialect used for both parsing and SQL generation.
        default_catalog: Default catalog passed to the renderer.
        python_env: Python executables passed to the renderer.
        jinja_macros: Jinja macro registry passed to the renderer.
        **render_kwargs: Forwarded to `ExpressionRenderer.render`.

    Returns:
        The rendered SQL strings, in input order.
    """
    output: t.List[str] = []
    # Lazily walk every non-empty parsed expression across all statements, in order.
    parsed_expressions = (
        parsed
        for statement in statements
        for parsed in d.parse(statement, default_dialect=dialect)
        if parsed
    )

    for expression in parsed_expressions:
        renderer = ExpressionRenderer(
            expression,
            dialect,
            [],
            jinja_macro_registry=jinja_macros,
            python_env=python_env,
            default_catalog=default_catalog,
            quote_identifiers=False,
            normalize_identifiers=False,
        )
        rendered = renderer.render(**render_kwargs)

        if rendered:
            output.extend(node.sql(dialect=dialect) for node in rendered)
        else:
            # Warning instead of raising for cases where a statement is conditionally executed
            logger.warning(
                f"Rendering `{expression.sql(dialect=dialect)}` did not return an expression"
            )

    return output
class QueryRenderer(BaseExpressionRenderer):
    """Renders a model query, optionally optimizing it and resolving the tables it references.

    Besides the base renderer's cache of rendered expressions, this renderer keeps a cache of
    the optimized query and records linter rule violations found while optimizing.
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        # Optimized query reused by `render` when caching is allowed.
        self._optimized_cache: t.Optional[exp.Query] = None
        # Linter rule class -> details of the violation detected while optimizing.
        self._violated_rules: t.Dict[type[Rule], t.Any] = {}

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Update the schema and drop the optimized query, which was computed against the old one."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        needs_optimization: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        The query that gets rendered is the expression this renderer was constructed with.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            needs_optimization: Whether or not an optimization should be attempted
                (if passing False, it still may return a cached optimized query).
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query. Returns None when rendering hit a parse-time adapter call, when a
            Jinja query renders to no SQL, or when the rendered expression is empty.

        Raises:
            ConfigError: If a non-Jinja query renders to no statements, or to more than one statement.
                A rendered statement that is not a query is reported through `raise_config_error`.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

            if not expressions:
                # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
                # and we thus treat it similar to models with adapter calls to match dbt's behavior.
                if isinstance(self._expression, d.JinjaQuery):
                    return None

                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(
                    f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
                )
                # raise_config_error is expected to raise; the bare raise only narrows the type.
                raise

            if needs_optimization and self._optimize_query_flag:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                # Only an optimized query may be cached: the cache is served above even
                # when a caller later asks for optimization.
                if should_cache:
                    self._optimized_cache = query

        if needs_optimization:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(
        self,
        expression: t.Optional[exp.Expr],
        violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
        optimized: bool = False,
    ) -> None:
        """Seed a cache with a precomputed expression and replace the recorded rule violations.

        Args:
            expression: The expression to cache. It must be an `exp.Query` when `optimized` is True.
            violated_rules: Rule violations to record; `None` clears them.
            optimized: Whether `expression` goes into the optimized-query cache. Otherwise it
                goes into the base renderer's expression cache.

        Raises:
            SQLMeshError: If `optimized` is True and `expression` is not a query.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

        self._violated_rules = violated_rules or {}

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualify, annotate types of, and simplify `query` if the schema covers its dependencies.

        Optimization runs only when every dependency other than the model itself is found in
        the schema (or there are no such dependencies). Problems are recorded in
        `self._violated_rules` instead of being raised:

        - `InvalidSelectStarExpansion` (with the missing dependencies) when the query selects
          `*` but cannot be optimized.
        - `AmbiguousOrInvalidColumn` (with the sqlglot error) when optimization fails. In that
          case the original query is kept.

        Any other exception is reported through `raise_config_error`. If the resulting query
        has no type, its projections are type-annotated individually.
        """
        from sqlmesh.core.linter.rules.builtin import (
            AmbiguousOrInvalidColumn,
            InvalidSelectStarExpansion,
        )

        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps: t.Set[str] = set()
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))
            self._violated_rules[InvalidSelectStarExpansion] = deps

        try:
            if should_optimize:
                # Optimize a copy so the original can be restored if sqlglot fails.
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                        dialect=self._dialect,
                    ),
                    dialect=self._dialect,
                )
        except SqlglotError as ex:
            self._violated_rules[AmbiguousOrInvalidColumn] = ex

            query = original

        except Exception as ex:
            raise_config_error(
                f"Failed to optimize query, please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new. {ex}",
                self._path,
            )

        if not query.type:
            for select in query.expressions:
                annotate_types(select)

        return query
def
render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, expand: Iterable[str] = (), needs_optimization: bool = True, runtime_stage: sqlmesh.core.macros.RuntimeStage = <RuntimeStage.LOADING: 'loading'>, **kwargs: Any) -> Optional[sqlglot.expressions.query.Query]:
def render(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    expand: t.Iterable[str] = tuple(),
    needs_optimization: bool = True,
    runtime_stage: RuntimeStage = RuntimeStage.LOADING,
    **kwargs: t.Any,
) -> t.Optional[exp.Query]:
    """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

    The query that gets rendered is the expression this renderer was constructed with.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time reference to use for execution time. Defaults to epoch start.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
            that depend on materialized tables. Model definitions are inlined and can thus be run end to
            end on the fly.
        needs_optimization: Whether or not an optimization should be attempted
            (if passing False, it still may return a cached optimized query).
        runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
        kwargs: Additional kwargs to pass to the renderer.

    Returns:
        The rendered query. Returns None when rendering hit a parse-time adapter call, when a
        Jinja query renders to no SQL, or when the rendered expression is empty.

    Raises:
        ConfigError: If a non-Jinja query renders to no statements, or to more than one statement.
            A rendered statement that is not a query is reported through `raise_config_error`.
    """

    should_cache = self._should_cache(
        runtime_stage, start, end, execution_time, *kwargs.values()
    )

    if should_cache and self._optimized_cache:
        query = self._optimized_cache
    else:
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                runtime_stage=runtime_stage,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

        if not expressions:
            # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
            # and we thus treat it similar to models with adapter calls to match dbt's behavior.
            if isinstance(self._expression, d.JinjaQuery):
                return None

            raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

        if len(expressions) > 1:
            raise ConfigError(f"Too many statements in query:\n{self._expression}")

        query = expressions[0]  # type: ignore

        if not query:
            return None
        if not isinstance(query, exp.Query):
            raise_config_error(
                f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
            )
            # raise_config_error is expected to raise; the bare raise only narrows the type.
            raise

        if needs_optimization and self._optimize_query_flag:
            deps = d.find_tables(
                query, default_catalog=self._default_catalog, dialect=self._dialect
            )

            query = self._optimize_query(query, deps)

            # Only an optimized query may be cached: the cache is served above even
            # when a caller later asks for optimization.
            if should_cache:
                self._optimized_cache = query

    if needs_optimization:
        query = self._resolve_tables(
            query,
            snapshots=snapshots,
            table_mapping=table_mapping,
            expand=expand,
            deployability_index=deployability_index,
            start=start,
            end=end,
            execution_time=execution_time,
            runtime_stage=runtime_stage,
            **kwargs,
        )

    return query
Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.
Arguments:
- query: Not an argument. The query that gets rendered is the expression this renderer was constructed with.
- start: The start datetime to render. Defaults to epoch start.
- end: The end datetime to render. Defaults to epoch start.
- execution_time: The date/time reference to use for execution time. Defaults to epoch start.
- snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
- table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
- needs_optimization: Whether or not an optimization should be attempted (if passing False, it still may return a cached optimized query).
- runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
The rendered expression.
def
update_cache( self, expression: Optional[sqlglot.expressions.core.Expr], violated_rules: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]] = None, optimized: bool = False) -> None:
def update_cache(
    self,
    expression: t.Optional[exp.Expr],
    violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
    optimized: bool = False,
) -> None:
    """Seed a cache with a precomputed expression and replace the recorded rule violations.

    Args:
        expression: The expression to cache. It must be an `exp.Query` when `optimized` is True.
        violated_rules: Rule violations to record; `None` clears them.
        optimized: Whether `expression` goes into the optimized-query cache. Otherwise it is
            handed to the base renderer's expression cache.

    Raises:
        SQLMeshError: If `optimized` is True and `expression` is not a query.
    """
    if not optimized:
        super().update_cache(expression)
    elif isinstance(expression, exp.Query):
        self._optimized_cache = expression
    else:
        raise SQLMeshError(f"Expected a Query but got: {expression}")

    self._violated_rules = violated_rules or {}