sqlmesh.core.renderer
# Renderers for SQLMesh expressions and queries: they expand SQLMesh macros and
# jinja templates, normalize and quote identifiers, and map logical model names
# to their physical (snapshot) tables.
from __future__ import annotations

import logging
import typing as t
from contextlib import contextmanager
from pathlib import Path

from sqlglot import exp, parse
from sqlglot.errors import SqlglotError
from sqlglot.optimizer.annotate_types import annotate_types
from sqlglot.optimizer.qualify import qualify
from sqlglot.optimizer.simplify import simplify

from sqlmesh.core import constants as c
from sqlmesh.core import dialect as d
from sqlmesh.core.macros import MacroEvaluator, RuntimeStage
from sqlmesh.utils.date import TimeLike, date_dict, make_inclusive_end, to_datetime
from sqlmesh.utils.errors import (
    ConfigError,
    MacroEvalError,
    ParsetimeAdapterCallError,
    SQLMeshError,
    raise_config_error,
)
from sqlmesh.utils.jinja import JinjaMacroRegistry
from sqlmesh.utils.metaprogramming import Executable, prepare_env

if t.TYPE_CHECKING:
    from sqlglot._typing import E

    from sqlmesh.core.snapshot import DeployabilityIndex, Snapshot


logger = logging.getLogger(__name__)


