Edit on GitHub

sqlmesh.core.renderer

  1from __future__ import annotations
  2
  3import logging
  4import typing as t
  5from contextlib import contextmanager
  6from pathlib import Path
  7
  8from sqlglot import exp, parse
  9from sqlglot.errors import SqlglotError
 10from sqlglot.optimizer.annotate_types import annotate_types
 11from sqlglot.optimizer.qualify import qualify
 12from sqlglot.optimizer.simplify import simplify
 13
 14from sqlmesh.core import constants as c
 15from sqlmesh.core import dialect as d
 16from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
 17from sqlmesh.utils.date import TimeLike, date_dict, make_inclusive_end, to_datetime
 18from sqlmesh.utils.errors import (
 19    ConfigError,
 20    MacroEvalError,
 21    ParsetimeAdapterCallError,
 22    SQLMeshError,
 23    raise_config_error,
 24)
 25from sqlmesh.utils.jinja import JinjaMacroRegistry
 26from sqlmesh.utils.metaprogramming import Executable, prepare_env
 27
 28if t.TYPE_CHECKING:
 29    from sqlglot._typing import E
 30
 31    from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot
 32
 33
 34logger = logging.getLogger(__name__)
 35
 36
 37class BaseExpressionRenderer:
 38    def __init__(
 39        self,
 40        expression: exp.Expression,
 41        dialect: str,
 42        macro_definitions: t.List[d.MacroDef],
 43        path: Path = Path(),
 44        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
 45        python_env: t.Optional[t.Dict[str, Executable]] = None,
 46        only_execution_time: bool = False,
 47        schema: t.Optional[t.Dict[str, t.Any]] = None,
 48        default_catalog: t.Optional[str] = None,
 49        quote_identifiers: bool = True,
 50        model_fqn: t.Optional[str] = None,
 51        normalize_identifiers: bool = True,
 52    ):
 53        self._expression = expression
 54        self._dialect = dialect
 55        self._macro_definitions = macro_definitions
 56        self._path = path
 57        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
 58        self._python_env = python_env or {}
 59        self._only_execution_time = only_execution_time
 60        self._default_catalog = default_catalog
 61        self._normalize_identifiers = normalize_identifiers
 62        self._quote_identifiers = quote_identifiers
 63        self.update_schema({} if schema is None else schema)
 64        self._cache: t.List[t.Optional[exp.Expression]] = []
 65        self._model_fqn = model_fqn
 66
 67    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
 68        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
 69
 70    def _render(
 71        self,
 72        start: t.Optional[TimeLike] = None,
 73        end: t.Optional[TimeLike] = None,
 74        execution_time: t.Optional[TimeLike] = None,
 75        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 76        table_mapping: t.Optional[t.Dict[str, str]] = None,
 77        deployability_index: t.Optional[DeployabilityIndex] = None,
 78        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
 79        **kwargs: t.Any,
 80    ) -> t.List[t.Optional[exp.Expression]]:
 81        """Renders a expression, expanding macros with provided kwargs
 82
 83        Args:
 84            start: The start datetime to render. Defaults to epoch start.
 85            end: The end datetime to render. Defaults to epoch start.
 86            execution_time: The date/time time reference to use for execution time.
 87            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 88            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
 89            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 90            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
 91            kwargs: Additional kwargs to pass to the renderer.
 92
 93        Returns:
 94            The rendered expressions.
 95        """
 96
 97        should_cache = self._should_cache(
 98            runtime_stage, start, end, execution_time, *kwargs.values()
 99        )
