Edit on GitHub

sqlmesh.core.renderer

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5from contextlib import contextmanager
  6from functools import partial
  7from pathlib import Path
  8
  9from sqlglot import exp, Dialect
 10from sqlglot.errors import SqlglotError
 11from sqlglot.helper import ensure_list
 12from sqlglot.optimizer.annotate_types import annotate_types
 13from sqlglot.optimizer.qualify import qualify
 14from sqlglot.optimizer.simplify import simplify
 15
 16from sqlmesh.core import constants as c
 17from sqlmesh.core import dialect as d
 18from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
 19from sqlmesh.utils.date import (
 20    TimeLike,
 21    date_dict,
 22    make_inclusive,
 23    to_datetime,
 24    make_ts_exclusive,
 25    to_tstz,
 26)
 27from sqlmesh.utils.errors import (
 28    ConfigError,
 29    ParsetimeAdapterCallError,
 30    SQLMeshError,
 31    raise_config_error,
 32)
 33from sqlmesh.utils.jinja import JinjaMacroRegistry, extract_error_details
 34from sqlmesh.utils.metaprogramming import Executable, prepare_env
 35
 36if t.TYPE_CHECKING:
 37    from sqlglot._typing import E
 38    from sqlglot.dialects.dialect import DialectType
 39
 40    from sqlmesh.core.linter.rule import Rule
 41    from sqlmesh.core.model.definition import _Model
 42    from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot
 43
 44
 45logger = logging.getLogger(__name__)
 46
 47
 48class BaseExpressionRenderer:
 49    def __init__(
 50        self,
 51        expression: exp.Expr,
 52        dialect: DialectType,
 53        macro_definitions: t.List[d.MacroDef],
 54        path: t.Optional[Path] = None,
 55        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
 56        python_env: t.Optional[t.Dict[str, Executable]] = None,
 57        only_execution_time: bool = False,
 58        schema: t.Optional[t.Dict[str, t.Any]] = None,
 59        default_catalog: t.Optional[str] = None,
 60        quote_identifiers: bool = True,
 61        normalize_identifiers: bool = True,
 62        optimize_query: t.Optional[bool] = True,
 63        model: t.Optional[_Model] = None,
 64    ):
 65        self._expression = expression
 66        self._dialect = dialect
 67        self._macro_definitions = macro_definitions
 68        self._path = path
 69        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
 70        self._python_env = python_env or {}
 71        self._only_execution_time = only_execution_time
 72        self._default_catalog = default_catalog
 73        self._normalize_identifiers = normalize_identifiers
 74        self._quote_identifiers = quote_identifiers
 75        self.update_schema({} if schema is None else schema)
 76        self._cache: t.List[t.Optional[exp.Expr]] = []
 77        self._model_fqn = model.fqn if model else None
 78        self._optimize_query_flag = optimize_query is not False
 79        self._model = model
 80
 81    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
 82        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
 83
 84    def _render(
 85        self,
 86        start: t.Optional[TimeLike] = None,
 87        end: t.Optional[TimeLike] = None,
 88        execution_time: t.Optional[TimeLike] = None,
 89        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 90        table_mapping: t.Optional[t.Dict[str, str]] = None,
 91        deployability_index: t.Optional[DeployabilityIndex] = None,
 92        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
 93        **kwargs: t.Any,
 94    ) -> t.List[t.Optional[exp.Expr]]:
 95        """Renders a expression, expanding macros with provided kwargs
 96
 97        Args:
 98            start: The start datetime to render. Defaults to epoch start.
 99            end: The end datetime to render. Defaults to epoch start.
100            execution_time: The date/time time reference to use for execution time.
101            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
102            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
103            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
104            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
105            kwargs: Additional kwargs to pass to the renderer.
106
107        Returns:
108            The rendered expressions.
109        """
110
111        should_cache = self._should_cache(
112            runtime_stage, start, end, execution_time, *kwargs.values()
113        )
114
115        if should_cache and self._cache:
116            return self._cache
117
118        environment_naming_info = kwargs.get("environment_naming_info")
119        if environment_naming_info is not None:
120            kwargs["this_env"] = getattr(environment_naming_info, "name")
121            if snapshots:
122                schemas, views = set(), []
123                for snapshot in snapshots.values():
124                    if snapshot.is_model and not snapshot.is_symbolic:
125                        schemas.add(
126                            snapshot.qualified_view_name.schema_for_environment(
127                                environment_naming_info, dialect=self._dialect
128                            )
129                        )
130                        views.append(
131                            snapshot.display_name(
132                                environment_naming_info, self._default_catalog, self._dialect
133                            )
134                        )
135                if schemas:
136                    kwargs["schemas"] = list(schemas)
137                if views:
138                    kwargs["views"] = views
139
140        this_model = kwargs.pop("this_model", None)
141
142        this_snapshot = (snapshots or {}).get(self._model_fqn) if self._model_fqn else None
143        if not this_model and self._model_fqn:
144            this_model = self._resolve_table(
145                self._model_fqn,
146                snapshots={self._model_fqn: this_snapshot} if this_snapshot else None,
147                deployability_index=deployability_index,
148                table_mapping=table_mapping,
149            )
150        if this_snapshot and (kind := this_snapshot.model_kind_name):
151            kwargs["model_kind_name"] = kind.name
152
153        def _resolve_table(table: str | exp.Table) -> str:
154            return self._resolve_table(
155                d.normalize_model_name(table, self._default_catalog, self._dialect),
156                snapshots=snapshots,
157                table_mapping=table_mapping,
158                deployability_index=deployability_index,
159            ).sql(dialect=self._dialect, identify=True, comments=False)
160
161        macro_evaluator = MacroEvaluator(
162            self._dialect,
163            python_env=self._python_env,
164            schema=self.schema,
165            runtime_stage=runtime_stage,
166            resolve_table=_resolve_table,
167            resolve_tables=lambda e: self._resolve_tables(
168                e,
169                snapshots=snapshots,
170                table_mapping=table_mapping,
171                deployability_index=deployability_index,
172                start=start,
173                end=end,
174                execution_time=execution_time,
175                runtime_stage=runtime_stage,
176            ),
177            snapshots=snapshots,
178            default_catalog=self._default_catalog,
179            path=self._path,
180            environment_naming_info=environment_naming_info,
181            model_fqn=self._model_fqn,
182        )
183
184        start_time, end_time = (
185            make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect)
186            if not self._only_execution_time
187            else (None, None)
188        )
189
190        render_kwargs = {
191            **date_dict(
192                to_datetime(execution_time or c.EPOCH),
193                start_time,
194                end_time,
195            ),
196            **kwargs,
197        }
198
199        if this_model:
200            render_kwargs["this_model"] = this_model
201
202        macro_evaluator.locals.update(render_kwargs)
203
204        variables = kwargs.pop("variables", {})
205        if variables:
206            macro_evaluator.locals.setdefault(c.SQLMESH_VARS, {}).update(variables)
207
208        expressions: t.List[exp.Expr] = [self._expression]
209        if isinstance(self._expression, d.Jinja):
210            try:
211                jinja_env_kwargs = {
212                    **{
213                        **render_kwargs,
214                        **_prepare_python_env_for_jinja(macro_evaluator, self._python_env),
215                        **variables,
216                    },
217                    "snapshots": snapshots or {},
218                    "table_mapping": table_mapping,
219                    "deployability_index": deployability_index,
220                    "default_catalog": self._default_catalog,
221                    "runtime_stage": runtime_stage.value,
222                    "resolve_table": _resolve_table,
223                    "model_instance": self._model,
224                }
225
226                if this_model:
227                    jinja_env_kwargs["this_model"] = this_model.sql(
228                        dialect=self._dialect, identify=True, comments=False
229                    )
230
231                if self._model and self._model.kind.is_incremental_by_time_range:
232                    all_refs = list(
233                        self._jinja_macro_registry.global_objs.get("sources", {}).values()  # type: ignore
234                    ) + list(
235                        self._jinja_macro_registry.global_objs.get("refs", {}).values()  # type: ignore
236                    )
237                    for ref in all_refs:
238                        if ref.event_time_filter:
239                            ref.event_time_filter["start"] = render_kwargs["start_tstz"]
240                            ref.event_time_filter["end"] = to_tstz(
241                                make_ts_exclusive(render_kwargs["end_tstz"], dialect=self._dialect)
242                            )
243
244                jinja_env = self._jinja_macro_registry.build_environment(**jinja_env_kwargs)
245
246                expressions = []
247                rendered_expression = jinja_env.from_string(self._expression.name).render()
248                logger.debug(
249                    f"Rendered Jinja expression for model '{self._model_fqn}' at '{self._path}': '{rendered_expression}'"
250                )
251            except ParsetimeAdapterCallError:
252                raise
253            except Exception as ex:
254                raise ConfigError(
255                    f"Could not render jinja for '{self._path}'.\n" + extract_error_details(ex)
256                ) from ex
257
258            if rendered_expression.strip():
259                # ensure there is actual SQL and not just comments and non-SQL jinja
260                dialect = Dialect.get_or_raise(self._dialect)
261                tokens = dialect.tokenize(rendered_expression)
262
263                if tokens:
264                    try:
265                        expressions = [
266                            e for e in dialect.parser().parse(tokens, rendered_expression) if e
267                        ]
268
269                        if not expressions:
270                            raise ConfigError(
271                                f"Failed to parse an expression:\n{rendered_expression}"
272                            )
273                    except Exception as ex:
274                        raise ConfigError(
275                            f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
276                        ) from ex
277
278        for definition in self._macro_definitions:
279            try:
280                macro_evaluator.evaluate(definition)
281            except Exception as ex:
282                raise_config_error(
283                    f"Failed to evaluate macro '{definition}'.\n\n{ex}\n", self._path
284                )
285
286        resolved_expressions: t.List[t.Optional[exp.Expr]] = []
287
288        for expression in expressions:
289            try:
290                transformed_expressions = ensure_list(macro_evaluator.transform(expression))
291            except Exception as ex:
292                raise_config_error(
293                    f"Failed to resolve macros for\n\n{expression.sql(dialect=self._dialect, pretty=True)}\n\n{ex}\n",
294                    self._path,
295                )
296
297            for expression in t.cast(t.List[exp.Expr], transformed_expressions):
298                with self._normalize_and_quote(expression) as expression:
299                    if hasattr(expression, "selects"):
300                        for select in expression.selects:
301                            if not isinstance(select, exp.Alias) and select.output_name not in (
302                                "*",
303                                "",
304                            ):
305                                alias = exp.alias_(
306                                    select, select.output_name, quoted=self._quote_identifiers
307                                )
308                                comments = alias.this.comments
309                                if comments:
310                                    alias.add_comments(comments)
311                                    comments.clear()
312
313                                select.replace(alias)
314                resolved_expressions.append(expression)
315
316        # We dont cache here if columns_to_type was called in a macro.
317        # This allows the model's query to be re-rendered so that the
318        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
319        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
320            self._cache = resolved_expressions
321        return resolved_expressions
322
323    def update_cache(self, expression: t.Optional[exp.Expr]) -> None:
324        self._cache = [expression]
325
326    def _resolve_table(
327        self,
328        table_name: str | exp.Expr,
329        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
330        table_mapping: t.Optional[t.Dict[str, str]] = None,
331        deployability_index: t.Optional[DeployabilityIndex] = None,
332    ) -> exp.Table:
333        table = exp.replace_tables(
334            exp.maybe_parse(table_name, into=exp.Table, dialect=self._dialect),
335            {
336                **self._to_table_mapping((snapshots or {}).values(), deployability_index),
337                **(table_mapping or {}),
338            },
339            dialect=self._dialect,
340            copy=False,
341        )
342        # We quote the table here to mimic the behavior of _resolve_tables, otherwise we may end
343        # up normalizing twice, because _to_table_mapping returns the mapped names unquoted.
344        return (
345            d.quote_identifiers(table, dialect=self._dialect) if self._quote_identifiers else table
346        )
347
348    def _resolve_tables(
349        self,
350        expression: E,
351        *,
352        start: t.Optional[TimeLike] = None,
353        end: t.Optional[TimeLike] = None,
354        execution_time: t.Optional[TimeLike] = None,
355        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
356        table_mapping: t.Optional[t.Dict[str, str]] = None,
357        expand: t.Iterable[str] = tuple(),
358        deployability_index: t.Optional[DeployabilityIndex] = None,
359        **kwargs: t.Any,
360    ) -> E:
361        if not snapshots and not table_mapping and not expand:
362            return expression
363
364        expression = expression.copy()
365        with self._normalize_and_quote(expression) as expression:
366            snapshots = snapshots or {}
367            table_mapping = table_mapping or {}
368            mapping = {
369                **self._to_table_mapping(snapshots.values(), deployability_index),
370                **table_mapping,
371            }
372            expand = set(expand) | {
373                name for name, snapshot in snapshots.items() if snapshot.is_embedded
374            }
375
376            if expand:
377                model_mapping = {
378                    name: snapshot.model
379                    for name, snapshot in snapshots.items()
380                    if snapshot.is_model
381                }
382
383                def _expand(node: exp.Expr) -> exp.Expr:
384                    if isinstance(node, exp.Table) and snapshots:
385                        name = exp.table_name(node, identify=True)
386                        model = model_mapping.get(name)
387                        if (
388                            name in expand
389                            and model
390                            and not model.is_seed
391                            and not model.kind.is_external
392                        ):
393                            nested_query = model.render_query(
394                                start=start,
395                                end=end,
396                                execution_time=execution_time,
397                                snapshots=snapshots,
398                                table_mapping=table_mapping,
399                                expand=expand,
400                                deployability_index=deployability_index,
401                                **kwargs,
402                            )
403                            if nested_query is not None:
404                                return nested_query.subquery(
405                                    alias=node.alias or model.view_name,
406                                    copy=False,
407                                )
408                            logger.warning("Failed to expand the nested model '%s'", name)
409                    return node
410
411                expression = expression.transform(_expand, copy=False)  # type: ignore
412
413            if mapping:
414                expression = exp.replace_tables(
415                    expression, mapping, dialect=self._dialect, copy=False
416                )
417
418            return expression
419
420    @contextmanager
421    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
422        if self._normalize_identifiers:
423            with d.normalize_and_quote(
424                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
425            ) as query:
426                yield query
427        else:
428            yield query
429
430    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
431        return runtime_stage == RuntimeStage.LOADING and not any(args)
432
433    def _to_table_mapping(
434        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
435    ) -> t.Dict[str, str]:
436        from sqlmesh.core.snapshot import to_table_mapping
437
438        return to_table_mapping(snapshots, deployability_index)
439
440
class ExpressionRenderer(BaseExpressionRenderer):
    """Renderer for general (not necessarily query) expressions."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expr]]:
        """Render the expression and resolve the table references in each result.

        Empty results and bare semicolons are dropped from the output.

        Returns:
            The rendered, table-resolved expressions, or ``None`` when rendering hit a
            parse-time adapter call.
        """
        try:
            rendered = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                table_mapping=table_mapping,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        resolved: t.List[exp.Expr] = []
        for candidate in rendered:
            # Skip empty results and standalone semicolons.
            if not candidate or isinstance(candidate, exp.Semicolon):
                continue
            resolved.append(
                self._resolve_tables(
                    candidate,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    expand=expand,
                    deployability_index=deployability_index,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
        return resolved
481
482
def render_statements(
    statements: t.List[str],
    dialect: str,
    default_catalog: t.Optional[str] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    **render_kwargs: t.Any,
) -> t.List[str]:
    """Render SQL statement strings and return the results as SQL strings.

    Each string is parsed with ``dialect``. Every parsed expression is rendered with an
    ``ExpressionRenderer`` (identifier quoting and normalization disabled) and
    generated back to SQL. An expression that renders to nothing is skipped with a warning.

    Args:
        statements: The SQL statement strings to render.
        dialect: Dialect used for parsing, rendering and SQL generation.
        default_catalog: Default catalog passed to the renderer.
        python_env: Python environment available to macros.
        jinja_macros: Jinja macro registry used for rendering.
        render_kwargs: Extra keyword arguments forwarded to ``ExpressionRenderer.render``.

    Returns:
        The rendered SQL strings, in input order.
    """
    output: t.List[str] = []
    for raw_statement in statements:
        for parsed in d.parse(raw_statement, default_dialect=dialect):
            if not parsed:
                continue

            renderer = ExpressionRenderer(
                parsed,
                dialect,
                [],
                jinja_macro_registry=jinja_macros,
                python_env=python_env,
                default_catalog=default_catalog,
                quote_identifiers=False,
                normalize_identifiers=False,
            )
            rendered = renderer.render(**render_kwargs)

            if rendered:
                output.extend(node.sql(dialect=dialect) for node in rendered)
            else:
                # Warning instead of raising for cases where a statement is conditionally executed
                logger.warning(
                    f"Rendering `{parsed.sql(dialect=dialect)}` did not return an expression"
                )
    return output
515
516
class QueryRenderer(BaseExpressionRenderer):
    """Renders a model query and, optionally, optimizes it.

    On top of the base render cache, this keeps a cache of the optimized query (reset
    whenever the schema changes) and records linter rule violations detected while
    optimizing.
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        # Optimized query from a cacheable render; cleared by update_schema.
        self._optimized_cache: t.Optional[exp.Query] = None
        # Linter rule classes violated during optimization, mapped to rule-specific details.
        self._violated_rules: t.Dict[type[Rule], t.Any] = {}

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Replace the schema and invalidate the optimized-query cache."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        needs_optimization: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
                end on the fly.
            needs_optimization: Whether or not an optimization should be attempted
                (if passing False, it still may return a cached optimized query). Table
                references are only resolved (mapped/expanded) when this is True.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None if rendering hit a parse-time adapter call, the
            rendered Jinja query was empty, or the rendered expression was empty.

        Raises:
            ConfigError: If the rendered result is empty (non-Jinja), has more than one
                statement, or is not a query.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

            if not expressions:
                # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
                # and we thus treat it similar to models with adapter calls to match dbt's behavior.
                if isinstance(self._expression, d.JinjaQuery):
                    return None

                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(
                    f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
                )
                # NOTE(review): raise_config_error is expected to raise; this bare raise only
                # tells type checkers the branch does not fall through.
                raise

            if needs_optimization and self._optimize_query_flag:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        if needs_optimization:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(
        self,
        expression: t.Optional[exp.Expr],
        violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
        optimized: bool = False,
    ) -> None:
        """Seed a cache with ``expression`` and replace the recorded rule violations.

        Args:
            expression: The expression to cache.
            violated_rules: Rule violations to record; defaults to none.
            optimized: If True, store ``expression`` as the optimized query (it must be an
                ``exp.Query``); otherwise store it in the base render cache.

        Raises:
            SQLMeshError: If ``optimized`` is True and ``expression`` is not a query.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

        self._violated_rules = violated_rules or {}

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualify, type-annotate and simplify ``query`` when the schema allows it.

        Optimization runs only if the schema is non-empty (or there are no dependencies
        other than this model) and every dependency is found in the schema. A
        ``SqlglotError`` during optimization is recorded as an ``AmbiguousOrInvalidColumn``
        violation and the original query is returned; a ``SELECT *`` that cannot be
        expanded is recorded as ``InvalidSelectStarExpansion``.
        """
        from sqlmesh.core.linter.rules.builtin import (
            AmbiguousOrInvalidColumn,
            InvalidSelectStarExpansion,
        )

        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps = set()
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))
            self._violated_rules[InvalidSelectStarExpansion] = deps

        try:
            if should_optimize:
                # Optimize a copy so the original can be returned if optimization fails.
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                        dialect=self._dialect,
                    ),
                    dialect=self._dialect,
                )
        except SqlglotError as ex:
            self._violated_rules[AmbiguousOrInvalidColumn] = ex

            query = original

        except Exception as ex:
            raise_config_error(
                f"Failed to optimize query, please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new. {ex}",
                self._path,
            )

        # If the query was not typed by the optimizer, still annotate the projection
        # expressions, using the same dialect as the full annotation pass above.
        if not query.type:
            for select in query.expressions:
                annotate_types(select, dialect=self._dialect)

        return query
702
703
def _prepare_python_env_for_jinja(
    evaluator: MacroEvaluator,
    python_env: t.Dict[str, Executable],
) -> t.Dict[str, t.Any]:
    """Build Jinja globals from a SQLMesh Python environment.

    Every callable in the prepared environment is bound to ``evaluator`` as its first
    positional argument, so macro functions can be called from Jinja without passing
    the evaluator explicitly. Non-callable values are passed through unchanged.

    Args:
        evaluator: The macro evaluator to bind to each callable.
        python_env: The serialized Python environment to prepare.

    Returns:
        A mapping of names to values/bound callables, in the prepared environment's order.
    """
    jinja_globals: t.Dict[str, t.Any] = {}
    for name, obj in prepare_env(python_env).items():
        jinja_globals[name] = partial(obj, evaluator) if callable(obj) else obj
    return jinja_globals
logger = <Logger sqlmesh.core.renderer (WARNING)>
class BaseExpressionRenderer:
 49class BaseExpressionRenderer:
 50    def __init__(
 51        self,
 52        expression: exp.Expr,
 53        dialect: DialectType,
 54        macro_definitions: t.List[d.MacroDef],
 55        path: t.Optional[Path] = None,
 56        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
 57        python_env: t.Optional[t.Dict[str, Executable]] = None,
 58        only_execution_time: bool = False,
 59        schema: t.Optional[t.Dict[str, t.Any]] = None,
 60        default_catalog: t.Optional[str] = None,
 61        quote_identifiers: bool = True,
 62        normalize_identifiers: bool = True,
 63        optimize_query: t.Optional[bool] = True,
 64        model: t.Optional[_Model] = None,
 65    ):
 66        self._expression = expression
 67        self._dialect = dialect
 68        self._macro_definitions = macro_definitions
 69        self._path = path
 70        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
 71        self._python_env = python_env or {}
 72        self._only_execution_time = only_execution_time
 73        self._default_catalog = default_catalog
 74        self._normalize_identifiers = normalize_identifiers
 75        self._quote_identifiers = quote_identifiers
 76        self.update_schema({} if schema is None else schema)
 77        self._cache: t.List[t.Optional[exp.Expr]] = []
 78        self._model_fqn = model.fqn if model else None
 79        self._optimize_query_flag = optimize_query is not False
 80        self._model = model
 81
 82    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
 83        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
 84
 85    def _render(
 86        self,
 87        start: t.Optional[TimeLike] = None,
 88        end: t.Optional[TimeLike] = None,
 89        execution_time: t.Optional[TimeLike] = None,
 90        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 91        table_mapping: t.Optional[t.Dict[str, str]] = None,
 92        deployability_index: t.Optional[DeployabilityIndex] = None,
 93        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
 94        **kwargs: t.Any,
 95    ) -> t.List[t.Optional[exp.Expr]]:
 96        """Renders a expression, expanding macros with provided kwargs
 97
 98        Args:
 99            start: The start datetime to render. Defaults to epoch start.
100            end: The end datetime to render. Defaults to epoch start.
101            execution_time: The date/time time reference to use for execution time.
102            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
103            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
104            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
105            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
106            kwargs: Additional kwargs to pass to the renderer.
107
108        Returns:
109            The rendered expressions.
110        """
111
112        should_cache = self._should_cache(
113            runtime_stage, start, end, execution_time, *kwargs.values()
114        )
115
116        if should_cache and self._cache:
117            return self._cache
118
119        environment_naming_info = kwargs.get("environment_naming_info")
120        if environment_naming_info is not None:
121            kwargs["this_env"] = getattr(environment_naming_info, "name")
122            if snapshots:
123                schemas, views = set(), []
124                for snapshot in snapshots.values():
125                    if snapshot.is_model and not snapshot.is_symbolic:
126                        schemas.add(
127                            snapshot.qualified_view_name.schema_for_environment(
128                                environment_naming_info, dialect=self._dialect
129                            )
130                        )
131                        views.append(
132                            snapshot.display_name(
133                                environment_naming_info, self._default_catalog, self._dialect
134                            )
135                        )
136                if schemas:
137                    kwargs["schemas"] = list(schemas)
138                if views:
139                    kwargs["views"] = views
140
141        this_model = kwargs.pop("this_model", None)
142
143        this_snapshot = (snapshots or {}).get(self._model_fqn) if self._model_fqn else None
144        if not this_model and self._model_fqn:
145            this_model = self._resolve_table(
146                self._model_fqn,
147                snapshots={self._model_fqn: this_snapshot} if this_snapshot else None,
148                deployability_index=deployability_index,
149                table_mapping=table_mapping,
150            )
151        if this_snapshot and (kind := this_snapshot.model_kind_name):
152            kwargs["model_kind_name"] = kind.name
153
154        def _resolve_table(table: str | exp.Table) -> str:
155            return self._resolve_table(
156                d.normalize_model_name(table, self._default_catalog, self._dialect),
157                snapshots=snapshots,
158                table_mapping=table_mapping,
159                deployability_index=deployability_index,
160            ).sql(dialect=self._dialect, identify=True, comments=False)
161
162        macro_evaluator = MacroEvaluator(
163            self._dialect,
164            python_env=self._python_env,
165            schema=self.schema,
166            runtime_stage=runtime_stage,
167            resolve_table=_resolve_table,
168            resolve_tables=lambda e: self._resolve_tables(
169                e,
170                snapshots=snapshots,
171                table_mapping=table_mapping,
172                deployability_index=deployability_index,
173                start=start,
174                end=end,
175                execution_time=execution_time,
176                runtime_stage=runtime_stage,
177            ),
178            snapshots=snapshots,
179            default_catalog=self._default_catalog,
180            path=self._path,
181            environment_naming_info=environment_naming_info,
182            model_fqn=self._model_fqn,
183        )
184
185        start_time, end_time = (
186            make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect)
187            if not self._only_execution_time
188            else (None, None)
189        )
190
191        render_kwargs = {
192            **date_dict(
193                to_datetime(execution_time or c.EPOCH),
194                start_time,
195                end_time,
196            ),
197            **kwargs,
198        }
199
200        if this_model:
201            render_kwargs["this_model"] = this_model
202
203        macro_evaluator.locals.update(render_kwargs)
204
205        variables = kwargs.pop("variables", {})
206        if variables:
207            macro_evaluator.locals.setdefault(c.SQLMESH_VARS, {}).update(variables)
208
209        expressions: t.List[exp.Expr] = [self._expression]
210        if isinstance(self._expression, d.Jinja):
211            try:
212                jinja_env_kwargs = {
213                    **{
214                        **render_kwargs,
215                        **_prepare_python_env_for_jinja(macro_evaluator, self._python_env),
216                        **variables,
217                    },
218                    "snapshots": snapshots or {},
219                    "table_mapping": table_mapping,
220                    "deployability_index": deployability_index,
221                    "default_catalog": self._default_catalog,
222                    "runtime_stage": runtime_stage.value,
223                    "resolve_table": _resolve_table,
224                    "model_instance": self._model,
225                }
226
227                if this_model:
228                    jinja_env_kwargs["this_model"] = this_model.sql(
229                        dialect=self._dialect, identify=True, comments=False
230                    )
231
232                if self._model and self._model.kind.is_incremental_by_time_range:
233                    all_refs = list(
234                        self._jinja_macro_registry.global_objs.get("sources", {}).values()  # type: ignore
235                    ) + list(
236                        self._jinja_macro_registry.global_objs.get("refs", {}).values()  # type: ignore
237                    )
238                    for ref in all_refs:
239                        if ref.event_time_filter:
240                            ref.event_time_filter["start"] = render_kwargs["start_tstz"]
241                            ref.event_time_filter["end"] = to_tstz(
242                                make_ts_exclusive(render_kwargs["end_tstz"], dialect=self._dialect)
243                            )
244
245                jinja_env = self._jinja_macro_registry.build_environment(**jinja_env_kwargs)
246
247                expressions = []
248                rendered_expression = jinja_env.from_string(self._expression.name).render()
249                logger.debug(
250                    f"Rendered Jinja expression for model '{self._model_fqn}' at '{self._path}': '{rendered_expression}'"
251                )
252            except ParsetimeAdapterCallError:
253                raise
254            except Exception as ex:
255                raise ConfigError(
256                    f"Could not render jinja for '{self._path}'.\n" + extract_error_details(ex)
257                ) from ex
258
259            if rendered_expression.strip():
260                # ensure there is actual SQL and not just comments and non-SQL jinja
261                dialect = Dialect.get_or_raise(self._dialect)
262                tokens = dialect.tokenize(rendered_expression)
263
264                if tokens:
265                    try:
266                        expressions = [
267                            e for e in dialect.parser().parse(tokens, rendered_expression) if e
268                        ]
269
270                        if not expressions:
271                            raise ConfigError(
272                                f"Failed to parse an expression:\n{rendered_expression}"
273                            )
274                    except Exception as ex:
275                        raise ConfigError(
276                            f"Could not parse the rendered jinja at '{self._path}'.\n{ex}"
277                        ) from ex
278
279        for definition in self._macro_definitions:
280            try:
281                macro_evaluator.evaluate(definition)
282            except Exception as ex:
283                raise_config_error(
284                    f"Failed to evaluate macro '{definition}'.\n\n{ex}\n", self._path
285                )
286
287        resolved_expressions: t.List[t.Optional[exp.Expr]] = []
288
289        for expression in expressions:
290            try:
291                transformed_expressions = ensure_list(macro_evaluator.transform(expression))
292            except Exception as ex:
293                raise_config_error(
294                    f"Failed to resolve macros for\n\n{expression.sql(dialect=self._dialect, pretty=True)}\n\n{ex}\n",
295                    self._path,
296                )
297
298            for expression in t.cast(t.List[exp.Expr], transformed_expressions):
299                with self._normalize_and_quote(expression) as expression:
300                    if hasattr(expression, "selects"):
301                        for select in expression.selects:
302                            if not isinstance(select, exp.Alias) and select.output_name not in (
303                                "*",
304                                "",
305                            ):
306                                alias = exp.alias_(
307                                    select, select.output_name, quoted=self._quote_identifiers
308                                )
309                                comments = alias.this.comments
310                                if comments:
311                                    alias.add_comments(comments)
312                                    comments.clear()
313
314                                select.replace(alias)
315                resolved_expressions.append(expression)
316
317        # We dont cache here if columns_to_type was called in a macro.
318        # This allows the model's query to be re-rendered so that the
319        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
320        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
321            self._cache = resolved_expressions
322        return resolved_expressions
323
324    def update_cache(self, expression: t.Optional[exp.Expr]) -> None:
325        self._cache = [expression]
326
327    def _resolve_table(
328        self,
329        table_name: str | exp.Expr,
330        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
331        table_mapping: t.Optional[t.Dict[str, str]] = None,
332        deployability_index: t.Optional[DeployabilityIndex] = None,
333    ) -> exp.Table:
334        table = exp.replace_tables(
335            exp.maybe_parse(table_name, into=exp.Table, dialect=self._dialect),
336            {
337                **self._to_table_mapping((snapshots or {}).values(), deployability_index),
338                **(table_mapping or {}),
339            },
340            dialect=self._dialect,
341            copy=False,
342        )
343        # We quote the table here to mimic the behavior of _resolve_tables, otherwise we may end
344        # up normalizing twice, because _to_table_mapping returns the mapped names unquoted.
345        return (
346            d.quote_identifiers(table, dialect=self._dialect) if self._quote_identifiers else table
347        )
348
349    def _resolve_tables(
350        self,
351        expression: E,
352        *,
353        start: t.Optional[TimeLike] = None,
354        end: t.Optional[TimeLike] = None,
355        execution_time: t.Optional[TimeLike] = None,
356        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
357        table_mapping: t.Optional[t.Dict[str, str]] = None,
358        expand: t.Iterable[str] = tuple(),
359        deployability_index: t.Optional[DeployabilityIndex] = None,
360        **kwargs: t.Any,
361    ) -> E:
362        if not snapshots and not table_mapping and not expand:
363            return expression
364
365        expression = expression.copy()
366        with self._normalize_and_quote(expression) as expression:
367            snapshots = snapshots or {}
368            table_mapping = table_mapping or {}
369            mapping = {
370                **self._to_table_mapping(snapshots.values(), deployability_index),
371                **table_mapping,
372            }
373            expand = set(expand) | {
374                name for name, snapshot in snapshots.items() if snapshot.is_embedded
375            }
376
377            if expand:
378                model_mapping = {
379                    name: snapshot.model
380                    for name, snapshot in snapshots.items()
381                    if snapshot.is_model
382                }
383
384                def _expand(node: exp.Expr) -> exp.Expr:
385                    if isinstance(node, exp.Table) and snapshots:
386                        name = exp.table_name(node, identify=True)
387                        model = model_mapping.get(name)
388                        if (
389                            name in expand
390                            and model
391                            and not model.is_seed
392                            and not model.kind.is_external
393                        ):
394                            nested_query = model.render_query(
395                                start=start,
396                                end=end,
397                                execution_time=execution_time,
398                                snapshots=snapshots,
399                                table_mapping=table_mapping,
400                                expand=expand,
401                                deployability_index=deployability_index,
402                                **kwargs,
403                            )
404                            if nested_query is not None:
405                                return nested_query.subquery(
406                                    alias=node.alias or model.view_name,
407                                    copy=False,
408                                )
409                            logger.warning("Failed to expand the nested model '%s'", name)
410                    return node
411
412                expression = expression.transform(_expand, copy=False)  # type: ignore
413
414            if mapping:
415                expression = exp.replace_tables(
416                    expression, mapping, dialect=self._dialect, copy=False
417                )
418
419            return expression
420
421    @contextmanager
422    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
423        if self._normalize_identifiers:
424            with d.normalize_and_quote(
425                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
426            ) as query:
427                yield query
428        else:
429            yield query
430
431    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
432        return runtime_stage == RuntimeStage.LOADING and not any(args)
433
434    def _to_table_mapping(
435        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
436    ) -> t.Dict[str, str]:
437        from sqlmesh.core.snapshot import to_table_mapping
438
439        return to_table_mapping(snapshots, deployability_index)
BaseExpressionRenderer( expression: sqlglot.expressions.core.Expr, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], macro_definitions: List[sqlmesh.core.dialect.MacroDef], path: Optional[pathlib.Path] = None, jinja_macro_registry: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, only_execution_time: bool = False, schema: Optional[Dict[str, Any]] = None, default_catalog: Optional[str] = None, quote_identifiers: bool = True, normalize_identifiers: bool = True, optimize_query: Optional[bool] = True, model: Optional[sqlmesh.core.model.definition._Model] = None)
50    def __init__(
51        self,
52        expression: exp.Expr,
53        dialect: DialectType,
54        macro_definitions: t.List[d.MacroDef],
55        path: t.Optional[Path] = None,
56        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
57        python_env: t.Optional[t.Dict[str, Executable]] = None,
58        only_execution_time: bool = False,
59        schema: t.Optional[t.Dict[str, t.Any]] = None,
60        default_catalog: t.Optional[str] = None,
61        quote_identifiers: bool = True,
62        normalize_identifiers: bool = True,
63        optimize_query: t.Optional[bool] = True,
64        model: t.Optional[_Model] = None,
65    ):
66        self._expression = expression
67        self._dialect = dialect
68        self._macro_definitions = macro_definitions
69        self._path = path
70        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
71        self._python_env = python_env or {}
72        self._only_execution_time = only_execution_time
73        self._default_catalog = default_catalog
74        self._normalize_identifiers = normalize_identifiers
75        self._quote_identifiers = quote_identifiers
76        self.update_schema({} if schema is None else schema)
77        self._cache: t.List[t.Optional[exp.Expr]] = []
78        self._model_fqn = model.fqn if model else None
79        self._optimize_query_flag = optimize_query is not False
80        self._model = model
def update_schema(self, schema: Dict[str, Any]) -> None:
82    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
83        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
def update_cache(self, expression: Optional[sqlglot.expressions.core.Expr]) -> None:
324    def update_cache(self, expression: t.Optional[exp.Expr]) -> None:
325        self._cache = [expression]
class ExpressionRenderer(BaseExpressionRenderer):
class ExpressionRenderer(BaseExpressionRenderer):
    """Renders an expression into zero or more statements and resolves their table references."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expr]]:
        """Render the expression, then map table names via ``_resolve_tables``.

        Returns:
            The rendered expressions, skipping empty results and bare semicolons, or ``None``
            when rendering raised ``ParsetimeAdapterCallError``.
        """
        try:
            rendered = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        resolved: t.List[exp.Expr] = []
        for candidate in rendered:
            if not candidate or isinstance(candidate, exp.Semicolon):
                continue
            resolved.append(
                self._resolve_tables(
                    candidate,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    expand=expand,
                    deployability_index=deployability_index,
                    **kwargs,
                )
            )
        return resolved
def render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, expand: Iterable[str] = (), **kwargs: Any) -> Optional[List[sqlglot.expressions.core.Expr]]:
443    def render(
444        self,
445        start: t.Optional[TimeLike] = None,
446        end: t.Optional[TimeLike] = None,
447        execution_time: t.Optional[TimeLike] = None,
448        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
449        table_mapping: t.Optional[t.Dict[str, str]] = None,
450        deployability_index: t.Optional[DeployabilityIndex] = None,
451        expand: t.Iterable[str] = tuple(),
452        **kwargs: t.Any,
453    ) -> t.Optional[t.List[exp.Expr]]:
454        try:
455            expressions = super()._render(
456                start=start,
457                end=end,
458                execution_time=execution_time,
459                snapshots=snapshots,
460                deployability_index=deployability_index,
461                table_mapping=table_mapping,
462                **kwargs,
463            )
464        except ParsetimeAdapterCallError:
465            return None
466
467        return [
468            self._resolve_tables(
469                e,
470                snapshots=snapshots,
471                table_mapping=table_mapping,
472                expand=expand,
473                deployability_index=deployability_index,
474                start=start,
475                end=end,
476                execution_time=execution_time,
477                **kwargs,
478            )
479            for e in expressions
480            if e and not isinstance(e, exp.Semicolon)
481        ]
def render_statements( statements: List[str], dialect: str, default_catalog: Optional[str] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, **render_kwargs: Any) -> List[str]:
def render_statements(
    statements: t.List[str],
    dialect: str,
    default_catalog: t.Optional[str] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    **render_kwargs: t.Any,
) -> t.List[str]:
    """Render raw SQL statement strings (with macros/Jinja) back into SQL strings.

    Each string is parsed with ``d.parse``, and every non-empty expression is rendered through
    an ``ExpressionRenderer`` with identifier normalization and quoting disabled.

    Args:
        statements: SQL statement strings to render.
        dialect: Dialect used for parsing and for generating the output SQL.
        default_catalog: Default catalog passed to the renderer.
        python_env: Python environment exposed to macros.
        jinja_macros: Jinja macro registry passed to the renderer.
        render_kwargs: Forwarded to ``ExpressionRenderer.render``.

    Returns:
        The rendered SQL strings, in input order. An expression that renders to nothing is
        skipped with a warning rather than an error.
    """
    rendered_statements: t.List[str] = []
    for statement in statements:
        for expression in d.parse(statement, default_dialect=dialect):
            if not expression:
                continue

            rendered = ExpressionRenderer(
                expression,
                dialect,
                [],
                jinja_macro_registry=jinja_macros,
                python_env=python_env,
                default_catalog=default_catalog,
                quote_identifiers=False,
                normalize_identifiers=False,
            ).render(**render_kwargs)

            if not rendered:
                # Warning instead of raising for cases where a statement is conditionally executed
                logger.warning(
                    "Rendering `%s` did not return an expression",
                    expression.sql(dialect=dialect),
                )
                continue

            rendered_statements.extend(expr.sql(dialect=dialect) for expr in rendered)

    return rendered_statements
class QueryRenderer(BaseExpressionRenderer):
class QueryRenderer(BaseExpressionRenderer):
    """Renders a model query, optionally optimizing it and resolving its table references."""

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        # Optimized query from a cacheable render, or set explicitly via ``update_cache``.
        self._optimized_cache: t.Optional[exp.Query] = None
        # Linter rule classes found violated while optimizing, mapped to diagnostic details.
        self._violated_rules: t.Dict[type[Rule], t.Any] = {}

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Update the schema and drop the cached optimized query, which was built against the old one."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        needs_optimization: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders the model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
                end on the fly.
            needs_optimization: Whether or not an optimization should be attempted
                (if passing False, it still may return a cached optimized query). When False,
                table references are also left unresolved.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None if rendering hit a parse-time adapter call, a Jinja
            query rendered to nothing, or the rendered expression was empty.

        Raises:
            ConfigError: If a non-Jinja query renders to nothing or to more than one statement.
                A non-query result is reported through ``raise_config_error``.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

            if not expressions:
                # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
                # and we thus treat it similar to models with adapter calls to match dbt's behavior.
                if isinstance(self._expression, d.JinjaQuery):
                    return None

                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(
                    f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
                )
                # NOTE(review): presumably unreachable because raise_config_error is expected to
                # raise; if it ever returned, this bare ``raise`` would itself fail with
                # RuntimeError (no active exception).
                raise

            if needs_optimization and self._optimize_query_flag:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        if needs_optimization:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(
        self,
        expression: t.Optional[exp.Expr],
        violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
        optimized: bool = False,
    ) -> None:
        """Seed a cache and replace the recorded rule violations.

        Args:
            expression: The expression to cache.
            violated_rules: Rule violations to record; an empty dict when omitted.
            optimized: If True, ``expression`` must be an ``exp.Query`` and becomes the optimized
                cache; otherwise it replaces the base render cache.

        Raises:
            SQLMeshError: If ``optimized`` is True and ``expression`` is not a query.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

        self._violated_rules = violated_rules or {}

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualify, type-annotate and simplify ``query`` when the schema covers its dependencies.

        Missing dependencies for a ``SELECT *`` query are recorded as an
        ``InvalidSelectStarExpansion`` violation. A sqlglot error during optimization is
        recorded as ``AmbiguousOrInvalidColumn``, and the unoptimized query is used instead.
        Any other error is reported via ``raise_config_error``.
        """
        from sqlmesh.core.linter.rules.builtin import (
            AmbiguousOrInvalidColumn,
            InvalidSelectStarExpansion,
        )

        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps = set()
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))
            self._violated_rules[InvalidSelectStarExpansion] = deps

        try:
            if should_optimize:
                # Work on a copy so ``original`` stays intact for the fallback below.
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                        dialect=self._dialect,
                    ),
                    dialect=self._dialect,
                )
        except SqlglotError as ex:
            self._violated_rules[AmbiguousOrInvalidColumn] = ex

            query = original

        except Exception as ex:
            raise_config_error(
                f"Failed to optimize query, please file an issue at https://github.com/SQLMesh/sqlmesh/issues/new. {ex}",
                self._path,
            )

        if not query.type:
            for select in query.expressions:
                annotate_types(select)

        return query
QueryRenderer(*args: Any, **kwargs: Any)
519    def __init__(self, *args: t.Any, **kwargs: t.Any):
520        super().__init__(*args, **kwargs)
521        self._optimized_cache: t.Optional[exp.Query] = None
522        self._violated_rules: t.Dict[type[Rule], t.Any] = {}
def update_schema(self, schema: Dict[str, Any]) -> None:
def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
    """Forward a new schema to the base renderer and drop the cached optimized query.

    Args:
        schema: The new schema mapping, passed through to the base renderer unchanged.
    """
    super().update_schema(schema)
    # The cached query was qualified and type-annotated against the previous schema,
    # so it can no longer be trusted once the schema changes.
    self._optimized_cache = None
def render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, expand: Iterable[str] = (), needs_optimization: bool = True, runtime_stage: sqlmesh.core.macros.RuntimeStage = <RuntimeStage.LOADING: 'loading'>, **kwargs: Any) -> Optional[sqlglot.expressions.query.Query]:
def render(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    expand: t.Iterable[str] = tuple(),
    needs_optimization: bool = True,
    runtime_stage: RuntimeStage = RuntimeStage.LOADING,
    **kwargs: t.Any,
) -> t.Optional[exp.Query]:
    """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time reference to use for execution time. Defaults to epoch start.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
            that depend on materialized tables.  Model definitions are inlined and can thus be run end to
            end on the fly.
        needs_optimization: Whether or not an optimization should be attempted
            (if passing False, it still may return a cached optimized query). Table references
            are only resolved when this is True.
        runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
        kwargs: Additional kwargs to pass to the renderer.

    Returns:
        The rendered query, or None when rendering is skipped. This happens when the query
        makes a parse-time adapter call, when it is a Jinja query that renders to no
        statements, or when the single rendered statement is empty.

    Raises:
        ConfigError: If the query renders to no statements (and is not a Jinja query), or to
            more than one statement.
        An error is also raised via ``raise_config_error`` if the rendered statement is not a
        SELECT or UNION.
    """
    should_cache = self._should_cache(
        runtime_stage, start, end, execution_time, *kwargs.values()
    )

    # Reuse the previously optimized query when these render arguments allow caching.
    # An explicit None check avoids relying on the truthiness of a sqlglot expression.
    if should_cache and self._optimized_cache is not None:
        query = self._optimized_cache
    else:
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                runtime_stage=runtime_stage,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            # Queries that make adapter calls cannot be rendered at parse time.
            return None

        expressions = [e for e in expressions if not isinstance(e, exp.Semicolon)]

        if not expressions:
            # We assume that if there are no expressions, then the model contains dynamic Jinja SQL
            # and we thus treat it similar to models with adapter calls to match dbt's behavior.
            if isinstance(self._expression, d.JinjaQuery):
                return None

            raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

        if len(expressions) > 1:
            raise ConfigError(f"Too many statements in query:\n{self._expression}")

        query = expressions[0]  # type: ignore

        if not query:
            return None
        if not isinstance(query, exp.Query):
            raise_config_error(
                f"Model query needs to be a SELECT or a UNION, got {query}.", self._path
            )
            # Not expected to be reached: raise_config_error is expected to raise. The bare
            # raise tells type checkers that `query` is an exp.Query from here on.
            raise

        if needs_optimization and self._optimize_query_flag:
            deps = d.find_tables(
                query, default_catalog=self._default_catalog, dialect=self._dialect
            )

            query = self._optimize_query(query, deps)

            # Only the optimized form of the query is ever cached.
            if should_cache:
                self._optimized_cache = query

    if needs_optimization:
        # Resolve table references using snapshots / table_mapping / expand.
        query = self._resolve_tables(
            query,
            snapshots=snapshots,
            table_mapping=table_mapping,
            expand=expand,
            deployability_index=deployability_index,
            start=start,
            end=end,
            execution_time=execution_time,
            runtime_stage=runtime_stage,
            **kwargs,
        )

    return query

Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

Arguments:
  • start: The start datetime to render. Defaults to epoch start.
  • end: The end datetime to render. Defaults to epoch start.
  • execution_time: The date/time reference to use for execution time. Defaults to epoch start.
  • snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
  • table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
  • needs_optimization: Whether or not an optimization should be attempted (if passing False, it still may return a cached optimized query).
  • runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
  • kwargs: Additional kwargs to pass to the renderer.
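Returns:

The rendered query, or None if rendering is skipped (for example, when the query makes a parse-time adapter call or is Jinja SQL that renders to no statements).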

def update_cache( self, expression: Optional[sqlglot.expressions.core.Expr], violated_rules: Optional[Dict[type[sqlmesh.core.linter.rule.Rule], Any]] = None, optimized: bool = False) -> None:
def update_cache(
    self,
    expression: t.Optional[exp.Expr],
    violated_rules: t.Optional[t.Dict[type[Rule], t.Any]] = None,
    optimized: bool = False,
) -> None:
    """Store an externally supplied expression in the renderer's cache.

    Args:
        expression: The expression to cache.
        violated_rules: Linter rule violations to associate with the cached result. None (or an
            empty mapping) resets them to an empty dict.
        optimized: When True, ``expression`` must be a query and becomes the optimized-query
            cache. Otherwise it is handed to the base renderer's ``update_cache``.

    Raises:
        SQLMeshError: If ``optimized`` is True but ``expression`` is not an ``exp.Query``.
    """
    if not optimized:
        super().update_cache(expression)
    elif isinstance(expression, exp.Query):
        self._optimized_cache = expression
    else:
        raise SQLMeshError(f"Expected a Query but got: {expression}")

    # The lint state always tracks the most recently cached expression.
    self._violated_rules = violated_rules or {}