class BaseExpressionRenderer:
    """Base class that renders a single SQL expression.

    Rendering expands SQLMesh macros and jinja, normalizes / quotes
    identifiers, and (optionally) caches the rendered result. Subclasses
    expose a public ``render`` method tailored to either arbitrary
    expressions or model queries.
    """

    def __init__(
        self,
        expression: exp.Expression,
        dialect: str,
        macro_definitions: t.List[d.MacroDef],
        path: Path = Path(),
        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        only_execution_time: bool = False,
        schema: t.Optional[t.Dict[str, t.Any]] = None,
        default_catalog: t.Optional[str] = None,
        quote_identifiers: bool = True,
        model_fqn: t.Optional[str] = None,
        normalize_identifiers: bool = True,
    ):
        """Initializes renderer state.

        Args:
            expression: The expression to render.
            dialect: SQL dialect used for parsing, normalization and serialization.
            macro_definitions: Inline macro definitions evaluated before rendering.
            path: Path of the file the expression came from (used in error messages).
            jinja_macro_registry: Registry of jinja macros available during rendering.
            python_env: Serialized python executables available to macros.
            only_execution_time: If True, start/end are excluded from the date macros
                (see ``_render``).
            schema: Mapping schema used to resolve columns and types.
            default_catalog: Catalog assumed when a table reference omits one.
            quote_identifiers: Whether identifiers are quoted during normalization.
            model_fqn: Fully qualified name of the model owning this expression.
            normalize_identifiers: Whether to normalize identifiers at all.
        """
        self._expression = expression
        self._dialect = dialect
        self._macro_definitions = macro_definitions
        self._path = path
        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
        self._python_env = python_env or {}
        self._only_execution_time = only_execution_time
        self._default_catalog = default_catalog
        self._normalize_identifiers = normalize_identifiers
        self._quote_identifiers = quote_identifiers
        # update_schema reads self._dialect, so it must run after the assignment above.
        self.update_schema({} if schema is None else schema)
        self._cache: t.List[t.Optional[exp.Expression]] = []
        self._model_fqn = model_fqn

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Replaces the mapping schema used to resolve columns and types."""
        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)

    def _render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.List[t.Optional[exp.Expression]]:
        """Renders an expression, expanding macros with provided kwargs.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expressions.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._cache:
            return self._cache

        # Expose the model's own physical table to macros as `this_model`,
        # resolved through the snapshot table mapping when one is available.
        if self._model_fqn and "this_model" not in kwargs:
            kwargs["this_model"] = exp.to_table(
                self._to_table_mapping(
                    (
                        [snapshots[self._model_fqn]]
                        if snapshots and self._model_fqn in snapshots
                        else []
                    ),
                    deployability_index,
                ).get(self._model_fqn, self._model_fqn),
                dialect=self._dialect,
            ).sql(dialect=self._dialect, identify=True)

        expressions = [self._expression]

        render_kwargs = {
            **date_dict(
                to_datetime(execution_time or c.EPOCH),
                to_datetime(start or c.EPOCH) if not self._only_execution_time else None,
                make_inclusive_end(end or c.EPOCH) if not self._only_execution_time else None,
            ),
            **kwargs,
        }

        jinja_env = self._jinja_macro_registry.build_environment(
            **{**render_kwargs, **prepare_env(self._python_env)},
            snapshots=(snapshots or {}),
            table_mapping=table_mapping,
            deployability_index=deployability_index,
            default_catalog=self._default_catalog,
            runtime_stage=runtime_stage.value,
        )

        if isinstance(self._expression, d.Jinja):
            try:
                expressions = []
                rendered_expression = jinja_env.from_string(self._expression.name).render()
                if rendered_expression.strip():
                    expressions = [e for e in parse(rendered_expression, read=self._dialect) if e]

                    if not expressions:
                        raise ConfigError(f"Failed to parse an expression:\n{self._expression}")
            except ParsetimeAdapterCallError:
                # Raised when jinja needs a live engine adapter at parse time;
                # propagated so callers can treat this as "cannot render yet".
                raise
            except Exception as ex:
                raise ConfigError(
                    f"Could not render or parse jinja at '{self._path}'.\n{ex}"
                ) from ex

        macro_evaluator = MacroEvaluator(
            self._dialect,
            python_env=self._python_env,
            jinja_env=jinja_env,
            schema=self.schema,
            runtime_stage=runtime_stage,
            resolve_tables=lambda e: self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
            ),
            snapshots=snapshots,
            default_catalog=self._default_catalog,
            path=self._path,
        )

        for definition in self._macro_definitions:
            try:
                macro_evaluator.evaluate(definition)
            except MacroEvalError as ex:
                raise_config_error(f"Failed to evaluate macro '{definition}'. {ex}", self._path)

        macro_evaluator.locals.update(render_kwargs)

        resolved_expressions: t.List[t.Optional[exp.Expression]] = []

        for expression in expressions:
            try:
                expression = macro_evaluator.transform(expression)  # type: ignore
            except MacroEvalError as ex:
                raise_config_error(f"Failed to resolve macro for expression. {ex}", self._path)

            if expression:
                with self._normalize_and_quote(expression) as expression:
                    # Alias unaliased projections to their output names so the
                    # rendered query has stable, explicit column names.
                    if hasattr(expression, "selects"):
                        for select in expression.selects:
                            if not isinstance(select, exp.Alias) and select.output_name not in (
                                "*",
                                "",
                            ):
                                alias = exp.alias_(
                                    select, select.output_name, quoted=self._quote_identifiers
                                )
                                comments = alias.this.comments
                                if comments:
                                    # Move the comments onto the alias so they survive.
                                    alias.add_comments(comments)
                                    comments.clear()

                                select.replace(alias)
            resolved_expressions.append(expression)

        # We don't cache here if columns_to_types was called in a macro.
        # This allows the model's query to be re-rendered so that the
        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
            self._cache = resolved_expressions
        return resolved_expressions

    def update_cache(self, expression: t.Optional[exp.Expression]) -> None:
        """Overwrites the render cache with the given (single) expression."""
        self._cache = [expression]

    def _resolve_tables(
        self,
        expression: E,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        expand: t.Iterable[str] = tuple(),
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> E:
        """Replaces model references with physical tables or inlined subqueries.

        Args:
            expression: The expression whose table references should be resolved.
            snapshots: Upstream snapshots used to compute physical locations.
            table_mapping: Explicit mapping that takes precedence over snapshot mappings.
            expand: Model names to inline as subqueries instead of mapping to tables.
            deployability_index: Determines snapshots that are deployable.

        Returns:
            A copy of the expression with tables resolved; the input is returned
            unchanged when there is nothing to resolve.
        """
        if not snapshots and not table_mapping and not expand:
            return expression

        expression = expression.copy()
        with self._normalize_and_quote(expression) as expression:
            snapshots = snapshots or {}
            table_mapping = table_mapping or {}
            # Explicit table_mapping entries override snapshot-derived ones.
            mapping = {
                **self._to_table_mapping(snapshots.values(), deployability_index),
                **table_mapping,
            }
            # Embedded models are always inlined.
            expand = set(expand) | {
                name for name, snapshot in snapshots.items() if snapshot.is_embedded
            }

            if expand:
                model_mapping = {
                    name: snapshot.model
                    for name, snapshot in snapshots.items()
                    if snapshot.is_model
                }

                def _expand(node: exp.Expression) -> exp.Expression:
                    # Replaces a table reference to an expandable model with the
                    # model's rendered query as a subquery.
                    if isinstance(node, exp.Table) and snapshots:
                        name = exp.table_name(node, identify=True)
                        model = model_mapping.get(name)
                        if (
                            name in expand
                            and model
                            and not model.is_seed
                            and not model.kind.is_external
                        ):
                            nested_query = model.render_query(
                                start=start,
                                end=end,
                                execution_time=execution_time,
                                snapshots=snapshots,
                                table_mapping=table_mapping,
                                expand=expand,
                                deployability_index=deployability_index,
                                **kwargs,
                            )
                            if nested_query is not None:
                                return nested_query.subquery(
                                    alias=node.alias or model.view_name,
                                    copy=False,
                                )
                            else:
                                logger.warning("Failed to expand the nested model '%s'", name)
                    return node

                expression = expression.transform(_expand, copy=False)  # type: ignore

            if mapping:
                expression = exp.replace_tables(
                    expression, mapping, dialect=self._dialect, copy=False
                )

        return expression

    @contextmanager
    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
        """Context manager that normalizes/quotes identifiers, when enabled."""
        if self._normalize_identifiers:
            with d.normalize_and_quote(
                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
            ) as query:
                yield query
        else:
            yield query

    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
        # Only cache while loading the project and only when every rendering
        # argument is falsy, so the cached result is argument-independent.
        return runtime_stage == RuntimeStage.LOADING and not any(args)

    def _to_table_mapping(
        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
    ) -> t.Dict[str, str]:
        """Maps snapshot names to their physical table names."""
        # Local import to avoid a circular dependency with sqlmesh.core.snapshot.
        from sqlmesh.core.snapshot import to_table_mapping

        return to_table_mapping(snapshots, deployability_index)


class ExpressionRenderer(BaseExpressionRenderer):
    """Renders arbitrary (non-query) expressions, e.g. pre/post statements."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[exp.Expression]:
        """Renders the expression(s) and resolves their table references.

        Returns:
            The rendered expressions, or None if rendering required an engine
            adapter call at parse time.
        """
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            return None

        return [
            self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                **kwargs,
            )
            for e in expressions
            if e
        ]


class QueryRenderer(BaseExpressionRenderer):
    """Renders a model's query and optionally optimizes it against known schemas."""

    def __init__(self, *args: t.Any, **kwargs: t.Any):
        super().__init__(*args, **kwargs)
        # Cache for the optimized (qualified + type-annotated) query.
        self._optimized_cache: t.Optional[exp.Query] = None

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Updates the mapping schema and invalidates the optimized query cache."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        optimize: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            query: The query to render.
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            optimize: Whether to optimize the query.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expression.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache and optimize:
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                return None

            if not expressions:
                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
                # raise_config_error always raises; the bare raise narrows types.
                raise

            if optimize:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                if should_cache:
                    self._optimized_cache = query

        # Table resolution happens after (and outside of) caching, so the cached
        # query keeps logical model names.
        if optimize:
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
        """Updates either the optimized query cache or the base render cache.

        Raises:
            SQLMeshError: If `optimized` is True but the expression isn't a Query.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualifies, type-annotates and simplifies the query when schemas allow.

        Args:
            query: The rendered query.
            all_deps: Fully qualified names of all tables the query references.

        Returns:
            The optimized query, or the original one if optimization was skipped
            (missing dependency schemas) or failed.
        """
        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps = set()
        all_deps = all_deps - {self._model_fqn}
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))

            logger.warning(
                f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
                "Run `sqlmesh create_external_models` and / or make sure that the model "
                f"'{self._model_fqn}' can be rendered at parse time.",
            )

        try:
            if should_optimize:
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                    )
                )
        except SqlglotError as ex:
            query = original

            logger.warning(
                "%s for model '%s', the column may not exist or is ambiguous", ex, self._model_fqn
            )

        # Ensure projections carry type annotations even when full optimization
        # was skipped or failed.
        if not query.type:
            for select in query.expressions:
                annotate_types(select)

        return query