100
101        if should_cache and self._cache:
102            return self._cache
103
104        if self._model_fqn and "this_model" not in kwargs:
105            kwargs["this_model"] = exp.to_table(
106                self._to_table_mapping(
107                    (
108                        [snapshots[self._model_fqn]]
109                        if snapshots and self._model_fqn in snapshots
110                        else []
111                    ),
112                    deployability_index,
113                ).get(self._model_fqn, self._model_fqn),
114                dialect=self._dialect,
115            ).sql(dialect=self._dialect, identify=True)
116
117        expressions = [self._expression]
118
119        render_kwargs = {
120            **date_dict(
121                to_datetime(execution_time or c.EPOCH),
122                to_datetime(start or c.EPOCH) if not self._only_execution_time else None,
123                make_inclusive_end(end or c.EPOCH) if not self._only_execution_time else None,
124            ),
125            **kwargs,
126        }
127
128        jinja_env = self._jinja_macro_registry.build_environment(
129            **{**render_kwargs, **prepare_env(self._python_env)},
130            snapshots=(snapshots or {}),
131            table_mapping=table_mapping,
132            deployability_index=deployability_index,
133            default_catalog=self._default_catalog,
134            runtime_stage=runtime_stage.value,
135        )
136
137        if isinstance(self._expression, d.Jinja):
138            try:
139                expressions = []
140                rendered_expression = jinja_env.from_string(self._expression.name).render()
141                if rendered_expression.strip():
142                    expressions = [e for e in parse(rendered_expression, read=self._dialect) if e]
143
144                    if not expressions:
145                        raise ConfigError(f"Failed to parse an expression:\n{self._expression}")
146            except ParsetimeAdapterCallError:
147                raise
148            except Exception as ex:
149                raise ConfigError(
150                    f"Could not render or parse jinja at '{self._path}'.\n{ex}"
151                ) from ex
152
153        macro_evaluator = MacroEvaluator(
154            self._dialect,
155            python_env=self._python_env,
156            jinja_env=jinja_env,
157            schema=self.schema,
158            runtime_stage=runtime_stage,
159            resolve_tables=lambda e: self._resolve_tables(
160                e,
161                snapshots=snapshots,
162                table_mapping=table_mapping,
163                deployability_index=deployability_index,
164                start=start,
165                end=end,
166                execution_time=execution_time,
167                runtime_stage=runtime_stage,
168            ),
169            snapshots=snapshots,
170            default_catalog=self._default_catalog,
171            path=self._path,
172        )
173
174        for definition in self._macro_definitions:
175            try:
176                macro_evaluator.evaluate(definition)
177            except MacroEvalError as ex:
178                raise_config_error(f"Failed to evaluate macro '{definition}'. {ex}", self._path)
179
180        macro_evaluator.locals.update(render_kwargs)
181
182        resolved_expressions: t.List[t.Optional[exp.Expression]] = []
183
184        for expression in expressions:
185            try:
186                expression = macro_evaluator.transform(expression)  # type: ignore
187            except MacroEvalError as ex:
188                raise_config_error(f"Failed to resolve macro for expression. {ex}", self._path)
189
190            if expression:
191                with self._normalize_and_quote(expression) as expression:
192                    if hasattr(expression, "selects"):
193                        for select in expression.selects:
194                            if not isinstance(select, exp.Alias) and select.output_name not in (
195                                "*",
196                                "",
197                            ):
198                                alias = exp.alias_(
199                                    select, select.output_name, quoted=self._quote_identifiers
200                                )
201                                comments = alias.this.comments
202                                if comments:
203                                    alias.add_comments(comments)
204                                    comments.clear()
205
206                                select.replace(alias)
207                resolved_expressions.append(expression)
208
209        # We dont cache here if columns_to_type was called in a macro.
210        # This allows the model's query to be re-rendered so that the
211        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
212        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
213            self._cache = resolved_expressions
214        return resolved_expressions
215
216    def update_cache(self, expression: t.Optional[exp.Expression]) -> None:
217        self._cache = [expression]
218
219    def _resolve_tables(
220        self,
221        expression: E,
222        *,
223        start: t.Optional[TimeLike] = None,
224        end: t.Optional[TimeLike] = None,
225        execution_time: t.Optional[TimeLike] = None,
226        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
227        table_mapping: t.Optional[t.Dict[str, str]] = None,
228        expand: t.Iterable[str] = tuple(),
229        deployability_index: t.Optional[DeployabilityIndex] = None,
230        **kwargs: t.Any,
231    ) -> E:
232        if not snapshots and not table_mapping and not expand:
233            return expression
234
235        expression = expression.copy()
236        with self._normalize_and_quote(expression) as expression:
237            snapshots = snapshots or {}
238            table_mapping = table_mapping or {}
239            mapping = {
240                **self._to_table_mapping(snapshots.values(), deployability_index),
241                **table_mapping,
242            }
243            expand = set(expand) | {
244                name for name, snapshot in snapshots.items() if snapshot.is_embedded
245            }
246
247            if expand:
248                model_mapping = {
249                    name: snapshot.model
250                    for name, snapshot in snapshots.items()
251                    if snapshot.is_model
252                }
253
254                def _expand(node: exp.Expression) -> exp.Expression:
255                    if isinstance(node, exp.Table) and snapshots:
256                        name = exp.table_name(node, identify=True)
257                        model = model_mapping.get(name)
258                        if (
259                            name in expand
260                            and model
261                            and not model.is_seed
262                            and not model.kind.is_external
263                        ):
264                            nested_query = model.render_query(
265                                start=start,
266                                end=end,
267                                execution_time=execution_time,
268                                snapshots=snapshots,
269                                table_mapping=table_mapping,
270                                expand=expand,
271                                deployability_index=deployability_index,
272                                **kwargs,
273                            )
274                            if nested_query is not None:
275                                return nested_query.subquery(
276                                    alias=node.alias or model.view_name,
277                                    copy=False,
278                                )
279                            else:
280                                logger.warning("Failed to expand the nested model '%s'", name)
281                    return node
282
283                expression = expression.transform(_expand, copy=False)  # type: ignore
284
285            if mapping:
286                expression = exp.replace_tables(
287                    expression, mapping, dialect=self._dialect, copy=False
288                )
289
290            return expression
291
292    @contextmanager
293    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
294        if self._normalize_identifiers:
295            with d.normalize_and_quote(
296                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
297            ) as query:
298                yield query
299        else:
300            yield query
301
302    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
303        return runtime_stage == RuntimeStage.LOADING and not any(args)
304
305    def _to_table_mapping(
306        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
307    ) -> t.Dict[str, str]:
308        from sqlmesh.core.snapshot import to_table_mapping
309
310        return to_table_mapping(snapshots, deployability_index)
311
312
class ExpressionRenderer(BaseExpressionRenderer):
    """Renders an arbitrary expression into a list of resolved expressions."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expression]]:
        """Renders the expression and resolves the tables of every rendered statement.

        Args:
            start: The start datetime to render.
            end: The end datetime to render.
            execution_time: The date/time reference to use for execution time.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Names of referenced models to expand as subqueries.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expressions with falsy entries dropped, or None if a
            ParsetimeAdapterCallError was raised while rendering.
        """
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                # Forward the explicit mapping so that the Jinja environment and the macro
                # evaluator's table resolution see it too, as QueryRenderer.render does.
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        return [
            self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                **kwargs,
            )
            for e in expressions
            if e
        ]
352
353
class QueryRenderer(BaseExpressionRenderer):
    """Renders a single query and optionally optimizes it.

    In addition to the base render cache, the optimized query is memoized in
    `_optimized_cache`, which is reset whenever the schema is updated.
    """

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        self._optimized_cache: t.Optional[exp.Query] = None

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Updates the schema and drops the cached optimized query."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        optimize: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders the query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
                end on the fly.
            optimize: Whether to optimize the query.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None if a ParsetimeAdapterCallError was raised while
            rendering or the single rendered expression is falsy.

        Raises:
            ConfigError: If nothing was rendered or more than one statement was rendered.
        """

        cacheable = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        # The optimized cache is only consulted for cacheable renders that ask for optimization.
        cached_query = self._optimized_cache if cacheable and optimize else None

        if cached_query:
            query = cached_query
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            # Exactly one statement is expected.
            if not expressions:
                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
                raise

            if optimize:
                dependencies = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, dependencies)

                if cacheable:
                    self._optimized_cache = query

        # Tables are only resolved when optimization was requested.
        if optimize:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
        """Stores `expression` in the optimized cache when `optimized` is set, otherwise in the base cache.

        Raises:
            SQLMeshError: If `optimized` is set and `expression` is not a Query.
        """
        if not optimized:
            super().update_cache(expression)
            return

        if not isinstance(expression, exp.Query):
            raise SQLMeshError(f"Expected a Query but got: {expression}")
        self._optimized_cache = expression

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualifies, type-annotates and simplifies a copy of `query` when the schema allows it.

        Optimization is skipped when any dependency (other than this model itself) is
        missing from the schema, or when the schema is empty while dependencies exist.
        If sqlglot raises during optimization, the original query is used instead.

        Args:
            query: The query to optimize.
            all_deps: Names of the tables the query depends on.

        Returns:
            The optimized copy, or the original query if optimization was skipped or failed.
        """
        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query

        # A self-reference does not count as a dependency.
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        missing_deps = {dep for dep in all_deps if not self.schema.find(exp.to_table(dep))}
        if missing_deps:
            should_optimize = False

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))

            logger.warning(
                f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
                "Run `sqlmesh create_external_models` and / or make sure that the model "
                f"'{self._model_fqn}' can be rendered at parse time.",
            )

        try:
            if should_optimize:
                # The return values are discarded; `query` is what gets returned below.
                query = query.copy()
                qualified = qualify(
                    query,
                    dialect=self._dialect,
                    schema=self.schema,
                    infer_schema=False,
                    catalog=self._default_catalog,
                    quote_identifiers=self._quote_identifiers,
                )
                annotated = annotate_types(qualified, schema=self.schema)
                simplify(annotated)
        except SqlglotError as ex:
            query = original

            logger.warning(
                "%s for model '%s', the column may not exist or is ambiguous", ex, self._model_fqn
            )

        if not query.type:
            # Annotate each projection individually when the query itself carries no type.
            for projection in query.expressions:
                annotate_types(projection)

        return query
class BaseExpressionRenderer:
 38class BaseExpressionRenderer:
 39    def __init__(
 40        self,
 41        expression: exp.Expression,
 42        dialect: str,
 43        macro_definitions: t.List[d.MacroDef],
 44        path: Path = Path(),
 45        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
 46        python_env: t.Optional[t.Dict[str, Executable]] = None,
 47        only_execution_time: bool = False,
 48        schema: t.Optional[t.Dict[str, t.Any]] = None,
 49        default_catalog: t.Optional[str] = None,
 50        quote_identifiers: bool = True,
 51        model_fqn: t.Optional[str] = None,
 52        normalize_identifiers: bool = True,
 53    ):
 54        self._expression = expression
 55        self._dialect = dialect
 56        self._macro_definitions = macro_definitions
 57        self._path = path
 58        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
 59        self._python_env = python_env or {}
 60        self._only_execution_time = only_execution_time
 61        self._default_catalog = default_catalog
 62        self._normalize_identifiers = normalize_identifiers
 63        self._quote_identifiers = quote_identifiers
 64        self.update_schema({} if schema is None else schema)
 65        self._cache: t.List[t.Optional[exp.Expression]] = []
 66        self._model_fqn = model_fqn
 67
 68    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
 69        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
 70
 71    def _render(
 72        self,
 73        start: t.Optional[TimeLike] = None,
 74        end: t.Optional[TimeLike] = None,
 75        execution_time: t.Optional[TimeLike] = None,
 76        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 77        table_mapping: t.Optional[t.Dict[str, str]] = None,
 78        deployability_index: t.Optional[DeployabilityIndex] = None,
 79        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
 80        **kwargs: t.Any,
 81    ) -> t.List[t.Optional[exp.Expression]]:
 82        """Renders a expression, expanding macros with provided kwargs
 83
 84        Args:
 85            start: The start datetime to render. Defaults to epoch start.
 86            end: The end datetime to render. Defaults to epoch start.
 87            execution_time: The date/time time reference to use for execution time.
 88            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 89            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
 90            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
 91            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
 92            kwargs: Additional kwargs to pass to the renderer.
 93
 94        Returns:
 95            The rendered expressions.
 96        """
 97
 98        should_cache = self._should_cache(
 99            runtime_stage, start, end, execution_time, *kwargs.values()
100        )
101
102        if should_cache and self._cache:
103            return self._cache
104
105        if self._model_fqn and "this_model" not in kwargs:
106            kwargs["this_model"] = exp.to_table(
107                self._to_table_mapping(
108                    (
109                        [snapshots[self._model_fqn]]
110                        if snapshots and self._model_fqn in snapshots
111                        else []
112                    ),
113                    deployability_index,
114                ).get(self._model_fqn, self._model_fqn),
115                dialect=self._dialect,
116            ).sql(dialect=self._dialect, identify=True)
117
118        expressions = [self._expression]
119
120        render_kwargs = {
121            **date_dict(
122                to_datetime(execution_time or c.EPOCH),
123                to_datetime(start or c.EPOCH) if not self._only_execution_time else None,
124                make_inclusive_end(end or c.EPOCH) if not self._only_execution_time else None,
125            ),
126            **kwargs,
127        }
128
129        jinja_env = self._jinja_macro_registry.build_environment(
130            **{**render_kwargs, **prepare_env(self._python_env)},
131            snapshots=(snapshots or {}),
132            table_mapping=table_mapping,
133            deployability_index=deployability_index,
134            default_catalog=self._default_catalog,
135            runtime_stage=runtime_stage.value,
136        )
137
138        if isinstance(self._expression, d.Jinja):
139            try:
140                expressions = []
141                rendered_expression = jinja_env.from_string(self._expression.name).render()
142                if rendered_expression.strip():
143                    expressions = [e for e in parse(rendered_expression, read=self._dialect) if e]
144
145                    if not expressions:
146                        raise ConfigError(f"Failed to parse an expression:\n{self._expression}")
147            except ParsetimeAdapterCallError:
148                raise
149            except Exception as ex:
150                raise ConfigError(
151                    f"Could not render or parse jinja at '{self._path}'.\n{ex}"
152                ) from ex
153
154        macro_evaluator = MacroEvaluator(
155            self._dialect,
156            python_env=self._python_env,
157            jinja_env=jinja_env,
158            schema=self.schema,
159            runtime_stage=runtime_stage,
160            resolve_tables=lambda e: self._resolve_tables(
161                e,
162                snapshots=snapshots,
163                table_mapping=table_mapping,
164                deployability_index=deployability_index,
165                start=start,
166                end=end,
167                execution_time=execution_time,
168                runtime_stage=runtime_stage,
169            ),
170            snapshots=snapshots,
171            default_catalog=self._default_catalog,
172            path=self._path,
173        )
174
175        for definition in self._macro_definitions:
176            try:
177                macro_evaluator.evaluate(definition)
178            except MacroEvalError as ex:
179                raise_config_error(f"Failed to evaluate macro '{definition}'. {ex}", self._path)
180
181        macro_evaluator.locals.update(render_kwargs)
182
183        resolved_expressions: t.List[t.Optional[exp.Expression]] = []
184
185        for expression in expressions:
186            try:
187                expression = macro_evaluator.transform(expression)  # type: ignore
188            except MacroEvalError as ex:
189                raise_config_error(f"Failed to resolve macro for expression. {ex}", self._path)
190
191            if expression:
192                with self._normalize_and_quote(expression) as expression:
193                    if hasattr(expression, "selects"):
194                        for select in expression.selects:
195                            if not isinstance(select, exp.Alias) and select.output_name not in (
196                                "*",
197                                "",
198                            ):
199                                alias = exp.alias_(
200                                    select, select.output_name, quoted=self._quote_identifiers
201                                )
202                                comments = alias.this.comments
203                                if comments:
204                                    alias.add_comments(comments)
205                                    comments.clear()
206
207                                select.replace(alias)
208                resolved_expressions.append(expression)
209
210        # We dont cache here if columns_to_type was called in a macro.
211        # This allows the model's query to be re-rendered so that the
212        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
213        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
214            self._cache = resolved_expressions
215        return resolved_expressions
216
217    def update_cache(self, expression: t.Optional[exp.Expression]) -> None:
218        self._cache = [expression]
219
220    def _resolve_tables(
221        self,
222        expression: E,
223        *,
224        start: t.Optional[TimeLike] = None,
225        end: t.Optional[TimeLike] = None,
226        execution_time: t.Optional[TimeLike] = None,
227        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
228        table_mapping: t.Optional[t.Dict[str, str]] = None,
229        expand: t.Iterable[str] = tuple(),
230        deployability_index: t.Optional[DeployabilityIndex] = None,
231        **kwargs: t.Any,
232    ) -> E:
233        if not snapshots and not table_mapping and not expand:
234            return expression
235
236        expression = expression.copy()
237        with self._normalize_and_quote(expression) as expression:
238            snapshots = snapshots or {}
239            table_mapping = table_mapping or {}
240            mapping = {
241                **self._to_table_mapping(snapshots.values(), deployability_index),
242                **table_mapping,
243            }
244            expand = set(expand) | {
245                name for name, snapshot in snapshots.items() if snapshot.is_embedded
246            }
247
248            if expand:
249                model_mapping = {
250                    name: snapshot.model
251                    for name, snapshot in snapshots.items()
252                    if snapshot.is_model
253                }
254
255                def _expand(node: exp.Expression) -> exp.Expression:
256                    if isinstance(node, exp.Table) and snapshots:
257                        name = exp.table_name(node, identify=True)
258                        model = model_mapping.get(name)
259                        if (
260                            name in expand
261                            and model
262                            and not model.is_seed
263                            and not model.kind.is_external
264                        ):
265                            nested_query = model.render_query(
266                                start=start,
267                                end=end,
268                                execution_time=execution_time,
269                                snapshots=snapshots,
270                                table_mapping=table_mapping,
271                                expand=expand,
272                                deployability_index=deployability_index,
273                                **kwargs,
274                            )
275                            if nested_query is not None:
276                                return nested_query.subquery(
277                                    alias=node.alias or model.view_name,
278                                    copy=False,
279                                )
280                            else:
281                                logger.warning("Failed to expand the nested model '%s'", name)
282                    return node
283
284                expression = expression.transform(_expand, copy=False)  # type: ignore
285
286            if mapping:
287                expression = exp.replace_tables(
288                    expression, mapping, dialect=self._dialect, copy=False
289                )
290
291            return expression
292
293    @contextmanager
294    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
295        if self._normalize_identifiers:
296            with d.normalize_and_quote(
297                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
298            ) as query:
299                yield query
300        else:
301            yield query
302
303    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
304        return runtime_stage == RuntimeStage.LOADING and not any(args)
305
306    def _to_table_mapping(
307        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
308    ) -> t.Dict[str, str]:
309        from sqlmesh.core.snapshot import to_table_mapping
310
311        return to_table_mapping(snapshots, deployability_index)
BaseExpressionRenderer( expression: sqlglot.expressions.Expression, dialect: str, macro_definitions: List[sqlmesh.core.dialect.MacroDef], path: pathlib.Path = PosixPath('.'), jinja_macro_registry: Union[sqlmesh.utils.jinja.JinjaMacroRegistry, NoneType] = None, python_env: Union[Dict[str, sqlmesh.utils.metaprogramming.Executable], NoneType] = None, only_execution_time: bool = False, schema: Union[Dict[str, Any], NoneType] = None, default_catalog: Union[str, NoneType] = None, quote_identifiers: bool = True, model_fqn: Union[str, NoneType] = None, normalize_identifiers: bool = True)
39    def __init__(
40        self,
41        expression: exp.Expression,
42        dialect: str,
43        macro_definitions: t.List[d.MacroDef],
44        path: Path = Path(),
45        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
46        python_env: t.Optional[t.Dict[str, Executable]] = None,
47        only_execution_time: bool = False,
48        schema: t.Optional[t.Dict[str, t.Any]] = None,
49        default_catalog: t.Optional[str] = None,
50        quote_identifiers: bool = True,
51        model_fqn: t.Optional[str] = None,
52        normalize_identifiers: bool = True,
53    ):
54        self._expression = expression
55        self._dialect = dialect
56        self._macro_definitions = macro_definitions
57        self._path = path
58        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
59        self._python_env = python_env or {}
60        self._only_execution_time = only_execution_time
61        self._default_catalog = default_catalog
62        self._normalize_identifiers = normalize_identifiers
63        self._quote_identifiers = quote_identifiers
64        self.update_schema({} if schema is None else schema)
65        self._cache: t.List[t.Optional[exp.Expression]] = []
66        self._model_fqn = model_fqn
def update_schema(self, schema: Dict[str, Any]) -> None:
68    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
69        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)
def update_cache( self, expression: Union[sqlglot.expressions.Expression, NoneType]) -> None:
217    def update_cache(self, expression: t.Optional[exp.Expression]) -> None:
218        self._cache = [expression]
class ExpressionRenderer(BaseExpressionRenderer):
class ExpressionRenderer(BaseExpressionRenderer):
    """Renderer that returns every rendered expression with its tables resolved."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expression]]:
        """Render the expression and resolve table references in each result.

        Args:
            start: The start of the interval to render for.
            end: The end of the interval to render for.
            execution_time: The execution time reference to render for.
            snapshots: Snapshots by name, used for rendering and table resolution.
            table_mapping: Explicit table mapping, used for rendering and table resolution.
            deployability_index: Forwarded to rendering and table resolution.
            expand: Names of tables to expand into subqueries during table resolution.
            kwargs: Additional keyword arguments forwarded to the renderer.

        Returns:
            The non-empty rendered expressions with tables resolved, or None when
            rendering raised a ParsetimeAdapterCallError.
        """
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                # Forwarded for parity with QueryRenderer.render, so that rendering
                # sees the same table mapping that _resolve_tables applies below.
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        return [
            self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                **kwargs,
            )
            for e in expressions
            if e
        ]