class
BaseExpressionRenderer:
class BaseExpressionRenderer:
    """Base class that renders a single SQL expression.

    Rendering expands SQLMesh macros and jinja, normalizes / quotes
    identifiers, and (optionally) caches the rendered result.
    """

    def __init__(
        self,
        expression: exp.Expression,
        dialect: str,
        macro_definitions: t.List[d.MacroDef],
        path: Path = Path(),
        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        only_execution_time: bool = False,
        schema: t.Optional[t.Dict[str, t.Any]] = None,
        default_catalog: t.Optional[str] = None,
        quote_identifiers: bool = True,
        model_fqn: t.Optional[str] = None,
        normalize_identifiers: bool = True,
    ):
        """Initializes renderer state; see class docstring for the overall flow."""
        self._expression = expression
        self._dialect = dialect
        self._macro_definitions = macro_definitions
        self._path = path
        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
        self._python_env = python_env or {}
        self._only_execution_time = only_execution_time
        self._default_catalog = default_catalog
        self._normalize_identifiers = normalize_identifiers
        self._quote_identifiers = quote_identifiers
        # update_schema reads self._dialect, so it must run after the assignment above.
        self.update_schema({} if schema is None else schema)
        self._cache: t.List[t.Optional[exp.Expression]] = []
        self._model_fqn = model_fqn

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Replaces the mapping schema used to resolve columns and types."""
        self.schema = d.normalize_mapping_schema(schema, dialect=self._dialect)

    def _render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.List[t.Optional[exp.Expression]]:
        """Renders an expression, expanding macros with provided kwargs.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expressions.
        """

        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._cache:
            return self._cache

        # Expose the model's own physical table to macros as `this_model`,
        # resolved through the snapshot table mapping when one is available.
        if self._model_fqn and "this_model" not in kwargs:
            kwargs["this_model"] = exp.to_table(
                self._to_table_mapping(
                    (
                        [snapshots[self._model_fqn]]
                        if snapshots and self._model_fqn in snapshots
                        else []
                    ),
                    deployability_index,
                ).get(self._model_fqn, self._model_fqn),
                dialect=self._dialect,
            ).sql(dialect=self._dialect, identify=True)

        expressions = [self._expression]

        render_kwargs = {
            **date_dict(
                to_datetime(execution_time or c.EPOCH),
                to_datetime(start or c.EPOCH) if not self._only_execution_time else None,
                make_inclusive_end(end or c.EPOCH) if not self._only_execution_time else None,
            ),
            **kwargs,
        }

        jinja_env = self._jinja_macro_registry.build_environment(
            **{**render_kwargs, **prepare_env(self._python_env)},
            snapshots=(snapshots or {}),
            table_mapping=table_mapping,
            deployability_index=deployability_index,
            default_catalog=self._default_catalog,
            runtime_stage=runtime_stage.value,
        )

        if isinstance(self._expression, d.Jinja):
            try:
                expressions = []
                rendered_expression = jinja_env.from_string(self._expression.name).render()
                if rendered_expression.strip():
                    expressions = [e for e in parse(rendered_expression, read=self._dialect) if e]

                    if not expressions:
                        raise ConfigError(f"Failed to parse an expression:\n{self._expression}")
            except ParsetimeAdapterCallError:
                # Raised when jinja needs a live engine adapter at parse time;
                # propagated so callers can treat this as "cannot render yet".
                raise
            except Exception as ex:
                raise ConfigError(
                    f"Could not render or parse jinja at '{self._path}'.\n{ex}"
                ) from ex

        macro_evaluator = MacroEvaluator(
            self._dialect,
            python_env=self._python_env,
            jinja_env=jinja_env,
            schema=self.schema,
            runtime_stage=runtime_stage,
            resolve_tables=lambda e: self._resolve_tables(
                e,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
            ),
            snapshots=snapshots,
            default_catalog=self._default_catalog,
            path=self._path,
        )

        for definition in self._macro_definitions:
            try:
                macro_evaluator.evaluate(definition)
            except MacroEvalError as ex:
                raise_config_error(f"Failed to evaluate macro '{definition}'. {ex}", self._path)

        macro_evaluator.locals.update(render_kwargs)

        resolved_expressions: t.List[t.Optional[exp.Expression]] = []

        for expression in expressions:
            try:
                expression = macro_evaluator.transform(expression)  # type: ignore
            except MacroEvalError as ex:
                raise_config_error(f"Failed to resolve macro for expression. {ex}", self._path)

            if expression:
                with self._normalize_and_quote(expression) as expression:
                    # Alias unaliased projections to their output names so the
                    # rendered query has stable, explicit column names.
                    if hasattr(expression, "selects"):
                        for select in expression.selects:
                            if not isinstance(select, exp.Alias) and select.output_name not in (
                                "*",
                                "",
                            ):
                                alias = exp.alias_(
                                    select, select.output_name, quoted=self._quote_identifiers
                                )
                                comments = alias.this.comments
                                if comments:
                                    # Move the comments onto the alias so they survive.
                                    alias.add_comments(comments)
                                    comments.clear()

                                select.replace(alias)
            resolved_expressions.append(expression)

        # We don't cache here if columns_to_types was called in a macro.
        # This allows the model's query to be re-rendered so that the
        # MacroEvaluator can resolve columns_to_types calls and provide true schemas.
        if should_cache and (not self.schema.empty or not macro_evaluator.columns_to_types_called):
            self._cache = resolved_expressions
        return resolved_expressions

    def update_cache(self, expression: t.Optional[exp.Expression]) -> None:
        """Overwrites the render cache with the given (single) expression."""
        self._cache = [expression]

    def _resolve_tables(
        self,
        expression: E,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        expand: t.Iterable[str] = tuple(),
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> E:
        """Replaces model references with physical tables or inlined subqueries.

        Returns:
            A copy of the expression with tables resolved; the input is returned
            unchanged when there is nothing to resolve.
        """
        if not snapshots and not table_mapping and not expand:
            return expression

        expression = expression.copy()
        with self._normalize_and_quote(expression) as expression:
            snapshots = snapshots or {}
            table_mapping = table_mapping or {}
            # Explicit table_mapping entries override snapshot-derived ones.
            mapping = {
                **self._to_table_mapping(snapshots.values(), deployability_index),
                **table_mapping,
            }
            # Embedded models are always inlined.
            expand = set(expand) | {
                name for name, snapshot in snapshots.items() if snapshot.is_embedded
            }

            if expand:
                model_mapping = {
                    name: snapshot.model
                    for name, snapshot in snapshots.items()
                    if snapshot.is_model
                }

                def _expand(node: exp.Expression) -> exp.Expression:
                    # Replaces a table reference to an expandable model with the
                    # model's rendered query as a subquery.
                    if isinstance(node, exp.Table) and snapshots:
                        name = exp.table_name(node, identify=True)
                        model = model_mapping.get(name)
                        if (
                            name in expand
                            and model
                            and not model.is_seed
                            and not model.kind.is_external
                        ):
                            nested_query = model.render_query(
                                start=start,
                                end=end,
                                execution_time=execution_time,
                                snapshots=snapshots,
                                table_mapping=table_mapping,
                                expand=expand,
                                deployability_index=deployability_index,
                                **kwargs,
                            )
                            if nested_query is not None:
                                return nested_query.subquery(
                                    alias=node.alias or model.view_name,
                                    copy=False,
                                )
                            else:
                                logger.warning("Failed to expand the nested model '%s'", name)
                    return node

                expression = expression.transform(_expand, copy=False)  # type: ignore

            if mapping:
                expression = exp.replace_tables(
                    expression, mapping, dialect=self._dialect, copy=False
                )

        return expression

    @contextmanager
    def _normalize_and_quote(self, query: E) -> t.Iterator[E]:
        """Context manager that normalizes/quotes identifiers, when enabled."""
        if self._normalize_identifiers:
            with d.normalize_and_quote(
                query, self._dialect, self._default_catalog, quote=self._quote_identifiers
            ) as query:
                yield query
        else:
            yield query

    def _should_cache(self, runtime_stage: RuntimeStage, *args: t.Any) -> bool:
        # Only cache while loading the project and only when every rendering
        # argument is falsy, so the cached result is argument-independent.
        return runtime_stage == RuntimeStage.LOADING and not any(args)

    def _to_table_mapping(
        self, snapshots: t.Iterable[Snapshot], deployability_index: t.Optional[DeployabilityIndex]
    ) -> t.Dict[str, str]:
        """Maps snapshot names to their physical table names."""
        # Local import to avoid a circular dependency with sqlmesh.core.snapshot.
        from sqlmesh.core.snapshot import to_table_mapping

        return to_table_mapping(snapshots, deployability_index)
BaseExpressionRenderer( expression: sqlglot.expressions.Expression, dialect: str, macro_definitions: List[sqlmesh.core.dialect.MacroDef], path: pathlib.Path = PosixPath('.'), jinja_macro_registry: Union[sqlmesh.utils.jinja.JinjaMacroRegistry, NoneType] = None, python_env: Union[Dict[str, sqlmesh.utils.metaprogramming.Executable], NoneType] = None, only_execution_time: bool = False, schema: Union[Dict[str, Any], NoneType] = None, default_catalog: Union[str, NoneType] = None, quote_identifiers: bool = True, model_fqn: Union[str, NoneType] = None, normalize_identifiers: bool = True)
    def __init__(
        self,
        expression: exp.Expression,
        dialect: str,
        macro_definitions: t.List[d.MacroDef],
        path: Path = Path(),
        jinja_macro_registry: t.Optional[JinjaMacroRegistry] = None,
        python_env: t.Optional[t.Dict[str, Executable]] = None,
        only_execution_time: bool = False,
        schema: t.Optional[t.Dict[str, t.Any]] = None,
        default_catalog: t.Optional[str] = None,
        quote_identifiers: bool = True,
        model_fqn: t.Optional[str] = None,
        normalize_identifiers: bool = True,
    ):
        """Initializes the renderer.

        Args:
            expression: The expression to render.
            dialect: SQL dialect used for parsing, normalization and serialization.
            macro_definitions: Inline macro definitions evaluated before rendering.
            path: Path of the file the expression came from (used in error messages).
            jinja_macro_registry: Registry of jinja macros available during rendering.
            python_env: Serialized python executables available to macros.
            only_execution_time: If True, start/end are omitted from the date macros.
            schema: Mapping schema used to resolve columns and types.
            default_catalog: Catalog assumed when a table reference omits one.
            quote_identifiers: Whether identifiers are quoted during normalization.
            model_fqn: Fully qualified name of the model owning this expression.
            normalize_identifiers: Whether to normalize identifiers at all.
        """
        self._expression = expression
        self._dialect = dialect
        self._macro_definitions = macro_definitions
        self._path = path
        self._jinja_macro_registry = jinja_macro_registry or JinjaMacroRegistry()
        self._python_env = python_env or {}
        self._only_execution_time = only_execution_time
        self._default_catalog = default_catalog
        self._normalize_identifiers = normalize_identifiers
        self._quote_identifiers = quote_identifiers
        # update_schema reads self._dialect, so it must run after the assignment above.
        self.update_schema({} if schema is None else schema)
        self._cache: t.List[t.Optional[exp.Expression]] = []
        self._model_fqn = model_fqn
class ExpressionRenderer(BaseExpressionRenderer):
    """Renders standalone (non-query) expressions and resolves their tables."""

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        **kwargs: t.Any,
    ) -> t.Optional[t.List[exp.Expression]]:
        """Renders the expression(s), then maps table references to physical tables.

        Returns None if rendering required an engine adapter call at parse time.
        """
        try:
            rendered = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                deployability_index=deployability_index,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            # Can't render yet - a live engine adapter would be required.
            return None

        resolved: t.List[exp.Expression] = []
        for rendered_expression in rendered:
            if not rendered_expression:
                continue
            resolved.append(
                self._resolve_tables(
                    rendered_expression,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    expand=expand,
                    deployability_index=deployability_index,
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    **kwargs,
                )
            )
        return resolved
def
render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Union[Dict[str, sqlmesh.core.snapshot.definition.Snapshot], NoneType] = None, table_mapping: Union[Dict[str, str], NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, expand: Iterable[str] = (), **kwargs: Any) -> Union[List[sqlglot.expressions.Expression], NoneType]:
315 def render( 316 self, 317 start: t.Optional[TimeLike] = None, 318 end: t.Optional[TimeLike] = None, 319 execution_time: t.Optional[TimeLike] = None, 320 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 321 table_mapping: t.Optional[t.Dict[str, str]] = None, 322 deployability_index: t.Optional[DeployabilityIndex] = None, 323 expand: t.Iterable[str] = tuple(), 324 **kwargs: t.Any, 325 ) -> t.Optional[t.List[exp.Expression]]: 326 try: 327 expressions = super()._render( 328 start=start, 329 end=end, 330 execution_time=execution_time, 331 snapshots=snapshots, 332 deployability_index=deployability_index, 333 **kwargs, 334 ) 335 except ParsetimeAdapterCallError: 336 return None 337 338 return [ 339 self._resolve_tables( 340 e, 341 snapshots=snapshots, 342 table_mapping=table_mapping, 343 expand=expand, 344 deployability_index=deployability_index, 345 start=start, 346 end=end, 347 execution_time=execution_time, 348 **kwargs, 349 ) 350 for e in expressions 351 if e 352 ]
Inherited Members
class QueryRenderer(BaseExpressionRenderer):
    """Renders a model's query, optionally optimizing it and caching the optimized result."""

    def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
        super().__init__(*args, **kwargs)
        # Cache of the most recently optimized query; invalidated on schema updates.
        self._optimized_cache: t.Optional[exp.Query] = None

    def update_schema(self, schema: t.Dict[str, t.Any]) -> None:
        """Updates the schema and drops the optimized query cache, since optimization depends on it."""
        super().update_schema(schema)
        self._optimized_cache = None

    def render(
        self,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        expand: t.Iterable[str] = tuple(),
        optimize: bool = True,
        runtime_stage: RuntimeStage = RuntimeStage.LOADING,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time reference to use for execution time. Defaults to epoch start.
            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
            deployability_index: Determines snapshots that are deployable in the context of this evaluation.
            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
                that depend on materialized tables. Model definitions are inlined and can thus be run end to
                end on the fly.
            optimize: Whether to optimize the query.
            runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered expression.
        """

        # Whether this render may reuse / populate the optimized-query cache
        # (depends on the runtime stage and the supplied render-time arguments).
        should_cache = self._should_cache(
            runtime_stage, start, end, execution_time, *kwargs.values()
        )

        if should_cache and self._optimized_cache and optimize:
            # Cache hit: skip macro rendering and optimization entirely.
            query = self._optimized_cache
        else:
            try:
                expressions = super()._render(
                    start=start,
                    end=end,
                    execution_time=execution_time,
                    snapshots=snapshots,
                    table_mapping=table_mapping,
                    deployability_index=deployability_index,
                    runtime_stage=runtime_stage,
                    **kwargs,
                )
            except ParsetimeAdapterCallError:
                # The adapter was called at parse time; rendering must be deferred.
                return None

            # A model's query must render to exactly one statement.
            if not expressions:
                raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

            if len(expressions) > 1:
                raise ConfigError(f"Too many statements in query:\n{self._expression}")

            query = expressions[0]  # type: ignore

            if not query:
                return None
            if not isinstance(query, exp.Query):
                raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
                # NOTE(review): raise_config_error presumably always raises; this bare
                # raise appears to exist only to satisfy control-flow analysis — confirm.
                raise

            if optimize:
                deps = d.find_tables(
                    query, default_catalog=self._default_catalog, dialect=self._dialect
                )

                query = self._optimize_query(query, deps)

                # Only optimized queries are cached; the cache stores the pre-resolution form.
                if should_cache:
                    self._optimized_cache = query

        if optimize:
            # Resolve referenced model names to their physical tables (or inline expansions).
            query = self._resolve_tables(
                query,
                snapshots=snapshots,
                table_mapping=table_mapping,
                expand=expand,
                deployability_index=deployability_index,
                start=start,
                end=end,
                execution_time=execution_time,
                runtime_stage=runtime_stage,
                **kwargs,
            )

        return query

    def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
        """Seeds this renderer's cache with a pre-rendered expression.

        Args:
            expression: The expression to cache.
            optimized: If True, the expression is stored in the optimized-query cache
                and must be a Query; otherwise the base renderer's cache is updated.

        Raises:
            SQLMeshError: If optimized is True and the expression is not a Query.
        """
        if optimized:
            if not isinstance(expression, exp.Query):
                raise SQLMeshError(f"Expected a Query but got: {expression}")
            self._optimized_cache = expression
        else:
            super().update_cache(expression)

    def _optimize_query(self, query: exp.Query, all_deps: t.Set[str]) -> exp.Query:
        """Qualifies, type-annotates, and simplifies the query when schemas for all
        of its dependencies are known; otherwise returns it with best-effort type
        annotations on its projections.
        """
        # We don't want to normalize names in the schema because that's handled by the optimizer
        original = query
        missing_deps: t.Set[str] = set()
        # A model may reference itself (e.g. incremental models); its own schema isn't required.
        all_deps = all_deps - {self._model_fqn}
        # Optimize if we have any schema information, or if there are no dependencies at all.
        should_optimize = not self.schema.empty or not all_deps

        for dep in all_deps:
            if not self.schema.find(exp.to_table(dep)):
                # Missing schema for a dependency makes full optimization unsafe.
                should_optimize = False
                missing_deps.add(dep)

        if self._model_fqn and not should_optimize and any(s.is_star for s in query.selects):
            deps = ", ".join(f"'{dep}'" for dep in sorted(missing_deps))

            logger.warning(
                f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
                "Run `sqlmesh create_external_models` and / or make sure that the model "
                f"'{self._model_fqn}' can be rendered at parse time.",
            )

        try:
            if should_optimize:
                # Work on a copy so a failed optimization can fall back to the original.
                query = query.copy()
                simplify(
                    annotate_types(
                        qualify(
                            query,
                            dialect=self._dialect,
                            schema=self.schema,
                            infer_schema=False,
                            catalog=self._default_catalog,
                            quote_identifiers=self._quote_identifiers,
                        ),
                        schema=self.schema,
                    )
                )
        except SqlglotError as ex:
            # Optimization is best-effort: on failure, keep the unoptimized query and warn.
            query = original

            logger.warning(
                "%s for model '%s', the column may not exist or is ambiguous", ex, self._model_fqn
            )

        if not query.type:
            # Optimization didn't run (or failed); still annotate each projection
            # so column types remain available downstream.
            for select in query.expressions:
                annotate_types(select)

        return query
def
render( self, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Union[Dict[str, sqlmesh.core.snapshot.definition.Snapshot], NoneType] = None, table_mapping: Union[Dict[str, str], NoneType] = None, deployability_index: Union[sqlmesh.core.snapshot.definition.DeployabilityIndex, NoneType] = None, expand: Iterable[str] = (), optimize: bool = True, runtime_stage: sqlmesh.core.macros.RuntimeStage = <RuntimeStage.LOADING: 'loading'>, **kwargs: Any) -> Union[sqlglot.expressions.Query, NoneType]:
def render(
    self,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    expand: t.Iterable[str] = tuple(),
    optimize: bool = True,
    runtime_stage: RuntimeStage = RuntimeStage.LOADING,
    **kwargs: t.Any,
) -> t.Optional[exp.Query]:
    """Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time reference to use for execution time. Defaults to epoch start.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
        deployability_index: Determines snapshots that are deployable in the context of this evaluation.
        expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
            that depend on materialized tables. Model definitions are inlined and can thus be run end to
            end on the fly.
        optimize: Whether to optimize the query.
        runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
        kwargs: Additional kwargs to pass to the renderer.

    Returns:
        The rendered expression.
    """

    # Whether this render may reuse / populate the optimized-query cache
    # (depends on the runtime stage and the supplied render-time arguments).
    should_cache = self._should_cache(
        runtime_stage, start, end, execution_time, *kwargs.values()
    )

    if should_cache and self._optimized_cache and optimize:
        # Cache hit: skip macro rendering and optimization entirely.
        query = self._optimized_cache
    else:
        try:
            expressions = super()._render(
                start=start,
                end=end,
                execution_time=execution_time,
                snapshots=snapshots,
                table_mapping=table_mapping,
                deployability_index=deployability_index,
                runtime_stage=runtime_stage,
                **kwargs,
            )
        except ParsetimeAdapterCallError:
            # The adapter was called at parse time; rendering must be deferred.
            return None

        # A model's query must render to exactly one statement.
        if not expressions:
            raise ConfigError(f"Failed to render query at '{self._path}':\n{self._expression}")

        if len(expressions) > 1:
            raise ConfigError(f"Too many statements in query:\n{self._expression}")

        query = expressions[0]  # type: ignore

        if not query:
            return None
        if not isinstance(query, exp.Query):
            raise_config_error(f"Query needs to be a SELECT or a UNION {query}.", self._path)
            # NOTE(review): raise_config_error presumably always raises; this bare
            # raise appears to exist only to satisfy control-flow analysis — confirm.
            raise

        if optimize:
            deps = d.find_tables(
                query, default_catalog=self._default_catalog, dialect=self._dialect
            )

            query = self._optimize_query(query, deps)

            # Only optimized queries are cached; the cache stores the pre-resolution form.
            if should_cache:
                self._optimized_cache = query

    if optimize:
        # Resolve referenced model names to their physical tables (or inline expansions).
        query = self._resolve_tables(
            query,
            snapshots=snapshots,
            table_mapping=table_mapping,
            expand=expand,
            deployability_index=deployability_index,
            start=start,
            end=end,
            execution_time=execution_time,
            runtime_stage=runtime_stage,
            **kwargs,
        )

    return query
Renders a query, expanding macros with provided kwargs, and optionally expanding referenced models.
Arguments:
- query: The query to render.
- start: The start datetime to render. Defaults to epoch start.
- end: The end datetime to render. Defaults to epoch start.
- execution_time: The date/time reference to use for execution time. Defaults to epoch start.
- snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
- table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
- deployability_index: Determines snapshots that are deployable in the context of this evaluation.
- expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
- optimize: Whether to optimize the query.
- runtime_stage: Indicates the current runtime stage, for example if we're still loading the project, etc.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
The rendered expression.
def
update_cache( self, expression: Union[sqlglot.expressions.Expression, NoneType], optimized: bool = False) -> None:
def update_cache(self, expression: t.Optional[exp.Expression], optimized: bool = False) -> None:
    """Seeds this renderer's cache with a pre-rendered expression.

    Args:
        expression: The expression to cache.
        optimized: If True, store the expression in the optimized-query cache
            (it must be a Query); otherwise delegate to the base renderer's cache.

    Raises:
        SQLMeshError: If optimized is True and the expression is not a Query.
    """
    if not optimized:
        # Non-optimized expressions are handled by the base renderer's cache.
        super().update_cache(expression)
        return

    if not isinstance(expression, exp.Query):
        raise SQLMeshError(f"Expected a Query but got: {expression}")
    self._optimized_cache = expression