def render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Union[Dict[str, sqlmesh.core.snapshot.definition.Snapshot], NoneType] = None, table_mapping: Union[Dict[str, str], NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, expand: Iterable[str] = (), **kwargs: Any) -> Union[List[sqlglot.expressions.Expression], NoneType]:
315    def render(
316        self,
317        start: t.Optional[TimeLike] = None,
318        end: t.Optional[TimeLike] = None,
319        execution_time: t.Optional[TimeLike] = None,
320        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
321        table_mapping: t.Optional[t.Dict[str, str]] = None,
322        deployability_index: t.Optional[DeployabilityIndex] = None,
323        expand: t.Iterable[str] = tuple(),
324        **kwargs: t.Any,
325    ) -> t.Optional[t.List[exp.Expression]]:
326        try:
327            expressions = super()._render(
328                start=start,
329                end=end,
330                execution_time=execution_time,
331                snapshots=snapshots,
332                deployability_index=deployability_index,
333                **kwargs,
334            )
335        except ParsetimeAdapterCallError:
336            return None
337
338        return [
339            self._resolve_tables(
340                e,
341                snapshots=snapshots,
342                table_mapping=table_mapping,
343                expand=expand,
344                deployability_index=deployability_index,
345                start=start,
346                end=end,
347                execution_time=execution_time,
348                **kwargs,
349            )
350            for e in expressions
351            if e
352        ]
class QueryRenderer(BaseExpressionRenderer):
class QueryRenderer(BaseExpressionRenderer):
    """Renderer for a single query that additionally caches the optimized query."""

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        """Forward all arguments to the base renderer and reset the optimized-query cache."""
        super().__init__(*args, **kwargs)
        self._optimized_cache: t.Optional[exp.Query] = None

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Update the schema via the base class and drop the cached optimized query.

        Args:
            schema: The new mapping schema.
        """
        super().update_schema(schema)
        # The optimized query was produced against the previous schema.
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        optimize: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            optimize: Whether to optimize the query. When False, the rendered query is returned without
                optimization and without table resolution.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None when rendering raised a ParsetimeAdapterCallError or the
            single rendered expression is falsy.

        Raises:
            ConfigError: If nothing was rendered or more than one statement was rendered. A result that
                is not a query is reported through ``raise_config_error``.
        """

        # Caching is only allowed in the LOADING stage with no truthy start/end/
        # execution_time/kwargs values (see _should_cache).
        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache and optimize:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            if not expressions:
                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
                # NOTE(review): presumably unreachable because raise_config_error raises;
                # likely kept so control flow visibly ends here — confirm.
                raise

            if optimize:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        # Tables are resolved after the cache lookup/update, so the cached query is
        # the one stored before _resolve_tables is applied.
        if optimize:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
        """Store ``expression`` in the optimized cache or in the base render cache.

        Args:
            expression: The expression to cache.
            optimized: When True, store it as the optimized query; otherwise delegate to the base class.

        Raises:
            SQLMeshError: If ``optimized`` is True and ``expression`` is not an ``exp.Query``.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualify, type-annotate and simplify ``query`` when schemas for its dependencies are known.

        Args:
            query: The rendered query.
            all_deps: Names of the tables the query depends on; the model's own name is ignored.

        Returns:
            The optimized copy of the query, or the original query when optimization was
            skipped or raised a SqlglotError.
        """
        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps = set()
        all_deps = all_deps - {self._model_fqn}
        # Optimize when a schema is available, or when there are no dependencies at all.
        should_optimize = not self.schema.empty or not all_deps

        # Any dependency without a schema entry disables optimization.
        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))

            logger.warning(
                f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
                "Run `sqlmesh create_external_models` and / or make sure that the model "
                f"'{self._model_fqn}' can be rendered at parse time.",
            )

        try:
            if should_optimize:
                # Work on a copy so `original` can be returned if optimization fails.
                query = query.copy()
                # The return value is discarded; presumably these passes modify `query`
                # in place — confirm against sqlglot.
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                    )
                )
        except SqlglotError as ex:
            query = original

            logger.warning(
                "%s for model '%s', the column may not exist or is ambiguous", ex, self._model_fqn
            )

        # If the query carries no type, annotate each projection individually
        # (without a schema).
        if not query.type:
            for select in query.expressions:
                annotate_types(select)

        return query
QueryRenderer(*args: Any, **kwargs: Any)
    def __init__(self, *args: t.Any, **kwargs: t.Any):
        """Forward all arguments to the base renderer and reset the optimized-query cache."""
        super().__init__(*args, **kwargs)
        self._optimized_cache: t.Optional[exp.Query] = None
def update_schema(self, schema: Dict[str, Any]) -> None:
    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Update the schema via the base class and drop the cached optimized query.

        Args:
            schema: The new mapping schema.
        """
        super().update_schema(schema)
        # The optimized query was produced against the previous schema.
        self._optimized_cache = None
def render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Union[Dict[str, sqlmesh.core.snapshot.definition.Snapshot], NoneType] = None, table_mapping: Union[Dict[str, str], NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, expand: Iterable[str] = (), optimize: bool = True, runtime_stage: sqlmesh.core.macros.RuntimeStage = <RuntimeStage.LOADING: 'loading'>, **kwargs: Any) -> Union[sqlglot.expressions.Query, NoneType]:
    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        optimize: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            optimize: Whether to optimize the query. When False, the rendered query is returned without
                optimization and without table resolution.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None when rendering raised a ParsetimeAdapterCallError or the
            single rendered expression is falsy.

        Raises:
            ConfigError: If nothing was rendered or more than one statement was rendered. A result that
                is not a query is reported through ``raise_config_error``.
        """

        # Caching is only considered when _should_cache approves the stage and the
        # start/end/execution_time/kwargs values passed to it.
        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache and optimize:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            if not expressions:
                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
                # NOTE(review): presumably unreachable because raise_config_error raises;
                # likely kept so control flow visibly ends here — confirm.
                raise

            if optimize:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        # Tables are resolved after the cache lookup/update, so the cached query is
        # the one stored before _resolve_tables is applied.
        if optimize:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

Arguments:
  • query: The query to render.
  • start: The start datetime to render. Defaults to epoch start.
  • end: The end datetime to render. Defaults to epoch start.
  • execution_time: The date/time reference to use for execution time. Defaults to epoch start.
  • snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
  • table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
  • deployability_index: Determines snapshots that are deployable in the context of this evaluation.
  • expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
  • optimize: Whether to optimize the query.
  • runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
  • kwargs: Additional kwargs to pass to the renderer.
Returns:

The rendered expression.

def update_cache( self, expression: Union[sqlglot.expressions.Expression, NoneType], optimized: bool = False) -> None:
459    def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
460        if optimized:
461            if not isinstance(expression, exp.Query):
462                raise SQLMeshError(f"Expected a Query but got: {expression}")
463            self._optimized_cache = expression
464        else:
465            super().update_cache(expression)