sqlmesh.core.model.definition
1from __future__ import annotations 2 3import json 4import logging 5import types 6import re 7import typing as t 8from functools import cached_property, partial 9from pathlib import Path 10 11from pydantic import Field 12from sqlglot import diff, exp 13from sqlglot.diff import Insert 14from sqlglot.helper import seq_get 15from sqlglot.optimizer.qualify_columns import quote_identifiers 16from sqlglot.optimizer.simplify import gen 17from sqlglot.optimizer.normalize_identifiers import normalize_identifiers 18from sqlglot.schema import MappingSchema, nested_set 19from sqlglot.time import format_time 20 21from sqlmesh.core import constants as c 22from sqlmesh.core import dialect as d 23from sqlmesh.core.audit import Audit, ModelAudit 24from sqlmesh.core.node import IntervalUnit 25from sqlmesh.core.macros import MacroRegistry, macro 26from sqlmesh.core.model.common import ( 27 ParsableSql, 28 make_python_env, 29 parse_dependencies, 30 parse_strings_with_macro_refs, 31 single_value_or_tuple, 32 sorted_python_env_payloads, 33 validate_extra_and_required_fields, 34) 35from sqlmesh.core.model.meta import ModelMeta 36from sqlmesh.core.model.kind import ( 37 ExternalKind, 38 ModelKindName, 39 SeedKind, 40 ModelKind, 41 FullKind, 42 create_model_kind, 43 CustomKind, 44) 45from sqlmesh.core.model.seed import CsvSeedReader, Seed, create_seed 46from sqlmesh.core.renderer import ExpressionRenderer, QueryRenderer 47from sqlmesh.core.signal import SignalRegistry 48from sqlmesh.utils import columns_to_types_all_known, str_to_bool, UniqueKeyDict 49from sqlmesh.utils.cron import CroniterCache 50from sqlmesh.utils.date import TimeLike, make_inclusive, to_datetime, to_time_column 51from sqlmesh.utils.errors import ConfigError, SQLMeshError, raise_config_error, PythonModelEvalError 52from sqlmesh.utils.hashing import hash_data 53from sqlmesh.utils.jinja import JinjaMacroRegistry, extract_macro_references_and_variables 54from sqlmesh.utils.pydantic import PydanticModel, PRIVATE_FIELDS 55from 
sqlmesh.utils.metaprogramming import ( 56 Executable, 57 SqlValue, 58 build_env, 59 prepare_env, 60 serialize_env, 61 format_evaluated_code_exception, 62) 63 64if t.TYPE_CHECKING: 65 from sqlglot.dialects.dialect import DialectType 66 from sqlmesh.core.node import _Node 67 from sqlmesh.core._typing import Self, TableName, SessionProperties 68 from sqlmesh.core.context import ExecutionContext 69 from sqlmesh.core.engine_adapter import EngineAdapter 70 from sqlmesh.core.engine_adapter._typing import QueryOrDF 71 from sqlmesh.core.engine_adapter.shared import DataObjectType 72 from sqlmesh.core.linter.rule import Rule 73 from sqlmesh.core.snapshot import DeployabilityIndex, Node, Snapshot 74 from sqlmesh.utils.jinja import MacroReference 75 76 77logger = logging.getLogger(__name__) 78 79 80PROPERTIES = {"physical_properties", "session_properties", "virtual_properties"} 81 82RUNTIME_RENDERED_MODEL_FIELDS = { 83 "audits", 84 "signals", 85 "merge_filter", 86} | PROPERTIES 87 88CRON_SHORTCUTS = { 89 "@midnight", 90 "@hourly", 91 "@daily", 92 "@weekly", 93 "@monthly", 94 "@yearly", 95 "@annually", 96} 97 98 99class _Model(ModelMeta, frozen=True): 100 """Model is the core abstraction for user defined datasets. 101 102 A model consists of logic that fetches the data (a SQL query, a Python script or a seed) and metadata 103 associated with it. Models can be run on arbitrary cadences and support incremental or full refreshes. 104 Models can also be materialized into physical tables or shared across other models as temporary views. 105 106 Example: 107 MODEL ( 108 name sushi.order_items, 109 owner jen, 110 cron '@daily', 111 start '2020-01-01', 112 partitioned_by ds 113 ); 114 115 @DEF(var, 'my_var'); 116 117 SELECT 118 1 AS column_a # my first column, 119 @var AS my_column # my second column, 120 ; 121 122 Args: 123 name: The name of the model, which is of the form [catalog].[db].table. 124 The catalog and db are optional. 
125 dialect: The SQL dialect that the model's query is written in. By default, 126 this is assumed to be the dialect of the context. 127 owner: The owner of the model. 128 cron: A cron string specifying how often the model should be refreshed, leveraging the 129 [croniter](https://github.com/kiorky/croniter) library. 130 description: The optional model description. 131 stamp: An optional arbitrary string sequence used to create new model versions without making 132 changes to any of the functional components of the definition. 133 start: The earliest date that the model will be backfilled for. If this is None, 134 then the date is inferred by taking the most recent start date of its ancestors. 135 The start date can be a static datetime or a relative datetime like "1 year ago" 136 end: The date that the model will be backfilled up until. Follows the same syntax as 'start', 137 should be omitted if there is no end date. 138 lookback: The number of previous incremental intervals in the lookback window. 139 table_format: The table format used to manage the physical table files defined by `storage_format`, only applicable in certain engines. 140 (eg, 'iceberg', 'delta', 'hudi') 141 storage_format: The storage format used to store the physical table, only applicable in certain engines. 142 (eg. 'parquet', 'orc') 143 partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 144 clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour)) 145 python_env: Dictionary containing all global variables needed to render the model's macros. 146 mapping_schema: The schema of table names to column and types. 147 extract_dependencies_from_query: Whether to extract additional dependencies from the rendered model's query. 148 physical_schema_override: The desired physical schema name override. 
def render(
    self,
    *,
    context: ExecutionContext,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    **kwargs: t.Any,
) -> t.Iterator[QueryOrDF]:
    """Yields the content of this model.

    This implementation yields exactly one item: the result of `render_query_or_raise()`, invoked with
    the snapshots, deployability index and engine adapter taken from `context`. The return annotation
    also admits dataframe objects.

    Args:
        context: The execution context used for fetching data.
        start: The start date/time of the run.
        end: The end date/time of the run.
        execution_time: The date/time reference to use for execution time.
        kwargs: Additional kwargs forwarded to `render_query_or_raise()`.

    Returns:
        A generator which yields the rendered query.
    """
    # Nothing is evaluated until the generator is first advanced.
    snapshots = context.snapshots
    deployability_index = context.deployability_index
    engine_adapter = context.engine_adapter

    rendered = self.render_query_or_raise(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        deployability_index=deployability_index,
        engine_adapter=engine_adapter,
        **kwargs,
    )
    yield rendered
def render_query(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    expand: t.Iterable[str] = tuple(),
    deployability_index: t.Optional[DeployabilityIndex] = None,
    engine_adapter: t.Optional[EngineAdapter] = None,
    **kwargs: t.Any,
) -> t.Optional[exp.Query]:
    """Renders a placeholder query that has the model's column names and types.

    This implementation does not use any of its arguments. It builds a SELECT with one
    `CAST(NULL AS <type>)` projection per entry of `columns_to_types` (no projections when that mapping
    is missing or empty), selecting from a single-row VALUES clause aliased as `t` with a `dummy` column.

    Args:
        start: The start datetime to render. Not used by this implementation.
        end: The end datetime to render. Not used by this implementation.
        execution_time: The date/time reference to use for execution time. Not used by this implementation.
        snapshots: All upstream snapshots (by name). Not used by this implementation.
        table_mapping: Table mapping of physical locations. Not used by this implementation.
        expand: Referenced models to expand as subqueries. Not used by this implementation.
        deployability_index: Determines snapshots that are deployable. Not used by this implementation.
        engine_adapter: The engine adapter. Not used by this implementation.
        kwargs: Additional kwargs. Not used by this implementation.

    Returns:
        The placeholder SELECT expression.
    """
    column_types = self.columns_to_types or {}

    # One typed NULL per known column; aliases are quoted so the names survive as written.
    null_projections = [
        exp.cast(exp.Null(), column_type, copy=False).as_(name, copy=False, quoted=True)
        for name, column_type in column_types.items()
    ]

    placeholder = exp.select(*null_projections, copy=False)
    single_row = exp.values([(1,)], alias="t", columns=["dummy"])
    return placeholder.from_(single_row, copy=False)
def render_pre_statements(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    expand: t.Iterable[str] = tuple(),
    deployability_index: t.Optional[DeployabilityIndex] = None,
    engine_adapter: t.Optional[EngineAdapter] = None,
    inside_transaction: t.Optional[bool] = True,
    **kwargs: t.Any,
) -> t.List[exp.Expr]:
    """Renders pre-statements for a model.

    Pre-statements are statements that precede the model's SELECT query.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time reference to use for execution time.
        snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
        expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
            that depend on materialized tables. Model definitions are inlined and can thus be run end to
            end on the fly.
        deployability_index: Determines snapshots that are deployable in the context of this render.
        engine_adapter: The engine adapter forwarded to the statement renderer.
        inside_transaction: Whether to render hooks with transaction=True (inside) or transaction=False (outside).
            A statement without an explicit `transaction` arg is treated as transaction=True.
        kwargs: Additional kwargs to pass to the renderer.

    Returns:
        The list of rendered expressions.
    """
    # Only keep the statements whose transaction flag matches the requested side of the transaction.
    selected_statements = [
        stmt
        for stmt in self.pre_statements
        if stmt.args.get("transaction", True) == inside_transaction
    ]
    return self._render_statements(
        selected_statements,
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        expand=expand,
        deployability_index=deployability_index,
        engine_adapter=engine_adapter,
        **kwargs,
    )
def render_audit_query(
    self,
    audit: Audit,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    **kwargs: t.Any,
) -> exp.Query:
    """Renders the query of an audit against this model.

    The audited table (`this_model`) is taken from `kwargs` when provided; otherwise it is the table
    name of this model's snapshot, or the model's fqn when no snapshot is available. When the model has
    a time column, `this_model` is rendered as a subquery restricted to the [start, end] range.

    Args:
        audit: The audit whose query should be rendered.
        start: The start datetime to render. Defaults to epoch start for the time column filter.
        end: The end datetime to render. Defaults to epoch start for the time column filter.
        execution_time: The date/time reference to use for execution time.
        snapshots: All snapshots (by name) passed to the renderer and used to look up this model's snapshot.
        deployability_index: Determines snapshots that are deployable in the context of this render.
            Defaults to an index in which everything is deployable.
        kwargs: Additional kwargs to pass to the renderer. `this_model` overrides the audited table and
            `engine_adapter`, when present, is used to look up the audited table's column types.

    Returns:
        The rendered audit query.

    Raises:
        SQLMeshError: If the audit query can't be rendered.
    """
    from sqlmesh.core.snapshot import DeployabilityIndex

    deployability_index = deployability_index or DeployabilityIndex.all_deployable()
    snapshot = (snapshots or {}).get(self.fqn)

    this_model = kwargs.pop("this_model", None) or (
        snapshot.table_name(deployability_index.is_deployable(snapshot))
        if snapshot
        else self.fqn
    )

    columns_to_types: t.Optional[t.Dict[str, t.Any]] = None
    if "engine_adapter" in kwargs:
        try:
            columns_to_types = kwargs["engine_adapter"].columns(this_model)
        except Exception:
            # Best effort: when the lookup fails `columns_to_types` stays None. Log instead of
            # swallowing the error silently so that the failure remains diagnosable.
            logger.debug(
                "Failed to fetch columns of '%s' while rendering audit '%s'",
                this_model,
                audit.name,
                exc_info=True,
            )

    if self.time_column:
        low, high = [
            self.convert_to_time_column(dt, columns_to_types)
            for dt in make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
        ]
        where = self.time_column.column.between(low, high)
    else:
        where = None

    # The model's name is already normalized, but in case of snapshots we also prepend a
    # case-sensitive physical schema name, so we quote here to ensure that we won't have
    # a broken schema reference after the resulting query is normalized in `render`.
    quoted_model_name = quote_identifiers(
        exp.to_table(this_model, dialect=self.dialect), dialect=self.dialect
    )

    query_renderer = QueryRenderer(
        audit.query,
        audit.dialect or self.dialect,
        audit.macro_definitions,
        path=audit._path or Path(),
        jinja_macro_registry=audit.jinja_macros,
        python_env=self.python_env,
        only_execution_time=self.kind.only_execution_time,
        default_catalog=self.default_catalog,
    )

    rendered_query = query_renderer.render(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        deployability_index=deployability_index,
        **{
            **audit.defaults,
            "this_model": exp.select("*").from_(quoted_model_name).where(where).subquery()
            if where is not None
            else quoted_model_name,
            **kwargs,
        },  # type: ignore
    )

    if rendered_query is None:
        raise SQLMeshError(
            f"Failed to render query for audit '{audit.name}', model '{self.name}'."
        )

    return rendered_query
def render_signals(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
) -> t.List[t.Dict[str, str | int | float | bool]]:
    """Renders the unnamed (external) signals defined for this model.

    Only signals with an empty name are rendered; named signals are skipped here.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time reference to use for execution time.

    Returns:
        One dictionary per unnamed signal, mapping each key to its rendered value.

    Raises:
        SQLMeshError: If a signal value does not render into exactly one expression.
    """

    def _to_python_value(expression: exp.Expr) -> str | int | float | bool:
        renderer = self._create_renderer(expression)
        results = renderer.render(start=start, end=end, execution_time=execution_time) or []
        if len(results) != 1:
            raise SQLMeshError(f"Expected one expression but got {len(results)}")

        node = results[0]
        # Integers are checked before generic numbers so that they are not turned into floats.
        if node.is_int:
            return int(node.this)
        if node.is_number:
            return float(node.this)
        if isinstance(node, (exp.Literal, exp.Boolean)):
            return node.this
        return node.sql(dialect=self.dialect)

    # airflow only
    unnamed_signals = []
    for name, signal in self.signals:
        if name:
            continue
        unnamed_signals.append({key: _to_python_value(value) for key, value in signal.items()})
    return unnamed_signals
def _render_properties(
    self, properties: t.Dict[str, exp.Expr] | SessionProperties, **render_kwargs: t.Any
) -> t.Dict[str, t.Any]:
    """Renders the expression values of a properties mapping.

    Values that are expressions are rendered; any other value is passed through unchanged. Entries
    whose resulting value is falsy (including expressions that render to nothing, `none` or `null`)
    are left out of the result.

    Args:
        properties: The properties to render, keyed by property name.
        render_kwargs: Kwargs forwarded to the statement renderer.

    Returns:
        The mapping of property names to rendered values.

    Raises:
        SQLMeshError: If an expression renders into more than one expression.
    """

    def _render(expression: exp.Expr) -> exp.Expr | None:
        # note: we use the _statement_renderer instead of _create_renderer because it sets model_fqn which
        # in turn makes @this_model available in the evaluation context
        results = self._statement_renderer(expression).render(**render_kwargs)

        # Inform instead of raising for cases where a property is conditionally assigned
        if not results or results[0].sql().lower() in {"none", "null"}:
            logger.info(
                f"Rendering '{expression.sql(dialect=self.dialect)}' did not return an expression"
            )
            return None

        if len(results) != 1:
            raise SQLMeshError(
                f"Expected one result when rendering '{expression.sql(dialect=self.dialect)}' but got {len(results)}"
            )

        return results[0]

    rendered_properties: t.Dict[str, t.Any] = {}
    for key, value in properties.items():
        rendered_value = _render(value) if isinstance(value, exp.Expr) else value
        if rendered_value:
            rendered_properties[key] = rendered_value
    return rendered_properties
def text_diff(self, other: Node, rendered: bool = False) -> str:
    """Produce a text diff against another node.

    If the non-rendered definitions are identical, the diff of the rendered definitions is returned
    instead.

    Args:
        other: The node to diff against.
        rendered: Whether the diff should compare raw vs rendered models

    Returns:
        A unified text diff showing additions and deletions.

    Raises:
        SQLMeshError: If `other` is not a model.
    """
    if not isinstance(other, _Model):
        # The closing quote after the model name was missing in the original message.
        raise SQLMeshError(
            f"Cannot diff model '{self.name}' against a non-model node '{other.name}'"
        )

    text_diff = d.text_diff(
        self.render_definition(render_query=rendered),
        other.render_definition(render_query=rendered),
        self.dialect,
        other.dialect,
    ).strip()

    # No difference in the raw definitions: fall back to comparing the rendered ones.
    if not text_diff and not rendered:
        text_diff = d.text_diff(
            self.render_definition(render_query=True),
            other.render_definition(render_query=True),
            self.dialect,
            other.dialect,
        ).strip()

    return text_diff
def update_schema(self, schema: MappingSchema) -> None:
    """Copies the column types of this model's dependencies from `schema` into the model's mapping schema.

    Dependencies for which `schema.find()` returns nothing are skipped.

    Args:
        schema: The mapping schema to look the dependencies up in.
    """
    for dependency in self.depends_on:
        dependency_table = exp.to_table(dependency)
        dependency_columns = schema.find(dependency_table)
        if not dependency_columns:
            continue

        # The nested key is made of the table's parts; types are stored as SQL text in the model's dialect.
        nested_keys = tuple(part.sql(copy=False) for part in dependency_table.parts)
        column_types = {
            col: dtype.sql(dialect=self.dialect) for col, dtype in dependency_columns.items()
        }
        nested_set(self.mapping_schema, nested_keys, column_types)
@property
def annotated(self) -> bool:
    """Checks if all column projection types of this model are known.

    Managed columns are excluded from the check. Returns False when no column information is available
    or when no columns remain after excluding the managed ones.
    """
    all_columns = self.columns_to_types
    if all_columns is None:
        return False

    managed = self.managed_columns
    projected_columns = {}
    for column, column_type in all_columns.items():
        if column in managed:
            continue
        projected_columns[column] = column_type

    if projected_columns:
        return columns_to_types_all_known(projected_columns)
    return False
    @property
    def forward_only(self) -> bool:
        """Returns the kind's `forward_only` attribute, or False if the kind doesn't define one."""
        return getattr(self.kind, "forward_only", False)

    @property
    def disable_restatement(self) -> bool:
        """Returns the kind's `disable_restatement` attribute, or False if the kind doesn't define one."""
        return getattr(self.kind, "disable_restatement", False)

    @property
    def auto_restatement_intervals(self) -> t.Optional[int]:
        """Returns the kind's `auto_restatement_intervals` attribute, or None if the kind doesn't define one."""
        return getattr(self.kind, "auto_restatement_intervals", None)

    @property
    def auto_restatement_cron(self) -> t.Optional[str]:
        """Returns the kind's `auto_restatement_cron` attribute, or None if the kind doesn't define one."""
        return getattr(self.kind, "auto_restatement_cron", None)

    def auto_restatement_croniter(self, value: TimeLike) -> CroniterCache:
        """Builds a CroniterCache from the auto restatement cron and the given time.

        Args:
            value: The time passed to the CroniterCache alongside the cron expression.

        Raises:
            SQLMeshError: If no auto restatement cron is set.
        """
        cron = self.auto_restatement_cron
        if cron is None:
            raise SQLMeshError("Auto restatement cron is not set.")
        return CroniterCache(cron, value)

    @property
    def wap_supported(self) -> bool:
        """Returns True for materialized kinds whose storage format is "iceberg" (case-insensitive)."""
        return self.kind.is_materialized and (self.storage_format or "").lower() == "iceberg"

    def validate_definition(self) -> None:
        """Validates the model's definition.

        Raises:
            ConfigError
        """

        for field in ("partitioned_by", "clustered_by"):
            values = getattr(self, field)

            if values:
                # Reduce each expression to the names of the columns it references.
                values = [
                    col.name
                    for expr in values
                    for col in t.cast(
                        exp.Expr, exp.maybe_parse(expr, dialect=self.dialect)
                    ).find_all(exp.Column)
                ]

                unique_keys = set(values)

                if len(values) != len(unique_keys):
                    raise_config_error(
                        f"All keys in '{field}' must be unique in the model definition",
                        self._path,
                    )

                # The presence check is skipped when column information is unavailable.
                columns_to_types = self.columns_to_types
                if columns_to_types is not None:
                    missing_keys = unique_keys - set(columns_to_types)
                    if missing_keys:
                        missing_keys_str = ", ".join(f"'{k}'" for k in sorted(missing_keys))
                        raise_config_error(
                            f"{field} keys [{missing_keys_str}] are missing in the model definition",
                            self._path,
                        )

        if self.kind.is_incremental_by_time_range and not self.time_column:
            raise_config_error(
                "Incremental by time range models must have a time_column field",
                self._path,
            )

        if (
            self.kind.is_incremental_unmanaged
            and getattr(self.kind, "insert_overwrite", False)
            and not self.partitioned_by_
        ):
            raise_config_error(
                "Unmanaged incremental models with insert / overwrite enabled must specify the partitioned_by field",
                self._path,
            )

        if self.kind.is_managed:
            # TODO: would this sort of logic be better off moved into the Kind?
            if self.dialect == "snowflake" and "target_lag" not in self.physical_properties:
                raise_config_error(
                    "Snowflake managed tables must specify the 'target_lag' physical property",
                    self._path,
                )

        if self.physical_version is not None and not self.forward_only:
            raise_config_error(
                "Pinning a physical version is only supported for forward only models",
                self._path,
            )

        # The following attributes should be set only for SQL models
        if not self.is_sql:
            if self.optimize_query:
                raise_config_error(
                    "SQLMesh query optimizer can only be enabled for SQL models",
                    self._path,
                )

        if isinstance(self.kind, CustomKind):
            from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type_or_raise

            # Will raise if the custom materialization points to an invalid class
            get_custom_materialization_type_or_raise(self.kind.materialization)

        # Embedded model kind shouldn't have audits
        if self.kind.name == ModelKindName.EMBEDDED and self.audits:
            raise_config_error(
                "Audits are not supported for embedded models",
                self._path,
            )

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Determines whether this model is a breaking change in relation to the `previous` model.

        Args:
            previous: The previous model to compare against.

        Returns:
            True if this model instance represents a breaking change, False if it's a non-breaking change
            and None if the nature of the change can't be determined.
        """
        raise NotImplementedError

    def is_metadata_only_change(self, other: _Node) -> bool:
        """Determines whether the difference between this model and `other` is limited to metadata.

        The result is memoized in `_is_metadata_only_change_cache`, keyed by `id(other)`.

        Args:
            other: The node to compare against.

        Returns:
            False if `other` is not a model, if the metadata hashes are equal, if the non-SQL
            data hash values differ, or if the non-metadata pre/post statements differ in count
            or in rendered form. True otherwise.
        """
        if self._is_metadata_only_change_cache.get(id(other), None) is not None:
            return self._is_metadata_only_change_cache[id(other)]

        is_metadata_change = True
        if (
            not isinstance(other, _Model)
            or self.metadata_hash == other.metadata_hash
            or self._data_hash_values_no_sql != other._data_hash_values_no_sql
        ):
            is_metadata_change = False
        else:
            # Only statements that are not metadata-only take part in the comparison.
            this_statements = [
                s
                for s in [*self.pre_statements, *self.post_statements]
                if not self._is_metadata_statement(s)
            ]
            other_statements = [
                s
                for s in [*other.pre_statements, *other.post_statements]
                if not other._is_metadata_statement(s)
            ]
            if len(this_statements) != len(other_statements):
                is_metadata_change = False
            else:
                for this_statement, other_statement in zip(this_statements, other_statements):
                    # Fall back to the unrendered statement when rendering yields nothing.
                    this_rendered = (
                        self._statement_renderer(this_statement).render() or this_statement
                    )
                    other_rendered = (
                        other._statement_renderer(other_statement).render() or other_statement
                    )
                    if this_rendered != other_rendered:
                        is_metadata_change = False
                        break

        self._is_metadata_only_change_cache[id(other)] = is_metadata_change
        return is_metadata_change

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        # Computed once and then reused from `_data_hash`.
        if self._data_hash is None:
            self._data_hash = hash_data(self._data_hash_values)
        return self._data_hash

    @property
    def _data_hash_values(self) -> t.List[str]:
        """Returns the non-SQL data hash values followed by the SQL ones."""
        return self._data_hash_values_no_sql + self._data_hash_values_sql

    @property
    def _data_hash_values_sql(self) -> t.List[str]:
        """Returns the SQL text of every pre- and post-statement, in that order."""
        data = []

        for statements in [self.pre_statements_, self.post_statements_]:
            for statement in statements or []:
                data.append(statement.sql)

        return data

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        """Returns the data hash inputs that do not come from SQL statement text."""
        data = [
            str(  # Exclude metadata only macro funcs
                [(k, v) for k, v in self.sorted_python_env if not v.is_metadata]
            ),
            *self.kind.data_hash_values,
            self.table_format,
            self.storage_format,
            str(self.lookback),
            *(gen(expr) for expr in (self.partitioned_by or [])),
            *(gen(expr) for expr in (self.clustered_by or [])),
            self.stamp,
            self.physical_schema,
            self.physical_version,
            self.gateway,
            self.interval_unit.value if self.interval_unit is not None else None,
            str(self.optimize_query) if self.optimize_query is not None else None,
            self.virtual_environment_mode.value,
        ]

        for column_name, column_type in (self.columns_to_types_ or {}).items():
            data.append(column_name)
            data.append(column_type.sql(dialect=self.dialect))

        for key, value in (self.physical_properties or {}).items():
            data.append(key)
            data.append(gen(value))

        return data  # type: ignore

    def _audit_metadata_hash_values(self) -> t.List[str]:
        """Returns the metadata hash inputs contributed by the model's audits, ordered by audit name."""
        from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS

        metadata = []

        for audit_name, audit_args in sorted(self.audits, key=lambda a: a[0]):
            metadata.append(audit_name)
            if audit_name in BUILT_IN_AUDITS:
                # Built-in audits contribute their argument names and values.
                for arg_name, arg_value in audit_args.items():
                    metadata.append(arg_name)
                    metadata.append(gen(arg_value))
            else:
                # Other audits contribute their own definition instead of the call arguments.
                audit = self.audit_definitions[audit_name]
                metadata.extend(
                    [
                        audit.query_.sql,
                        audit.dialect,
                        str(audit.skip),
                        str(audit.blocking),
                    ]
                )

        return metadata

    def audit_metadata_hash(self) -> str:
        """Returns the hash of the audit metadata hash values."""
        return hash_data(self._audit_metadata_hash_values())

    @property
    def metadata_hash(self) -> str:
        """
        Computes the metadata hash for the node.

        Returns:
            The metadata hash for the node.
        """
        # Computed once and then reused from `_metadata_hash`.
        if self._metadata_hash is None:
            metadata = [
                self.dialect,
                self.owner,
                self.description,
                json.dumps(self.column_descriptions, sort_keys=True),
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
                str(self.start) if self.start else None,
                str(self.end) if self.end else None,
                str(self.retention) if self.retention else None,
                str(self.batch_size) if self.batch_size is not None else None,
                str(self.batch_concurrency) if self.batch_concurrency is not None else None,
                json.dumps(self.mapping_schema, sort_keys=True),
                *sorted(self.tags),
                *sorted(ref.json(sort_keys=True) for ref in self.all_references),
                *self.kind.metadata_hash_values,
                self.project,
                str(self.allow_partials),
                gen(self.session_properties_) if self.session_properties_ else None,
                *[gen(g) for g in self.grains],
                *self._audit_metadata_hash_values(),
                json.dumps(self.grants, sort_keys=True) if self.grants else None,
                self.grants_target_layer,
            ]

            for key, value in (self.virtual_properties or {}).items():
                metadata.append(key)
                metadata.append(gen(value))

            # Signals are ordered by name and their arguments by key.
            for signal_name, args in sorted(self.signals, key=lambda x: x[0]):
                metadata.append(signal_name)
                for k, v in sorted(args.items()):
                    metadata.append(f"{k}:{gen(v)}")

            if self.dbt_node_info:
                metadata.append(self.dbt_node_info.json(sort_keys=True))

metadata.extend(self._additional_metadata) 1228 1229 self._metadata_hash = hash_data(metadata) 1230 return self._metadata_hash 1231 1232 @property 1233 def is_model(self) -> bool: 1234 """Return True if this is a model node""" 1235 return True 1236 1237 @property 1238 def grants_table_type(self) -> DataObjectType: 1239 """Get the table type for grants application (TABLE, VIEW, MATERIALIZED_VIEW). 1240 1241 Returns: 1242 The DataObjectType that should be used when applying grants to this model. 1243 """ 1244 from sqlmesh.core.engine_adapter.shared import DataObjectType 1245 1246 if self.kind.is_view: 1247 if hasattr(self.kind, "materialized") and getattr(self.kind, "materialized", False): 1248 return DataObjectType.MATERIALIZED_VIEW 1249 return DataObjectType.VIEW 1250 if self.kind.is_managed: 1251 return DataObjectType.MANAGED_TABLE 1252 # All other materialized models are tables 1253 return DataObjectType.TABLE 1254 1255 @property 1256 def _additional_metadata(self) -> t.List[str]: 1257 additional_metadata = [] 1258 1259 metadata_only_macros = [(k, v) for k, v in self.sorted_python_env if v.is_metadata] 1260 if metadata_only_macros: 1261 additional_metadata.append(str(metadata_only_macros)) 1262 1263 for statements in [self.pre_statements_, self.post_statements_, self.on_virtual_update_]: 1264 for statement in statements or []: 1265 additional_metadata.append(statement.sql) 1266 1267 return additional_metadata 1268 1269 def _is_metadata_statement(self, statement: exp.Expr) -> bool: 1270 if isinstance(statement, d.MacroDef): 1271 return True 1272 if isinstance(statement, d.MacroFunc): 1273 target_macro = macro.get_registry().get(statement.name) 1274 if target_macro: 1275 return target_macro.metadata_only 1276 target_macro = self.python_env.get(statement.name) 1277 if target_macro: 1278 return bool(target_macro.is_metadata) 1279 return False 1280 1281 @property 1282 def full_depends_on(self) -> t.Set[str]: 1283 if not self.extract_dependencies_from_query: 1284 
return self.depends_on_ or set() 1285 if self._full_depends_on is None: 1286 depends_on = self.depends_on_ or set() 1287 1288 query = self.render_query(needs_optimization=False) 1289 if query is not None: 1290 depends_on |= d.find_tables( 1291 query, default_catalog=self.default_catalog, dialect=self.dialect 1292 ) 1293 self._full_depends_on = depends_on 1294 1295 return self._full_depends_on 1296 1297 @property 1298 def partitioned_by(self) -> t.List[exp.Expr]: 1299 """Columns to partition the model by, including the time column if it is not already included.""" 1300 if self.time_column and not self._is_time_column_in_partitioned_by: 1301 # This allows the user to opt out of automatic time_column injection 1302 # by setting `partition_by_time_column false` on the model kind 1303 if ( 1304 hasattr(self.kind, "partition_by_time_column") 1305 and self.kind.partition_by_time_column 1306 ): 1307 return [ 1308 TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)( 1309 self.time_column.column, self.columns_to_types 1310 ), 1311 *self.partitioned_by_, 1312 ] 1313 return self.partitioned_by_ 1314 1315 @property 1316 def partition_interval_unit(self) -> t.Optional[IntervalUnit]: 1317 """The interval unit to use for partitioning if applicable.""" 1318 # Only return the interval unit for partitioning if the partitioning 1319 # wasn't explicitly set by the user. Otherwise, the user-provided 1320 # value should always take precedence. 
        if self.time_column and not self._is_time_column_in_partitioned_by:
            return self.interval_unit
        return None

    @property
    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
        """Returns the audits attached to this model together with their arguments.

        Audits referenced in `audits` come first, each with a copy of its arguments. Audits
        that are defined on the model but not referenced follow with empty arguments.
        """
        from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS

        # Model-level definitions shadow built-in audits of the same name.
        audits_by_name = {**BUILT_IN_AUDITS, **self.audit_definitions}
        audits_with_args = []
        added_audits = set()

        for audit_name, audit_args in self.audits:
            audits_with_args.append((audits_by_name[audit_name], audit_args.copy()))
            added_audits.add(audit_name)

        for audit_name in self.audit_definitions:
            if audit_name not in added_audits:
                audits_with_args.append((audits_by_name[audit_name], {}))

        return audits_with_args

    @property
    def _is_time_column_in_partitioned_by(self) -> bool:
        """Returns True if the time column is referenced by any of the declared partition expressions."""
        return self.time_column is not None and self.time_column.column in {
            col for expr in self.partitioned_by_ for col in expr.find_all(exp.Column)
        }

    @property
    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
        """Returns an empty mapping at this level; SqlModel overrides this."""
        return {}


class SqlModel(_Model):
    """The model definition which relies on a SQL query to fetch the data.

    Args:
        query: The main query representing the model.
        pre_statements: The list of SQL statements that precede the model's query.
        post_statements: The list of SQL statements that follow after the model's query.
        on_virtual_update: The list of SQL statements to be executed after the virtual update.
1362 """ 1363 1364 query_: ParsableSql = Field(alias="query") 1365 source_type: t.Literal["sql"] = "sql" 1366 1367 _columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None 1368 1369 def __getstate__(self) -> t.Dict[t.Any, t.Any]: 1370 state = super().__getstate__() 1371 state["__dict__"] = state["__dict__"].copy() 1372 # query renderer is very expensive to serialize 1373 state["__dict__"].pop("_query_renderer", None) 1374 state["__dict__"].pop("column_descriptions", None) 1375 private = state[PRIVATE_FIELDS] 1376 private["_columns_to_types"] = None 1377 return state 1378 1379 def copy(self, **kwargs: t.Any) -> Self: 1380 model = super().copy(**kwargs) 1381 model.__dict__.pop("_query_renderer", None) 1382 model.__dict__.pop("column_descriptions", None) 1383 model._columns_to_types = None 1384 if kwargs.get("update", {}).keys() & {"depends_on_", "query"}: 1385 model._full_depends_on = None 1386 return model 1387 1388 @property 1389 def query(self) -> t.Union[exp.Query, d.JinjaQuery, d.MacroFunc]: 1390 parsed_query = self.query_.parse(self.dialect) 1391 return t.cast(t.Union[exp.Query, d.JinjaQuery, d.MacroFunc], parsed_query) 1392 1393 def render_query( 1394 self, 1395 *, 1396 start: t.Optional[TimeLike] = None, 1397 end: t.Optional[TimeLike] = None, 1398 execution_time: t.Optional[TimeLike] = None, 1399 snapshots: t.Optional[t.Dict[str, Snapshot]] = None, 1400 table_mapping: t.Optional[t.Dict[str, str]] = None, 1401 expand: t.Iterable[str] = tuple(), 1402 deployability_index: t.Optional[DeployabilityIndex] = None, 1403 engine_adapter: t.Optional[EngineAdapter] = None, 1404 **kwargs: t.Any, 1405 ) -> t.Optional[exp.Query]: 1406 query = self._query_renderer.render( 1407 start=start, 1408 end=end, 1409 execution_time=execution_time, 1410 snapshots=snapshots, 1411 table_mapping=table_mapping, 1412 expand=expand, 1413 deployability_index=deployability_index, 1414 engine_adapter=engine_adapter, 1415 **kwargs, 1416 ) 1417 1418 return query 1419 1420 def 
render_definition( 1421 self, 1422 include_python: bool = True, 1423 include_defaults: bool = False, 1424 render_query: bool = False, 1425 ) -> t.List[exp.Expr]: 1426 result: t.List[exp.Expr] = super().render_definition( 1427 include_python=include_python, include_defaults=include_defaults 1428 ) 1429 1430 if render_query: 1431 result.extend(self.render_pre_statements()) 1432 result.append(self.render_query() or self.query) 1433 result.extend(self.render_post_statements()) 1434 if virtual_update := self.render_on_virtual_update(): 1435 result.append(d.VirtualUpdateStatement(expressions=virtual_update)) 1436 else: 1437 result.extend(self.pre_statements) 1438 result.append(self.query) 1439 result.extend(self.post_statements) 1440 if self.on_virtual_update: 1441 result.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update)) 1442 1443 return result 1444 1445 @property 1446 def is_sql(self) -> bool: 1447 return True 1448 1449 @property 1450 def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]: 1451 if self.columns_to_types_ is not None: 1452 self._columns_to_types = self.columns_to_types_ 1453 elif self._columns_to_types is None: 1454 try: 1455 query = self._query_renderer.render() 1456 except Exception: 1457 logger.exception("Failed to render query for model %s", self.fqn) 1458 return None 1459 1460 if query is None: 1461 return None 1462 1463 unknown = exp.DataType.build("unknown") 1464 1465 columns_to_types = {} 1466 for select in query.selects: 1467 output_name = select.output_name 1468 1469 # If model validation is disabled, we cannot assume that projections 1470 # will have inferrable output names or even that they will be unique 1471 if not output_name or output_name in columns_to_types: 1472 return None 1473 1474 # copy data type because it is used in the engine to build CTAS and other queries 1475 # this can change the parent which will mess up the diffing algo 1476 columns_to_types[output_name] = (select.type or unknown).copy() 1477 

            self._columns_to_types = columns_to_types

        # A star projection means the column set can't be determined.
        if "*" in self._columns_to_types:
            return None

        return {**self._columns_to_types, **self.managed_columns}

    @cached_property
    def column_descriptions(self) -> t.Dict[str, str]:
        """Returns the column descriptions, taken from projection comments when not set explicitly."""
        if self.column_descriptions_ is not None:
            return self.column_descriptions_

        query = self.render_query()
        if query is None:
            return {}

        # The last comment attached to a projection is used as its description.
        return {
            select.alias_or_name: select.comments[-1].strip()
            for select in query.selects
            if select.comments
        }

    def set_mapping_schema(self, schema: t.Dict) -> None:
        super().set_mapping_schema(schema)
        self._on_mapping_schema_set()

    def update_schema(self, schema: MappingSchema) -> None:
        super().update_schema(schema)
        self._on_mapping_schema_set()

    def _on_mapping_schema_set(self) -> None:
        """Drops the inferred column types and passes the new mapping schema to the query renderer."""
        self._columns_to_types = None
        self._query_renderer.update_schema(self.mapping_schema)

    def validate_definition(self) -> None:
        query = self._query_renderer.render()
        if query is None:
            # Nothing else can be validated without a rendered query.
            if self.depends_on_ is None:
                raise_config_error(
                    "Dependencies must be provided explicitly for models that can be rendered only at runtime",
                    self._path,
                )
            return

        if not isinstance(query, exp.Query):
            raise_config_error("Missing SELECT query in the model definition", self._path)

        projection_list = query.selects
        if not projection_list:
            raise_config_error("Query missing select statements", self._path)

        if self.depends_on_self and not self.annotated:
            raise_config_error(
                "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
                self._path,
            )

        super().validate_definition()

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        if not isinstance(previous, SqlModel):
            return None

        if self.lookback != previous.lookback:
            return None

        try:
            # the previous model which comes from disk could be unrenderable
            previous_query = previous.render_query()
        except Exception:
            previous_query = None
        this_query = self.render_query()

        if previous_query is None or this_query is None:
            # Can't determine if there's a breaking change if we can't render the query.
            return None

        if previous_query is this_query:
            edits = []
        else:
            edits = diff(
                previous_query,
                this_query,
                matchings=[(previous_query, this_query)],
                delta_only=True,
                dialect=self.dialect if self.dialect == previous.dialect else None,
            )
        inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}

        for edit in edits:
            # Any edit other than an insertion makes the change undeterminable.
            if not isinstance(edit, Insert):
                return None

            expr = edit.expression
            if isinstance(expr, exp.UDTF):
                # projection subqueries do not change cardinality, engines don't allow these to return
                # more than one row of data
                parent = expr.find_ancestor(exp.Subquery)

                if not parent:
                    return None

                expr = parent

            if not _is_projection(expr) and expr.parent not in inserted_expressions:
                return None

        return False

    def is_metadata_only_change(self, previous: _Node) -> bool:
        if self._is_metadata_only_change_cache.get(id(previous), None) is not None:
            return self._is_metadata_only_change_cache[id(previous)]

        if not super().is_metadata_only_change(previous):
            return False

        if not isinstance(previous, SqlModel):
            self._is_metadata_only_change_cache[id(previous)] = False
            return False

        # On top of the parent checks, the queries themselves must be equal.
        this_rendered_query = self.render_query() or self.query
        previous_rendered_query = previous.render_query() or previous.query
        is_metadata_change = this_rendered_query == previous_rendered_query

        self._is_metadata_only_change_cache[id(previous)] = is_metadata_change
        return is_metadata_change

    @cached_property
    def _query_renderer(self) -> QueryRenderer:
        """Builds the renderer for this model's query; cached on the instance."""
        # Identifier quoting is turned off for view kinds on the trino and spark dialects.
        no_quote_identifiers = self.kind.is_view and self.dialect in ("trino", "spark")
        return QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            schema=self.mapping_schema,
            path=self._path,
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            only_execution_time=self.kind.only_execution_time,
            default_catalog=self.default_catalog,
            quote_identifiers=not no_quote_identifiers,
            optimize_query=self.optimize_query,
            model=self,
        )

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        return [
            *super()._data_hash_values_no_sql,
            *self.jinja_macros.data_hash_values,
        ]

    @property
    def _data_hash_values_sql(self) -> t.List[str]:
        return [
            *super()._data_hash_values_sql,
            self.query_.sql,
        ]

    @property
    def _additional_metadata(self) -> t.List[str]:
        return [*super()._additional_metadata, self.query_.sql]

    @property
    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
        # The query is rendered before the renderer's violated rules are read.
        self.render_query()
        return self._query_renderer._violated_rules


class SeedModel(_Model):
    """The model definition which uses a pre-built static dataset to source the data from.

    Args:
        seed: The content of a pre-built static dataset.
    """

    kind: SeedKind
    seed: Seed
    column_hashes_: t.Optional[t.Dict[str, str]] = Field(default=None, alias="column_hashes")
    derived_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    is_hydrated: bool = True
    source_type: t.Literal["seed"] = "seed"

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Returns the pickling state without the cached seed reader."""
        state = super().__getstate__()
        state["__dict__"] = state["__dict__"].copy()
        state["__dict__"].pop("_reader", None)
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        """Returns a copy of this model without the cached seed reader."""
        model = super().copy(**kwargs)
        model.__dict__.pop("_reader", None)
        return model

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Yields the seed content, or nothing at all if the model is not hydrated."""
        if not self.is_hydrated:
            return
        yield from self.render_seed()

    def render_seed(self) -> t.Iterator[QueryOrDF]:
        """Yields the seed content in batches, converted according to the declared column types.

        Raises:
            SQLMeshError: If the model is not hydrated.
        """
        import numpy as np

        self._ensure_hydrated()

        date_columns = []
        datetime_columns = []
        bool_columns = []
        string_columns = []

        # Bucket the declared columns by the kind of conversion they need.
        columns_to_types = self.columns_to_types_ or {}
        column_names_to_check = set(columns_to_types)
        for name, tpe in columns_to_types.items():
            if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
                date_columns.append(name)
            elif tpe.this in exp.DataType.TEMPORAL_TYPES:
                datetime_columns.append(name)
            elif tpe.is_type("boolean"):
                bool_columns.append(name)
            elif tpe.this in exp.DataType.TEXT_TYPES:
                string_columns.append(name)

        for df in self._reader.read(batch_size=self.kind.batch_size):
            rename_dict = {}
            for column in columns_to_types:
                if column not in df:
                    normalized_name = normalize_identifiers(column, dialect=self.dialect).name
1712 if normalized_name in df: 1713 rename_dict[normalized_name] = column 1714 if rename_dict: 1715 df.rename(columns=rename_dict, inplace=True) 1716 # These names have already been checked 1717 column_names_to_check -= set(rename_dict) 1718 1719 missing_columns = column_names_to_check - set(df.columns) 1720 if missing_columns: 1721 raise_config_error( 1722 f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path 1723 ) 1724 1725 # convert all date/time types to native pandas timestamp 1726 for column in [*date_columns, *datetime_columns]: 1727 import pandas as pd 1728 1729 df[column] = pd.to_datetime(df[column], infer_datetime_format=True, errors="ignore") # type: ignore 1730 1731 # extract datetime.date from pandas timestamp for DATE columns 1732 for column in date_columns: 1733 try: 1734 df[column] = df[column].dt.date 1735 except Exception as ex: 1736 logger.error( 1737 "Failed to convert column '%s' to date in seed model '%s': %s", 1738 column, 1739 self.name, 1740 ex, 1741 ) 1742 1743 for column in bool_columns: 1744 df[column] = df[column].apply(lambda i: str_to_bool(str(i))) 1745 1746 df.loc[:, string_columns] = df[string_columns].mask( 1747 cond=lambda x: x.notna(), # type: ignore 1748 other=df[string_columns].astype(str), # type: ignore 1749 ) 1750 yield df.replace({np.nan: None}) 1751 1752 @property 1753 def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]: 1754 if self.columns_to_types_ is not None: 1755 return self.columns_to_types_ 1756 if self.derived_columns_to_types is not None: 1757 return self.derived_columns_to_types 1758 if self.is_hydrated: 1759 return self._reader.columns_to_types 1760 return None 1761 1762 @property 1763 def column_hashes(self) -> t.Dict[str, str]: 1764 if self.column_hashes_ is not None: 1765 return self.column_hashes_ 1766 self._ensure_hydrated() 1767 return self._reader.column_hashes 1768 1769 @property 1770 def is_seed(self) -> bool: 1771 return True 1772 1773 @property 1774 def 
seed_path(self) -> Path: 1775 seed_path = Path(self.kind.path) 1776 if not seed_path.is_absolute(): 1777 if self._path is None: 1778 raise SQLMeshError(f"Seed model '{self.name}' has no path") 1779 return self._path.parent / seed_path 1780 return seed_path 1781 1782 @property 1783 def depends_on(self) -> t.Set[str]: 1784 return (self.depends_on_ or set()) - {self.fqn} 1785 1786 @property 1787 def depends_on_self(self) -> bool: 1788 return False 1789 1790 @property 1791 def batch_size(self) -> t.Optional[int]: 1792 # Unlike other model kinds, the batch size provided in the SEED kind represents the 1793 # maximum number of rows to insert in a single batch. 1794 # We should never batch intervals for seed models. 1795 return None 1796 1797 def to_dehydrated(self) -> SeedModel: 1798 """Creates a dehydrated copy of this model. 1799 1800 The dehydrated seed model will not contain the seed content, but will contain 1801 the column hashes. This is useful for comparing two seed models without 1802 having to read the seed content from disk. 1803 1804 Returns: 1805 A dehydrated copy of this model. 1806 """ 1807 if not self.is_hydrated: 1808 return self 1809 1810 return self.copy( 1811 update={ 1812 "seed": Seed(content=""), 1813 "is_hydrated": False, 1814 "column_hashes_": self.column_hashes, 1815 "derived_columns_to_types": self.columns_to_types 1816 if self.columns_to_types_ is None 1817 else None, 1818 } 1819 ) 1820 1821 def to_hydrated(self, content: str) -> SeedModel: 1822 """Creates a hydrated copy of this model with the given seed content. 1823 1824 Returns: 1825 A hydrated copy of this model. 
        """
        if self.is_hydrated:
            return self

        return self.copy(
            update={
                "seed": Seed(content=content),
                "is_hydrated": True,
                "column_hashes_": None,
            },
        )

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        if not isinstance(previous, SeedModel):
            return None

        new_columns = set(self.column_hashes)
        old_columns = set(previous.column_hashes)

        # A column that existed before and is now gone makes the change undeterminable.
        if not new_columns.issuperset(old_columns):
            return None

        # So does a previously existing column whose hash changed.
        for col in old_columns:
            if self.column_hashes[col] != previous.column_hashes[col]:
                return None

        return False

    def _ensure_hydrated(self) -> None:
        """Raises SQLMeshError if the model is not hydrated."""
        if not self.is_hydrated:
            raise SQLMeshError(f"Seed model '{self.name}' is not hydrated.")

    @cached_property
    def _reader(self) -> CsvSeedReader:
        """Returns the seed reader built from the model's dialect and the kind's CSV settings."""
        return self.seed.reader(dialect=self.dialect, settings=self.kind.csv_settings)

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        data = super()._data_hash_values_no_sql
        for column_name, column_hash in self.column_hashes.items():
            data.append(column_name)
            data.append(column_hash)

        # Include grants in data hash for seed models to force recreation on grant changes
        # since seed models don't support migration
        data.append(json.dumps(self.grants, sort_keys=True) if self.grants else "")
        data.append(self.grants_target_layer)

        return data


class PythonModel(_Model):
    """The model definition which relies on a Python script to fetch the data.

    Args:
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
    """

    kind: ModelKind = FullKind()
    entrypoint: str
    source_type: t.Literal["python"] = "python"

    def validate_definition(self) -> None:
        super().validate_definition()

        if self.kind and not self.kind.supports_python_models:
            raise_config_error(
                f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
                self._path,
            )

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Calls the model's entrypoint function and yields what it produces.

        Args:
            context: The execution context; passed to the entrypoint with the variables attached.
            start: The start of the interval. Defaults to the epoch.
            end: The end of the interval. Defaults to the epoch.
            execution_time: The execution time. Defaults to the epoch.
            kwargs: Additional keyword arguments passed to the entrypoint. A `variables`
                entry, if present, is merged into the variables instead.

        Raises:
            PythonModelEvalError: If the entrypoint raises.
        """
        env = prepare_env(self.python_env)
        start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
        execution_time = to_datetime(execution_time or c.EPOCH)

        # Later entries win: env variables, then metadata variables, then caller-provided ones.
        variables = {
            **env.get(c.SQLMESH_VARS, {}),
            **env.get(c.SQLMESH_VARS_METADATA, {}),
            **kwargs.pop("variables", {}),
        }
        # SqlValue blueprint variables are parsed into expressions using the model's dialect.
        blueprint_variables = {
            k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
            for k, v in {
                **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
                **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
            }.items()
        }
        try:
            kwargs = {
                **variables,
                **kwargs,
                "start": start,
                "end": end,
                "execution_time": execution_time,
                "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
class AuditResult(PydanticModel):
    # The outcome of a single audit. Each field is described by the string literal that
    # directly follows it.

    audit: Audit
    """The audit this result is for."""
    audit_args: t.Dict[t.Any, t.Any]
    """Arguments passed to the audit."""
    model: t.Optional[_Model] = None
    """The model this audit is for."""
    count: t.Optional[int] = None
    """The number of records returned by the audit query. This could be None if the audit was skipped."""
    query: t.Optional[exp.Expr] = None
    """The rendered query used by the audit. This could be None if the audit was skipped."""
    skipped: bool = False
    """Whether or not the audit was skipped."""
    blocking: bool = True
    """Whether or not the audit was blocking. This can be overridden by the user."""
def create_models_from_blueprints(
    gateway: t.Optional[str | exp.Expr],
    blueprints: t.Any,
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    loader: t.Callable[..., Model],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Creates one model per blueprint using the provided loader.

    Args:
        gateway: The gateway of the model. It is rendered separately for every blueprint, with that
            blueprint's variables available to the renderer.
        blueprints: The value of the `blueprints` property. A falsy value results in a single model
            that has no blueprint variables.
        get_variables: A callable that returns the variables for the given gateway name (or None).
        loader: The callable that creates a model. It is invoked with `path`, `module_path`, `dialect`,
            `variables`, `blueprint_variables` and the loader kwargs as keyword arguments.
        path: The path to the model definition file.
        module_path: The python module path to serialize macros for.
        dialect: The dialect used to parse and render the gateway and passed on to the loader.
        default_catalog_per_gateway: A mapping of gateway names to catalogs. When the rendered gateway
            has an entry, that catalog replaces the `default_catalog` loader kwarg for the blueprint.
        loader_kwargs: Additional kwargs to pass to the loader.

    Returns:
        The list of created models, one per blueprint.
    """
    model_blueprints: t.List[Model] = []
    for blueprint in _extract_blueprints(blueprints, path):
        blueprint_variables = _extract_blueprint_variables(blueprint, path)

        # Each blueprint starts from the kwargs supplied by the caller. Overriding the catalog in the
        # shared dict would make the override chosen for one blueprint's gateway leak into the
        # following blueprints whose gateway has no catalog of its own.
        blueprint_loader_kwargs = dict(loader_kwargs)

        if gateway:
            rendered_gateway = render_expression(
                expression=exp.maybe_parse(gateway, dialect=dialect),
                module_path=module_path,
                macros=blueprint_loader_kwargs.get("macros"),
                jinja_macros=blueprint_loader_kwargs.get("jinja_macros"),
                path=path,
                dialect=dialect,
                default_catalog=blueprint_loader_kwargs.get("default_catalog"),
                blueprint_variables=blueprint_variables,
            )
            gateway_name = rendered_gateway[0].name if rendered_gateway else None
        else:
            gateway_name = None

        if (
            default_catalog_per_gateway
            and gateway_name
            and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
        ):
            blueprint_loader_kwargs["default_catalog"] = catalog

        model_blueprints.append(
            loader(
                path=path,
                module_path=module_path,
                dialect=dialect,
                variables=get_variables(gateway_name),
                blueprint_variables=blueprint_variables,
                **blueprint_loader_kwargs,
            )
        )

    return model_blueprints
t.List[exp.Expr], 2164 *, 2165 defaults: t.Optional[t.Dict[str, t.Any]] = None, 2166 path: t.Optional[Path] = None, 2167 module_path: Path = Path(), 2168 time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT, 2169 macros: t.Optional[MacroRegistry] = None, 2170 jinja_macros: t.Optional[JinjaMacroRegistry] = None, 2171 audits: t.Optional[t.Dict[str, ModelAudit]] = None, 2172 python_env: t.Optional[t.Dict[str, Executable]] = None, 2173 dialect: t.Optional[str] = None, 2174 physical_schema_mapping: t.Optional[t.Dict[re.Pattern, str]] = None, 2175 default_catalog: t.Optional[str] = None, 2176 variables: t.Optional[t.Dict[str, t.Any]] = None, 2177 infer_names: t.Optional[bool] = False, 2178 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 2179 **kwargs: t.Any, 2180) -> Model: 2181 """Load a model from a parsed SQLMesh model SQL file. 2182 2183 Args: 2184 expressions: Model, *Statements, Query. 2185 defaults: Definition default values. 2186 path: An optional path to the model definition file. 2187 module_path: The python module path to serialize macros for. 2188 time_column_format: The default time column format to use if no model time column is configured. 2189 macros: The custom registry of macros. If not provided the default registry will be used. 2190 jinja_macros: The registry of Jinja macros. 2191 python_env: The custom Python environment for macros. If not provided the environment will be constructed 2192 from the macro registry. 2193 dialect: The default dialect if no model dialect is configured. 2194 The format must adhere to Python's strftime codes. 2195 physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema 2196 default_catalog: The default catalog if no model catalog is configured. 2197 variables: The variables to pass to the model. 2198 kwargs: Additional kwargs to pass to the loader. 
2199 """ 2200 missing_model_msg = f"""Please add a MODEL block at the top of the file. Example: 2201 2202MODEL ( 2203 name sqlmesh_example.full_model, --model name 2204 kind FULL, --materialization 2205 cron '@daily', --schedule 2206); 2207 2208Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview 2209""" 2210 2211 if not expressions: 2212 raise_config_error(missing_model_msg) 2213 2214 dialect = dialect or "" 2215 meta = expressions[0] 2216 if not isinstance(meta, d.Model): 2217 if not infer_names: 2218 raise_config_error(missing_model_msg) 2219 meta = d.Model(expressions=[]) # Dummy meta node 2220 expressions.insert(0, meta) 2221 2222 # We deliberately hold off rendering some properties at load time because there is not enough information available 2223 # at load time to render them. They will get rendered later at evaluation time 2224 unrendered_properties = {} 2225 unrendered_merge_filter = None 2226 2227 for prop in meta.expressions: 2228 # Macro functions that programmaticaly generate the key-value pair properties should be rendered 2229 # This is needed in the odd case where a macro shares the name of one of the properties 2230 # eg `@session_properties()` Test: `test_macros_in_model_statement` Reference PR: #2574 2231 if isinstance(prop, d.MacroFunc): 2232 continue 2233 2234 prop_name = prop.name.lower() 2235 if prop_name in {"signals", "audits"} | PROPERTIES: 2236 unrendered_properties[prop_name] = prop.args.get("value") 2237 elif ( 2238 prop.name.lower() == "kind" 2239 and (value := prop.args.get("value")) 2240 and value.name.lower() == "incremental_by_unique_key" 2241 ): 2242 for kind_prop in value.expressions: 2243 if kind_prop.name.lower() == "merge_filter": 2244 unrendered_merge_filter = kind_prop 2245 2246 rendered_meta_exprs = render_expression( 2247 expression=meta, 2248 module_path=module_path, 2249 macros=macros, 2250 jinja_macros=jinja_macros, 2251 variables=variables, 2252 path=path, 2253 dialect=dialect, 2254 
default_catalog=default_catalog, 2255 blueprint_variables=blueprint_variables, 2256 ) 2257 2258 if rendered_meta_exprs is None or len(rendered_meta_exprs) != 1: 2259 raise_config_error( 2260 f"Invalid MODEL statement:\n{meta.sql(dialect=dialect, pretty=True)}", 2261 path, 2262 ) 2263 raise 2264 2265 rendered_meta = rendered_meta_exprs[0] 2266 2267 rendered_defaults = ( 2268 render_model_defaults( 2269 defaults=defaults, 2270 module_path=module_path, 2271 macros=macros, 2272 jinja_macros=jinja_macros, 2273 variables=variables, 2274 path=path, 2275 dialect=dialect, 2276 default_catalog=default_catalog, 2277 ) 2278 if defaults 2279 else {} 2280 ) 2281 2282 rendered_defaults = parse_defaults_properties(rendered_defaults, dialect=dialect) 2283 2284 # Extract the query and any pre/post statements 2285 query_or_seed_insert, pre_statements, post_statements, on_virtual_update, inline_audits = ( 2286 _split_sql_model_statements(expressions[1:], path, dialect=dialect) 2287 ) 2288 2289 meta_fields: t.Dict[str, t.Any] = { 2290 "dialect": dialect, 2291 "description": ( 2292 "\n".join(comment.strip() for comment in rendered_meta.comments) 2293 if rendered_meta.comments 2294 else None 2295 ), 2296 **{prop.name.lower(): prop.args.get("value") for prop in rendered_meta.expressions}, 2297 **kwargs, 2298 } 2299 2300 # Discard the potentially half-rendered versions of these properties and replace them with the 2301 # original unrendered versions. 
They will get rendered properly at evaluation time 2302 meta_fields.update(unrendered_properties) 2303 2304 if unrendered_merge_filter: 2305 for idx, kind_prop in enumerate(meta_fields["kind"].expressions): 2306 if kind_prop.name.lower() == "merge_filter": 2307 meta_fields["kind"].expressions[idx] = unrendered_merge_filter 2308 2309 if isinstance(meta_fields.get("dialect"), exp.Expr): 2310 meta_fields["dialect"] = meta_fields["dialect"].name 2311 2312 # The name of the model will be inferred from its path relative to `models/`, if it's not explicitly specified 2313 name = meta_fields.pop("name", "") 2314 if not name and infer_names: 2315 if path is None: 2316 raise ValueError(f"Model {name} must have a name") 2317 name = get_model_name(path) 2318 2319 if not name: 2320 raise_config_error( 2321 "Please add the required 'name' field to the MODEL block at the top of the file.\n\n" 2322 + "Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview" 2323 ) 2324 if "default_catalog" in meta_fields: 2325 raise_config_error( 2326 "`default_catalog` cannot be set on a per-model basis. 
It must be set at the connection level.", 2327 path, 2328 ) 2329 2330 common_kwargs = dict( 2331 pre_statements=pre_statements, 2332 post_statements=post_statements, 2333 on_virtual_update=on_virtual_update, 2334 defaults=rendered_defaults, 2335 path=path, 2336 module_path=module_path, 2337 macros=macros, 2338 python_env=python_env, 2339 jinja_macros=jinja_macros, 2340 physical_schema_mapping=physical_schema_mapping, 2341 default_catalog=default_catalog, 2342 variables=variables, 2343 inline_audits=inline_audits, 2344 blueprint_variables=blueprint_variables, 2345 use_original_sql=True, 2346 **meta_fields, 2347 ) 2348 2349 kind = common_kwargs.pop("kind", ModelMeta.all_field_infos()["kind"].default) 2350 2351 if kind.name != ModelKindName.SEED: 2352 return create_sql_model( 2353 name, 2354 query_or_seed_insert, 2355 kind=kind, 2356 time_column_format=time_column_format, 2357 **common_kwargs, 2358 ) 2359 2360 seed_properties = {p.name.lower(): p.args.get("value") for p in kind.expressions} 2361 return create_seed_model( 2362 name, 2363 SeedKind(**seed_properties), 2364 **common_kwargs, 2365 ) 2366 2367 2368def create_sql_model( 2369 name: TableName, 2370 query: t.Optional[exp.Expr], 2371 **kwargs: t.Any, 2372) -> Model: 2373 """Creates a SQL model. 2374 2375 Args: 2376 name: The name of the model, which is of the form [catalog].[db].table. 2377 The catalog and db are optional. 2378 query: The model's logic in a form of a SELECT query. 
def create_seed_model(
    name: TableName,
    seed_kind: SeedKind,
    *,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    **kwargs: t.Any,
) -> Model:
    """Creates a Seed model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        seed_kind: The information about the location of a seed and other related configuration.
            When the seed path starts with the `$root` marker, `seed_kind.path` is updated in place
            with the resolved path.
        path: An optional path to the model definition file. A relative seed path is resolved
            against it (against its parent directory when it is not a directory).
        module_path: The path that replaces the leading `$root` marker of the seed path.
        kwargs: Additional kwargs to pass to the model constructor.
    """
    seed_path = Path(seed_kind.path)
    if not seed_path.parts:
        # `Path("")` and `Path(".")` have no components and can't name a seed file. Unpacking
        # the marker below would otherwise fail with an unhelpful ValueError.
        raise_config_error(f"Seed path '{seed_kind.path}' must point to a file", path)

    marker, *subdirs = seed_path.parts
    if marker.lower() == "$root":
        seed_path = module_path.joinpath(*subdirs)
        seed_kind.path = str(seed_path)
    elif not seed_path.is_absolute() and path is not None:
        # Without a model definition path there is nothing to resolve against, so a relative
        # seed path is used as is in that case.
        base_dir = path if path.is_dir() else path.parent
        seed_path = base_dir / seed_path

    seed = create_seed(seed_path)

    return _create_model(
        SeedModel,
        name,
        path=path,
        seed=seed,
        kind=seed_kind,
        depends_on=kwargs.pop("depends_on", None),
        module_path=module_path,
        **kwargs,
    )
t.Optional[t.Dict[str, t.Any]] = None, 2446 **kwargs: t.Any, 2447) -> Model: 2448 """Creates a Python model. 2449 2450 Args: 2451 name: The name of the model, which is of the form [catalog].[db].table. 2452 The catalog and db are optional. 2453 entrypoint: The name of a Python function which contains the data fetching / transformation logic. 2454 python_env: The Python environment of all objects referenced by the model implementation. 2455 path: An optional path to the model definition file. 2456 depends_on: The custom set of model's upstream dependencies. 2457 variables: The variables to pass to the model. 2458 blueprint_variables: The blueprint's variables to pass to the model. 2459 """ 2460 # Find dependencies for python models by parsing code if they are not explicitly defined 2461 # Also remove self-references that are found 2462 2463 dialect = kwargs.get("dialect") 2464 2465 dependencies_unspecified = depends_on is None 2466 2467 parsed_depends_on, referenced_variables = ( 2468 parse_dependencies( 2469 python_env, 2470 entrypoint, 2471 strict_resolution=dependencies_unspecified, 2472 variables=variables, 2473 blueprint_variables=blueprint_variables, 2474 ) 2475 if python_env is not None 2476 else (set(), set()) 2477 ) 2478 if dependencies_unspecified: 2479 depends_on = parsed_depends_on - {name} 2480 else: 2481 depends_on_rendered = render_expression( 2482 expression=exp.Array( 2483 expressions=[exp.maybe_parse(dep, dialect=dialect) for dep in depends_on or []] 2484 ), 2485 module_path=module_path, 2486 macros=macros, 2487 jinja_macros=jinja_macros, 2488 variables=variables, 2489 path=path, 2490 dialect=dialect, 2491 default_catalog=kwargs.get("default_catalog"), 2492 ) 2493 depends_on = { 2494 dep.sql(dialect=dialect) 2495 for dep in t.cast(t.List[exp.Expr], depends_on_rendered)[0].expressions 2496 } 2497 2498 used_variables = {k: v for k, v in (variables or {}).items() if k in referenced_variables} 2499 if used_variables: 2500 python_env[c.SQLMESH_VARS] = 
def create_external_model(
    name: TableName,
    *,
    dialect: t.Optional[str] = None,
    path: Path = Path(),
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> ExternalModel:
    """Creates an external model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        dialect: The dialect to serialize.
        path: An optional path to the model definition file.
        defaults: Definition default values.
        kwargs: Additional kwargs to pass to the model constructor.
    """
    # The kind is always EXTERNAL, everything else is passed through untouched.
    external_model = _create_model(
        ExternalModel,
        name,
        defaults=defaults,
        dialect=dialect,
        path=path,
        kind=ModelKindName.EXTERNAL.value,
        **kwargs,
    )
    return t.cast(ExternalModel, external_model)
variables: t.Optional[t.Dict[str, t.Any]] = None, 2567 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 2568 use_original_sql: bool = False, 2569 **kwargs: t.Any, 2570) -> Model: 2571 validate_extra_and_required_fields( 2572 klass, 2573 {"name", *kwargs} - {"grain", "table_properties"}, 2574 "MODEL block", 2575 path, 2576 ) 2577 2578 for prop in PROPERTIES: 2579 kwargs[prop] = _resolve_properties((defaults or {}).get(prop), kwargs.get(prop)) 2580 2581 dialect = dialect or "" 2582 2583 physical_schema_mapping = physical_schema_mapping or {} 2584 model_schema_name = exp.to_table(name, dialect=dialect).db 2585 physical_schema_override: t.Optional[str] = None 2586 2587 for re_pattern, override_schema in physical_schema_mapping.items(): 2588 if re.match(re_pattern, model_schema_name): 2589 physical_schema_override = override_schema 2590 break 2591 2592 raw_kind = kwargs.pop("kind", None) 2593 if raw_kind: 2594 kwargs["kind"] = create_model_kind(raw_kind, dialect, defaults or {}) 2595 2596 defaults = {k: v for k, v in (defaults or {}).items() if k in klass.all_fields()} 2597 if not issubclass(klass, SqlModel): 2598 defaults.pop("optimize_query", None) 2599 2600 statements: t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]] = [] 2601 2602 if "query" in kwargs: 2603 statements.append(kwargs["query"]) 2604 kwargs["query"] = ParsableSql.from_parsed_expression( 2605 kwargs["query"], dialect, use_meta_sql=use_original_sql 2606 ) 2607 2608 # Merge default statements with model-specific statements 2609 for statement_field in ["pre_statements", "post_statements", "on_virtual_update"]: 2610 if statement_field in defaults: 2611 kwargs[statement_field] = [ 2612 exp.maybe_parse(stmt, dialect=dialect) for stmt in defaults[statement_field] 2613 ] + kwargs.get(statement_field, []) 2614 if statement_field in kwargs: 2615 # Macros extracted from these statements need to be treated as metadata only 2616 is_metadata = statement_field == "on_virtual_update" 2617 for stmt in 
kwargs[statement_field]: 2618 # Extract the expression if it's ParsableSql already 2619 expr = stmt.parse(dialect) if isinstance(stmt, ParsableSql) else stmt 2620 statements.append((expr, is_metadata)) 2621 kwargs[statement_field] = [ 2622 # this to retain the transaction information 2623 stmt 2624 if isinstance(stmt, ParsableSql) 2625 else ParsableSql.from_parsed_expression( 2626 stmt, dialect, use_meta_sql=use_original_sql 2627 ) 2628 for stmt in kwargs[statement_field] 2629 ] 2630 2631 # This is done to allow variables like @gateway to be used in these properties 2632 # since rendering shifted from load time to run time. 2633 # Note: we check for Tuple since that's what we expect from _resolve_properties 2634 for property_name in PROPERTIES: 2635 property_values = kwargs.get(property_name) 2636 if isinstance(property_values, exp.Tuple): 2637 statements.extend(property_values.expressions) 2638 2639 if isinstance(getattr(kwargs.get("kind"), "merge_filter", None), exp.Expr): 2640 statements.append(kwargs["kind"].merge_filter) 2641 2642 jinja_macro_references, referenced_variables = extract_macro_references_and_variables( 2643 *(gen(e if isinstance(e, exp.Expr) else e[0]) for e in statements) 2644 ) 2645 2646 if jinja_macros: 2647 jinja_macros = ( 2648 jinja_macros if jinja_macros.trimmed else jinja_macros.trim(jinja_macro_references) 2649 ) 2650 else: 2651 jinja_macros = JinjaMacroRegistry() 2652 2653 for jinja_macro in jinja_macros.root_macros.values(): 2654 referenced_variables.update( 2655 extract_macro_references_and_variables(jinja_macro.definition)[1] 2656 ) 2657 2658 # Merge model-specific audits with default audits 2659 if default_audits := defaults.pop("audits", None): 2660 kwargs["audits"] = default_audits + d.extract_function_calls(kwargs.pop("audits", [])) 2661 2662 model = klass( 2663 name=name, 2664 **{ 2665 **(defaults or {}), 2666 "jinja_macros": jinja_macros or JinjaMacroRegistry(), 2667 "dialect": dialect, 2668 "depends_on": depends_on, 2669 
"physical_schema_override": physical_schema_override, 2670 **kwargs, 2671 }, 2672 ) 2673 2674 audit_definitions = { 2675 **(audit_definitions or {}), 2676 **(inline_audits or {}), 2677 } 2678 2679 used_audits: t.Set[str] = {audit_name for audit_name, _ in model.audits} 2680 2681 audit_definitions = { 2682 audit_name: audit_definitions[audit_name] 2683 for audit_name in used_audits 2684 if audit_name in audit_definitions 2685 } 2686 2687 model.audit_definitions.update(audit_definitions) 2688 2689 # Any macro referenced in audits or signals needs to be treated as metadata-only 2690 statements.extend((audit.query, True) for audit in audit_definitions.values()) # type: ignore[misc] 2691 2692 # Ensure that all audits referenced in the model are defined 2693 from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS 2694 2695 available_audits = BUILT_IN_AUDITS.keys() | model.audit_definitions.keys() 2696 for referenced_audit, audit_args in model.audits: 2697 if referenced_audit not in available_audits: 2698 raise_config_error(f"Audit '{referenced_audit}' is undefined", location=path) 2699 2700 statements.extend( 2701 (audit_arg_expression, True) for audit_arg_expression in audit_args.values() 2702 ) 2703 2704 signal_definitions = signal_definitions or UniqueKeyDict("signals") 2705 2706 for referenced_signal, kwargs in model.signals: 2707 if referenced_signal and referenced_signal not in signal_definitions: 2708 raise_config_error(f"Signal '{referenced_signal}' is undefined", location=path) 2709 2710 statements.extend((signal_kwarg, True) for signal_kwarg in kwargs.values()) 2711 2712 python_env = make_python_env( 2713 statements, 2714 jinja_macro_references, 2715 module_path, 2716 macros or macro.get_registry(), 2717 variables=variables, 2718 referenced_variables=referenced_variables, 2719 path=path, 2720 python_env=python_env, 2721 strict_resolution=depends_on is None, 2722 blueprint_variables=blueprint_variables, 2723 dialect=dialect, 2724 ) 2725 2726 env: t.Dict[str, 
def _split_sql_model_statements(
    expressions: t.List[exp.Expr],
    path: t.Optional[Path],
    dialect: t.Optional[str] = None,
) -> t.Tuple[
    t.Optional[exp.Expr],
    t.List[exp.Expr],
    t.List[exp.Expr],
    t.List[exp.Expr],
    UniqueKeyDict[str, ModelAudit],
]:
    """Extracts the SELECT query from a sequence of expressions.

    Args:
        expressions: The list of all SQL statements in the model definition.
        path: The path to the model definition file, used when reporting errors.
        dialect: The dialect passed to the audit loader for inline audits.

    Returns:
        A tuple containing the extracted SELECT query or the `@INSERT_SEED()` call (None if there is
        none), the statements before it, the statements after it, the statements collected from the
        virtual update blocks, and the inline audit definitions. When there is no query, all regular
        statements are returned as the statements before it.

    Raises:
        ConfigError: If the model definition contains more than one SELECT query or `@INSERT_SEED()` call,
            or if an inline audit definition is not followed by a statement.
    """
    from sqlmesh.core.audit import ModelAudit, load_audit

    query_positions = []
    sql_statements = []
    on_virtual_update = []
    inline_audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("inline_audits")

    idx = 0
    length = len(expressions)
    while idx < length:
        expr = expressions[idx]

        if isinstance(expr, d.Audit):
            # An inline audit consumes two expressions: the audit block and the statement after it
            if idx + 1 >= length:
                raise_config_error(
                    "An inline audit definition must be followed by the audit's query", path
                )
            loaded_audit = load_audit([expr, expressions[idx + 1]], dialect=dialect)
            assert isinstance(loaded_audit, ModelAudit)
            inline_audits[loaded_audit.name] = loaded_audit
            idx += 2
        elif isinstance(expr, d.VirtualUpdateStatement):
            on_virtual_update.extend(expr.expressions)
            idx += 1
        else:
            if (
                isinstance(expr, (exp.Query, d.JinjaQuery))
                or expr == INSERT_SEED_MACRO_CALL
                or (
                    isinstance(expr, d.MacroFunc)
                    and (expr.this.name.lower() == "union" or length == 1)
                )
            ):
                # The position has to refer to `sql_statements`, which is what gets sliced below.
                # An index into `expressions` is off whenever inline audits or virtual update
                # blocks precede the query, since those are not part of `sql_statements`.
                query_positions.append((expr, len(sql_statements)))
            sql_statements.append(expr)
            idx += 1

    if not query_positions:
        return None, sql_statements, [], on_virtual_update, inline_audits

    if len(query_positions) > 1:
        raise_config_error("Only one SELECT query is allowed per model", path)

    query, pos = query_positions[0]
    return query, sql_statements[:pos], sql_statements[pos + 1 :], on_virtual_update, inline_audits
exp.Literal.string(k).eq(v) 2828 elif properties[k].expression.sql().lower() in {"none", "null"}: 2829 del properties[k] 2830 2831 if properties: 2832 return exp.Tuple(expressions=list(properties.values())) 2833 2834 return None 2835 2836 2837def _list_of_calls_to_exp(value: t.List[t.Tuple[str, t.Dict[str, t.Any]]]) -> exp.Expr: 2838 return exp.Tuple( 2839 expressions=[ 2840 exp.Anonymous( 2841 this=v[0], 2842 expressions=[ 2843 exp.EQ(this=exp.convert(left), expression=exp.convert(right)) 2844 for left, right in v[1].items() 2845 ], 2846 ) 2847 for v in value 2848 ] 2849 ) 2850 2851 2852def _is_projection(expr: exp.Expr) -> bool: 2853 parent = expr.parent 2854 return isinstance(parent, exp.Select) and expr.arg_key == "expressions" 2855 2856 2857def _single_expr_or_tuple(values: t.Sequence[exp.Expr]) -> exp.Expr | exp.Tuple: 2858 return values[0] if len(values) == 1 else exp.Tuple(expressions=values) 2859 2860 2861def _refs_to_sql(values: t.Any) -> exp.Expr: 2862 return exp.Tuple(expressions=values) 2863 2864 2865def render_meta_fields( 2866 fields: t.Dict[str, t.Any], 2867 module_path: Path, 2868 path: t.Optional[Path], 2869 jinja_macros: t.Optional[JinjaMacroRegistry], 2870 macros: t.Optional[MacroRegistry], 2871 dialect: DialectType, 2872 variables: t.Optional[t.Dict[str, t.Any]], 2873 default_catalog: t.Optional[str], 2874 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 2875) -> t.Dict[str, t.Any]: 2876 def render_field_value(value: t.Any) -> t.Any: 2877 if isinstance(value, exp.Expr) or (isinstance(value, str) and "@" in value): 2878 expression = exp.maybe_parse(value, dialect=dialect) 2879 rendered_expr = render_expression( 2880 expression=expression, 2881 module_path=module_path, 2882 macros=macros, 2883 jinja_macros=jinja_macros, 2884 variables=variables, 2885 path=path, 2886 dialect=dialect, 2887 default_catalog=default_catalog, 2888 blueprint_variables=blueprint_variables, 2889 ) 2890 if not rendered_expr: 2891 raise SQLMeshError( 2892 
f"Rendering `{expression.sql(dialect=dialect)}` did not return an expression" 2893 ) 2894 2895 if len(rendered_expr) != 1: 2896 raise SQLMeshError( 2897 f"Rendering `{expression.sql(dialect=dialect)}` must return one result, but got {len(rendered_expr)}" 2898 ) 2899 2900 # For cases where a property is conditionally assigned 2901 if rendered_expr[0].sql().lower() in {"none", "null"}: 2902 return None 2903 2904 return rendered_expr[0] 2905 2906 return value 2907 2908 for field_name, field_info in ModelMeta.all_field_infos().items(): 2909 field = field_info.alias or field_name 2910 field_value = fields.get(field) 2911 2912 # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar 2913 if ( 2914 field == "cron" 2915 and isinstance(field_value, str) 2916 and field_value.lower() in CRON_SHORTCUTS 2917 ) or field_value is None: 2918 continue 2919 2920 if field in RUNTIME_RENDERED_MODEL_FIELDS: 2921 fields[field] = parse_strings_with_macro_refs(field_value, dialect) 2922 continue 2923 2924 if isinstance(field_value, dict): 2925 rendered_dict = {} 2926 for key, value in field_value.items(): 2927 if key in RUNTIME_RENDERED_MODEL_FIELDS: 2928 rendered_dict[key] = parse_strings_with_macro_refs(value, dialect) 2929 elif ( 2930 # don't parse kind auto_restatement_cron="@..." kwargs (e.g. 
@daily) into MacroVar 2931 key == "auto_restatement_cron" 2932 and isinstance(value, str) 2933 and value.lower() in CRON_SHORTCUTS 2934 ): 2935 rendered_dict[key] = value 2936 elif (rendered := render_field_value(value)) is not None: 2937 rendered_dict[key] = rendered 2938 2939 if rendered_dict: 2940 fields[field] = rendered_dict 2941 else: 2942 fields.pop(field) 2943 elif isinstance(field_value, list): 2944 rendered_list = [ 2945 rendered 2946 for value in field_value 2947 if (rendered := render_field_value(value)) is not None 2948 ] 2949 if rendered_list: 2950 fields[field] = rendered_list 2951 else: 2952 fields.pop(field) 2953 else: 2954 rendered_field = render_field_value(field_value) 2955 if rendered_field is not None: 2956 fields[field] = rendered_field 2957 else: 2958 fields.pop(field) 2959 2960 return fields 2961 2962 2963def render_model_defaults( 2964 defaults: t.Dict[str, t.Any], 2965 module_path: Path, 2966 path: t.Optional[Path], 2967 jinja_macros: t.Optional[JinjaMacroRegistry], 2968 macros: t.Optional[MacroRegistry], 2969 dialect: DialectType, 2970 variables: t.Optional[t.Dict[str, t.Any]], 2971 default_catalog: t.Optional[str], 2972) -> t.Dict[str, t.Any]: 2973 rendered_defaults = render_meta_fields( 2974 fields=defaults, 2975 module_path=module_path, 2976 macros=macros, 2977 jinja_macros=jinja_macros, 2978 variables=variables, 2979 path=path, 2980 dialect=dialect, 2981 default_catalog=default_catalog, 2982 ) 2983 2984 # Validate defaults that have macros are rendered to boolean 2985 for boolean in {"optimize_query", "allow_partials", "enabled"}: 2986 var = rendered_defaults.get(boolean) 2987 if var is not None and not isinstance(var, (exp.Boolean, bool)): 2988 raise ConfigError(f"Expected boolean for '{var}', got '{type(var)}' instead") 2989 2990 # Validate the 'interval_unit' if present is an Interval Unit 2991 var = rendered_defaults.get("interval_unit") 2992 if isinstance(var, str): 2993 try: 2994 rendered_defaults["interval_unit"] = 
IntervalUnit(var) 2995 except ValueError as e: 2996 raise ConfigError(f"Invalid interval unit: {var}") from e 2997 2998 return rendered_defaults 2999 3000 3001def parse_defaults_properties( 3002 defaults: t.Dict[str, t.Any], dialect: DialectType 3003) -> t.Dict[str, t.Any]: 3004 for prop in PROPERTIES: 3005 default_properties = defaults.get(prop) 3006 for key, value in (default_properties or {}).items(): 3007 if isinstance(key, str) and d.SQLMESH_MACRO_PREFIX in str(value): 3008 defaults[prop][key] = exp.maybe_parse(value, dialect=dialect) 3009 3010 return defaults 3011 3012 3013def render_expression( 3014 expression: exp.Expr, 3015 module_path: Path, 3016 path: t.Optional[Path], 3017 jinja_macros: t.Optional[JinjaMacroRegistry] = None, 3018 macros: t.Optional[MacroRegistry] = None, 3019 dialect: DialectType = None, 3020 variables: t.Optional[t.Dict[str, t.Any]] = None, 3021 default_catalog: t.Optional[str] = None, 3022 blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None, 3023) -> t.Optional[t.List[exp.Expr]]: 3024 meta_python_env = make_python_env( 3025 expressions=expression, 3026 jinja_macro_references=None, 3027 module_path=module_path, 3028 macros=macros or macro.get_registry(), 3029 variables=variables, 3030 path=path, 3031 blueprint_variables=blueprint_variables, 3032 ) 3033 return ExpressionRenderer( 3034 expression, 3035 dialect, 3036 [], 3037 path=path, 3038 jinja_macro_registry=jinja_macros, 3039 python_env=meta_python_env, 3040 default_catalog=default_catalog, 3041 quote_identifiers=False, 3042 normalize_identifiers=False, 3043 ).render() 3044 3045 3046META_FIELD_CONVERTER: t.Dict[str, t.Callable] = { 3047 "start": lambda value: exp.Literal.string(value), 3048 "cron": lambda value: exp.Literal.string(value), 3049 "cron_tz": lambda value: exp.Literal.string(value), 3050 "partitioned_by_": _single_expr_or_tuple, 3051 "clustered_by": _single_expr_or_tuple, 3052 "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)), 3053 "pre": 
_list_of_calls_to_exp, 3054 "post": _list_of_calls_to_exp, 3055 "audits": _list_of_calls_to_exp, 3056 "columns_to_types_": lambda value: exp.Schema( 3057 expressions=[exp.ColumnDef(this=exp.to_column(c), kind=t) for c, t in value.items()] 3058 ), 3059 "column_descriptions_": lambda value: exp.Schema( 3060 expressions=[exp.to_column(c).eq(d) for c, d in value.items()] 3061 ), 3062 "tags": single_value_or_tuple, 3063 "grains": _refs_to_sql, 3064 "references": _refs_to_sql, 3065 "physical_properties_": lambda value: value, 3066 "virtual_properties_": lambda value: value, 3067 "session_properties_": lambda value: value, 3068 "allow_partials": exp.convert, 3069 "signals": lambda values: exp.tuple_( 3070 *( 3071 exp.func( 3072 name, *(exp.PropertyEQ(this=exp.var(k), expression=v) for k, v in args.items()) 3073 ) 3074 if name 3075 else exp.Tuple(expressions=[exp.var(k).eq(v) for k, v in args.items()]) 3076 for name, args in values 3077 ) 3078 ), 3079 "formatting": str, 3080 "optimize_query": str, 3081 "virtual_environment_mode": lambda value: exp.Literal.string(value.value), 3082 "dbt_node_info_": lambda value: value.to_expression(), 3083 "grants_": lambda value: value, 3084 "grants_target_layer": lambda value: exp.Literal.string(value.value), 3085} 3086 3087 3088def get_model_name(path: Path) -> str: 3089 path_parts = list(path.parts[path.parts.index("models") + 1 : -1]) + [path.stem] 3090 return ".".join(path_parts[-3:]) 3091 3092 3093# function applied to time column when automatically used for partitioning in INCREMENTAL_BY_TIME_RANGE models 3094def clickhouse_partition_func( 3095 column: exp.Expr, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] 3096) -> exp.Expr: 3097 # `toMonday()` function accepts a Date or DateTime type column 3098 3099 col_type = (columns_to_types and columns_to_types.get(column.name)) or exp.DataType.build( 3100 "UNKNOWN" 3101 ) 3102 col_type_is_conformable = col_type.is_type( 3103 exp.DataType.Type.DATE, 3104 exp.DataType.Type.DATE32, 
3105 exp.DataType.Type.DATETIME, 3106 exp.DataType.Type.DATETIME64, 3107 ) 3108 3109 # if input column is already a conformable type, just pass the column 3110 if col_type_is_conformable: 3111 return exp.func("toMonday", column, dialect="clickhouse") 3112 3113 # if input column type is not known, cast input to DateTime64 3114 if col_type.is_type(exp.DataType.Type.UNKNOWN): 3115 return exp.func( 3116 "toMonday", 3117 exp.cast(column, exp.DataType.build("DateTime64(9, 'UTC')", dialect="clickhouse")), 3118 dialect="clickhouse", 3119 ) 3120 3121 # if input column type is known but not conformable, cast input to DateTime64 and cast output back to original type 3122 return exp.cast( 3123 exp.func( 3124 "toMonday", 3125 exp.cast(column, exp.DataType.build("DateTime64(9, 'UTC')", dialect="clickhouse")), 3126 dialect="clickhouse", 3127 ), 3128 col_type, 3129 ) 3130 3131 3132TIME_COL_PARTITION_FUNC = {"clickhouse": clickhouse_partition_func}
class SqlModel(_Model):
    """The model definition which relies on a SQL query to fetch the data.

    Args:
        query: The main query representing the model.
        pre_statements: The list of SQL statements that precede the model's query.
        post_statements: The list of SQL statements that follow after the model's query.
        on_virtual_update: The list of SQL statements to be executed after the virtual update.
    """

    query_: ParsableSql = Field(alias="query")
    source_type: t.Literal["sql"] = "sql"

    # Lazily inferred column types; reset whenever the query or the mapping schema changes.
    _columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Returns the serializable state without the cached, derived values.

        The cached query renderer and column descriptions are dropped from the
        instance dict and the inferred column types are reset to None.
        """
        state = super().__getstate__()
        # Copy so that popping the cached values does not affect the live instance.
        state["__dict__"] = state["__dict__"].copy()
        # query renderer is very expensive to serialize
        state["__dict__"].pop("_query_renderer", None)
        state["__dict__"].pop("column_descriptions", None)
        private = state[PRIVATE_FIELDS]
        private["_columns_to_types"] = None
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        """Returns a copy of the model with all query-derived caches cleared.

        Args:
            kwargs: Forwarded as-is to the parent `copy` implementation.

        Returns:
            The copied model. Its cached query renderer, column descriptions and
            inferred column types are reset, and the cached full dependency set
            is reset as well when `update` touches `depends_on_` or `query`.
        """
        model = super().copy(**kwargs)
        model.__dict__.pop("_query_renderer", None)
        model.__dict__.pop("column_descriptions", None)
        model._columns_to_types = None
        # `update` is optional and may be passed explicitly as None, in which case
        # the `.get` default would not apply and `.keys()` would fail.
        updated_fields = (kwargs.get("update") or {}).keys()
        if updated_fields & {"depends_on_", "query"}:
            model._full_depends_on = None
        return model

    @property
    def query(self) -> t.Union[exp.Query, d.JinjaQuery, d.MacroFunc]:
        """The model's query, parsed from `query_` using the model's dialect."""
        parsed_query = self.query_.parse(self.dialect)
        return t.cast(t.Union[exp.Query, d.JinjaQuery, d.MacroFunc], parsed_query)

    def render_query(
        self,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        table_mapping: t.Optional[t.Dict[str, str]] = None,
        expand: t.Iterable[str] = tuple(),
        deployability_index: t.Optional[DeployabilityIndex] = None,
        engine_adapter: t.Optional[EngineAdapter] = None,
        **kwargs: t.Any,
    ) -> t.Optional[exp.Query]:
        """Renders the model's query by delegating to the model's query renderer.

        Args:
            start: The start datetime to render.
            end: The end datetime to render.
            execution_time: The date/time reference to use for execution time.
            snapshots: All upstream snapshots (by name) to use for expansion and mapping
                of physical locations.
            table_mapping: Table mapping of physical locations.
            expand: Names of referenced models to expand as subqueries.
            deployability_index: Determines snapshots that are deployable in the context
                of this render.
            engine_adapter: Forwarded unchanged to the renderer.
            kwargs: Additional kwargs to pass to the renderer.

        Returns:
            The rendered query, or None if the renderer did not produce one.
        """
        query = self._query_renderer.render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            table_mapping=table_mapping,
            expand=expand,
            deployability_index=deployability_index,
            engine_adapter=engine_adapter,
            **kwargs,
        )

        return query

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expr]:
        """Returns the list of SQL expressions comprising the model definition.

        Args:
            include_python: Forwarded to the parent implementation.
            include_defaults: Forwarded to the parent implementation.
            render_query: If True, the rendered pre-statements, query, post-statements and
                virtual update statements are appended; otherwise the unrendered ones are.

        Returns:
            The parent's definition followed by the pre-statements, the query, the
            post-statements and, when present, a virtual update statement.
        """
        result: t.List[exp.Expr] = super().render_definition(
            include_python=include_python, include_defaults=include_defaults
        )

        if render_query:
            result.extend(self.render_pre_statements())
            # Fall back to the unrendered query when rendering yields nothing.
            result.append(self.render_query() or self.query)
            result.extend(self.render_post_statements())
            if virtual_update := self.render_on_virtual_update():
                result.append(d.VirtualUpdateStatement(expressions=virtual_update))
        else:
            result.extend(self.pre_statements)
            result.append(self.query)
            result.extend(self.post_statements)
            if self.on_virtual_update:
                result.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update))

        return result

    @property
    def is_sql(self) -> bool:
        """Always True for this model type."""
        return True

    @property
    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
        """Returns the mapping of column names to types of this model.

        Explicitly provided types (`columns_to_types_`) take precedence. Otherwise the
        types are inferred from the projections of the rendered query and cached.

        Returns:
            The column types merged with `managed_columns`, or None when the query can't
            be rendered, a projection has no output name or a duplicate one, or the
            mapping contains a star projection.
        """
        if self.columns_to_types_ is not None:
            self._columns_to_types = self.columns_to_types_
        elif self._columns_to_types is None:
            try:
                query = self._query_renderer.render()
            except Exception:
                logger.exception("Failed to render query for model %s", self.fqn)
                return None

            if query is None:
                return None

            unknown = exp.DataType.build("unknown")

            columns_to_types = {}
            for select in query.selects:
                output_name = select.output_name

                # If model validation is disabled, we cannot assume that projections
                # will have inferrable output names or even that they will be unique
                if not output_name or output_name in columns_to_types:
                    return None

                # copy data type because it is used in the engine to build CTAS and other queries
                # this can change the parent which will mess up the diffing algo
                columns_to_types[output_name] = (select.type or unknown).copy()

            self._columns_to_types = columns_to_types

        if "*" in self._columns_to_types:
            return None

        return {**self._columns_to_types, **self.managed_columns}

    @cached_property
    def column_descriptions(self) -> t.Dict[str, str]:
        """A dictionary of column names to annotation comments.

        Explicit descriptions (`column_descriptions_`) are returned when set. Otherwise
        the last comment attached to each projection of the rendered query is used.
        An empty dictionary is returned when the query can't be rendered.
        """
        if self.column_descriptions_ is not None:
            return self.column_descriptions_

        query = self.render_query()
        if query is None:
            return {}

        return {
            select.alias_or_name: select.comments[-1].strip()
            for select in query.selects
            if select.comments
        }

    def set_mapping_schema(self, schema: t.Dict) -> None:
        """Sets the mapping schema and refreshes the state derived from it."""
        super().set_mapping_schema(schema)
        self._on_mapping_schema_set()

    def update_schema(self, schema: MappingSchema) -> None:
        """Updates the schema for this model's dependencies and refreshes the state derived from it."""
        super().update_schema(schema)
        self._on_mapping_schema_set()

    def _on_mapping_schema_set(self) -> None:
        """Resets the inferred column types and passes the mapping schema to the query renderer."""
        self._columns_to_types = None
        self._query_renderer.update_schema(self.mapping_schema)

    def validate_definition(self) -> None:
        """Validates the model's definition.

        Checks are reported through `raise_config_error`. When the query can't be
        rendered, only the presence of explicit dependencies is checked and the
        remaining validation (including the parent's) is skipped.
        """
        query = self._query_renderer.render()
        if query is None:
            if self.depends_on_ is None:
                raise_config_error(
                    "Dependencies must be provided explicitly for models that can be rendered only at runtime",
                    self._path,
                )
            return

        if not isinstance(query, exp.Query):
            raise_config_error("Missing SELECT query in the model definition", self._path)

        projection_list = query.selects
        if not projection_list:
            raise_config_error("Query missing select statements", self._path)

        if self.depends_on_self and not self.annotated:
            raise_config_error(
                "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
                self._path,
            )

        super().validate_definition()

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Determines whether this model is a breaking change in relation to the previous model.

        Args:
            previous: The previous model to compare against.

        Returns:
            False if every difference between the rendered queries is an insertion of a
            projection (or of something nested in another inserted expression), None if
            the nature of the change can't be determined. This implementation never
            returns True.
        """
        if not isinstance(previous, SqlModel):
            return None

        if self.lookback != previous.lookback:
            return None

        try:
            # the previous model which comes from disk could be unrenderable
            previous_query = previous.render_query()
        except Exception:
            previous_query = None
        this_query = self.render_query()

        if previous_query is None or this_query is None:
            # Can't determine if there's a breaking change if we can't render the query.
            return None

        if previous_query is this_query:
            edits = []
        else:
            edits = diff(
                previous_query,
                this_query,
                matchings=[(previous_query, this_query)],
                delta_only=True,
                dialect=self.dialect if self.dialect == previous.dialect else None,
            )
        inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}

        for edit in edits:
            # Anything other than an insertion can't be classified here.
            if not isinstance(edit, Insert):
                return None

            expr = edit.expression
            if isinstance(expr, exp.UDTF):
                # projection subqueries do not change cardinality, engines don't allow these to return
                # more than one row of data
                parent = expr.find_ancestor(exp.Subquery)

                if not parent:
                    return None

                expr = parent

            # Inserted nodes are fine when they are projections themselves or
            # direct children of another inserted expression.
            if not _is_projection(expr) and expr.parent not in inserted_expressions:
                return None

        return False

    def is_metadata_only_change(self, previous: _Node) -> bool:
        """Determines if this node is a metadata only change in relation to the previous node.

        Results that reach the query comparison (or the type check) are cached per
        `id(previous)`.

        Args:
            previous: The previous node to compare against.

        Returns:
            True if this node is a metadata only change, False otherwise.
        """
        if self._is_metadata_only_change_cache.get(id(previous), None) is not None:
            return self._is_metadata_only_change_cache[id(previous)]

        if not super().is_metadata_only_change(previous):
            return False

        if not isinstance(previous, SqlModel):
            self._is_metadata_only_change_cache[id(previous)] = False
            return False

        # Compare rendered queries, falling back to the unrendered ones.
        this_rendered_query = self.render_query() or self.query
        previous_rendered_query = previous.render_query() or previous.query
        is_metadata_change = this_rendered_query == previous_rendered_query

        self._is_metadata_only_change_cache[id(previous)] = is_metadata_change
        return is_metadata_change

    @cached_property
    def _query_renderer(self) -> QueryRenderer:
        """The cached renderer for this model's query."""
        # Identifier quoting is turned off for views in the trino and spark dialects.
        no_quote_identifiers = self.kind.is_view and self.dialect in ("trino", "spark")
        return QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            schema=self.mapping_schema,
            path=self._path,
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            only_execution_time=self.kind.only_execution_time,
            default_catalog=self.default_catalog,
            quote_identifiers=not no_quote_identifiers,
            optimize_query=self.optimize_query,
            model=self,
        )

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        """The parent's non-SQL data hash values plus those of the Jinja macros."""
        return [
            *super()._data_hash_values_no_sql,
            *self.jinja_macros.data_hash_values,
        ]

    @property
    def _data_hash_values_sql(self) -> t.List[str]:
        """The parent's SQL data hash values plus the raw SQL of the query."""
        return [
            *super()._data_hash_values_sql,
            self.query_.sql,
        ]

    @property
    def _additional_metadata(self) -> t.List[str]:
        """The parent's additional metadata plus the raw SQL of the query."""
        return [*super()._additional_metadata, self.query_.sql]

    @property
    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
        """Renders the query and returns the rule violations recorded by the query renderer."""
        self.render_query()
        return self._query_renderer._violated_rules
The model definition which relies on a SQL query to fetch the data.
Arguments:
- query: The main query representing the model.
- pre_statements: The list of SQL statements that precede the model's query.
- post_statements: The list of SQL statements that follow after the model's query.
- on_virtual_update: The list of SQL statements to be executed after the virtual update.
def copy(self, **kwargs: t.Any) -> Self:
    """Returns a copy of the model with all query-derived caches cleared.

    Args:
        kwargs: Forwarded as-is to the parent `copy` implementation.

    Returns:
        The copied model. Its cached query renderer, column descriptions and
        inferred column types are reset, and the cached full dependency set
        is reset as well when `update` touches `depends_on_` or `query`.
    """
    model = super().copy(**kwargs)
    model.__dict__.pop("_query_renderer", None)
    model.__dict__.pop("column_descriptions", None)
    model._columns_to_types = None
    # `update` is optional and may be passed explicitly as None, in which case
    # the `.get` default would not apply and `.keys()` would fail.
    updated_fields = (kwargs.get("update") or {}).keys()
    if updated_fields & {"depends_on_", "query"}:
        model._full_depends_on = None
    return model
Returns a copy of the model.
!!! warning "Deprecated"
This method is now deprecated; use model_copy instead.
If you need include or exclude, use:
```python {test="skip" lint="skip"}
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```
Arguments:
- include: Optional set or mapping specifying which fields to include in the copied model.
- exclude: Optional set or mapping specifying which fields to exclude in the copied model.
- update: Optional dictionary of field-value pairs to override field values in the copied model.
- deep: If True, the values of fields that are Pydantic models will be deep-copied.
Returns:
A copy of the model with included, excluded and updated fields as specified.
def render_query(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    table_mapping: t.Optional[t.Dict[str, str]] = None,
    expand: t.Iterable[str] = tuple(),
    deployability_index: t.Optional[DeployabilityIndex] = None,
    engine_adapter: t.Optional[EngineAdapter] = None,
    **kwargs: t.Any,
) -> t.Optional[exp.Query]:
    """Renders the model's query by delegating to the model's query renderer.

    Every named argument is forwarded under its own name, followed by any
    additional keyword arguments.

    Returns:
        Whatever the query renderer returns for the given arguments.
    """
    renderer_args = dict(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        table_mapping=table_mapping,
        expand=expand,
        deployability_index=deployability_index,
        engine_adapter=engine_adapter,
    )
    return self._query_renderer.render(**renderer_args, **kwargs)
Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
Arguments:
- start: The start datetime to render. Defaults to epoch start.
- end: The end datetime to render. Defaults to epoch start.
- execution_time: The date/time reference to use for execution time.
- snapshots: All upstream snapshots (by name) to use for expansion and mapping of physical locations.
- table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
- expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
- deployability_index: Determines snapshots that are deployable in the context of this render.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
The rendered expression.
def render_definition(
    self,
    include_python: bool = True,
    include_defaults: bool = False,
    render_query: bool = False,
) -> t.List[exp.Expr]:
    """Returns the list of SQL expressions comprising the model definition.

    The parent's definition is extended with the pre-statements, the query, the
    post-statements and, when present, a virtual update statement. With
    `render_query` set, the rendered variants of those are used instead of the
    unrendered ones.
    """
    definition: t.List[exp.Expr] = super().render_definition(
        include_python=include_python, include_defaults=include_defaults
    )

    if not render_query:
        definition.extend(self.pre_statements)
        definition.append(self.query)
        definition.extend(self.post_statements)
        if self.on_virtual_update:
            definition.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update))
        return definition

    definition.extend(self.render_pre_statements())
    # Fall back to the unrendered query when rendering yields nothing.
    definition.append(self.render_query() or self.query)
    definition.extend(self.render_post_statements())
    rendered_virtual_update = self.render_on_virtual_update()
    if rendered_virtual_update:
        definition.append(d.VirtualUpdateStatement(expressions=rendered_virtual_update))
    return definition
Returns the original list of sql expressions comprising the model definition.
Arguments:
- include_python: Whether or not to include Python code in the rendered definition.
@property
def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
    """Returns the mapping of column names to types of this model.

    Explicitly provided types win; otherwise the types are inferred once from
    the projections of the rendered query and cached on the instance. None is
    returned when inference isn't possible or a star projection is present.
    """
    if self.columns_to_types_ is not None:
        self._columns_to_types = self.columns_to_types_
    elif self._columns_to_types is None:
        try:
            rendered = self._query_renderer.render()
        except Exception:
            logger.exception("Failed to render query for model %s", self.fqn)
            return None

        if rendered is None:
            return None

        fallback_type = exp.DataType.build("unknown")
        inferred = {}
        for projection in rendered.selects:
            name = projection.output_name
            # With model validation disabled, output names may be missing or duplicated.
            if not name or name in inferred:
                return None
            # Copied because the engine reuses the data type when building queries,
            # which can re-parent it and disturb the diffing algorithm.
            inferred[name] = (projection.type or fallback_type).copy()

        self._columns_to_types = inferred

    cached = self._columns_to_types
    if "*" in cached:
        return None
    return {**cached, **self.managed_columns}
Returns the mapping of column names to types of this model.
@cached_property
def column_descriptions(self) -> t.Dict[str, str]:
    """A dictionary of column names to annotation comments.

    Explicit descriptions are returned when set. Otherwise the last comment of
    every commented projection in the rendered query is used, and an empty
    dictionary is returned when the query can't be rendered.
    """
    explicit = self.column_descriptions_
    if explicit is not None:
        return explicit

    rendered = self.render_query()
    if rendered is None:
        return {}

    descriptions = {}
    for projection in rendered.selects:
        if not projection.comments:
            continue
        column = projection.alias_or_name
        descriptions[column] = projection.comments[-1].strip()
    return descriptions
A dictionary of column names to annotation comments.
def update_schema(self, schema: MappingSchema) -> None:
    """Updates the schema via the parent implementation, then runs `_on_mapping_schema_set`.

    Args:
        schema: The mapping schema passed on to the parent implementation.
    """
    super().update_schema(schema)
    # Let the model react to the new mapping schema.
    self._on_mapping_schema_set()
Updates the schema for this model's dependencies based on the given mapping schema.
def validate_definition(self) -> None:
    """Validates the model's definition.

    Problems are reported through `raise_config_error`. When the query can't be
    rendered, only the presence of explicit dependencies is checked and the
    remaining validation, including the parent's, is skipped.
    """
    rendered = self._query_renderer.render()

    if rendered is None:
        # Nothing else can be verified without a rendered query.
        if self.depends_on_ is None:
            raise_config_error(
                "Dependencies must be provided explicitly for models that can be rendered only at runtime",
                self._path,
            )
        return

    if not isinstance(rendered, exp.Query):
        raise_config_error("Missing SELECT query in the model definition", self._path)

    if not rendered.selects:
        raise_config_error("Query missing select statements", self._path)

    lacks_types_for_self_reference = self.depends_on_self and not self.annotated
    if lacks_types_for_self_reference:
        raise_config_error(
            "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
            self._path,
        )

    super().validate_definition()
Validates the model's definition.
Raises:
- ConfigError
def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
    """Determines whether this model is a breaking change in relation to the previous model.

    Args:
        previous: The previous model to compare against.

    Returns:
        False if every difference between the two rendered queries is an insertion
        of a projection (or of something nested in another inserted expression),
        None if the nature of the change can't be determined. This implementation
        never returns True.
    """
    # Only two SQL models can be compared query-to-query.
    if not isinstance(previous, SqlModel):
        return None

    if self.lookback != previous.lookback:
        return None

    try:
        # the previous model which comes from disk could be unrenderable
        previous_query = previous.render_query()
    except Exception:
        previous_query = None
    this_query = self.render_query()

    if previous_query is None or this_query is None:
        # Can't determine if there's a breaking change if we can't render the query.
        return None

    if previous_query is this_query:
        # The very same object: there is nothing to diff.
        edits = []
    else:
        edits = diff(
            previous_query,
            this_query,
            matchings=[(previous_query, this_query)],
            delta_only=True,
            dialect=self.dialect if self.dialect == previous.dialect else None,
        )
    # Collected up front so nested insertions can be recognized via their parent below.
    inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}

    for edit in edits:
        # Any edit other than an insertion can't be classified here.
        if not isinstance(edit, Insert):
            return None

        expr = edit.expression
        if isinstance(expr, exp.UDTF):
            # projection subqueries do not change cardinality, engines don't allow these to return
            # more than one row of data
            parent = expr.find_ancestor(exp.Subquery)

            if not parent:
                return None

            # Judge the enclosing subquery instead of the UDTF itself.
            expr = parent

        # Accept only projections or direct children of another inserted expression.
        if not _is_projection(expr) and expr.parent not in inserted_expressions:
            return None

    return False
Determines whether this model is a breaking change in relation to the previous model.
Arguments:
- previous: The previous model to compare against.
Returns:
True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.
def is_metadata_only_change(self, previous: _Node) -> bool:
    """Determines if this node is a metadata only change in relation to the previous node.

    Verdicts are memoized per `id(previous)`; a negative verdict coming from the
    parent implementation is returned without being memoized.

    Args:
        previous: The previous node to compare against.

    Returns:
        True if this node is a metadata only change, False otherwise.
    """
    cache_key = id(previous)
    memoized = self._is_metadata_only_change_cache.get(cache_key)
    if memoized is not None:
        return memoized

    if not super().is_metadata_only_change(previous):
        return False

    if isinstance(previous, SqlModel):
        # Compare rendered queries, falling back to the unrendered ones.
        current_query = self.render_query() or self.query
        earlier_query = previous.render_query() or previous.query
        verdict = current_query == earlier_query
    else:
        verdict = False

    self._is_metadata_only_change_cache[cache_key] = verdict
    return verdict
Determines if this node is a metadata only change in relation to the previous node.
Arguments:
- previous: The previous node to compare against.
Returns:
True if this node is a metadata only change, False otherwise.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance.

    This function is meant to behave like a BaseModel method. It takes context
    as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Nothing to do when the private attributes have already been set up.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    # Only private attributes that define a default get an initial value.
    defaults = {
        name: default
        for name, private_attr in self.__private_attributes__.items()
        if (default := private_attr.get_default()) is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- _Model
- python_env
- jinja_macros
- audit_definitions
- mapping_schema
- extract_dependencies_from_query
- pre_statements_
- post_statements_
- on_virtual_update_
- render
- render_query_or_raise
- render_pre_statements
- render_post_statements
- render_on_virtual_update
- render_audit_query
- pre_statements
- post_statements
- on_virtual_update
- macro_definitions
- render_signals
- render_signal_calls
- render_merge_filter
- render_physical_properties
- render_virtual_properties
- render_session_properties
- ctas_query
- text_diff
- set_time_format
- convert_to_time_column
- depends_on
- columns_to_types_or_raise
- annotated
- sorted_python_env
- view_name
- schema_name
- physical_schema
- is_python
- is_seed
- depends_on_self
- forward_only
- disable_restatement
- auto_restatement_intervals
- auto_restatement_cron
- auto_restatement_croniter
- wap_supported
- data_hash
- audit_metadata_hash
- metadata_hash
- is_model
- grants_table_type
- full_depends_on
- partitioned_by
- partition_interval_unit
- audits_with_args
- sqlmesh.core.model.meta.ModelMeta
- dialect
- name
- kind
- retention
- table_format
- storage_format
- partitioned_by_
- clustered_by
- default_catalog
- depends_on_
- columns_to_types_
- column_descriptions_
- audits
- grains
- references
- physical_schema_override
- physical_properties_
- virtual_properties_
- session_properties_
- allow_partials
- signals
- enabled
- physical_version
- gateway
- optimize_query
- ignored_rules_
- formatting
- virtual_environment_mode
- grants_
- grants_target_layer
- ignored_rules_validator
- session_properties_validator
- time_column
- unique_key
- lookback
- lookback_start
- batch_size
- batch_concurrency
- physical_properties
- virtual_properties
- session_properties
- custom_materialization_properties
- grants
- all_references
- on
- managed_columns
- when_matched
- merge_filter
- catalog
- fully_qualified_table
- fqn
- on_destructive_change
- on_additive_change
- ignored_rules
class SeedModel(_Model):
    """The model definition which uses a pre-built static dataset to source the data from.

    Args:
        seed: The content of a pre-built static dataset.
    """

    kind: SeedKind
    seed: Seed
    column_hashes_: t.Optional[t.Dict[str, str]] = Field(default=None, alias="column_hashes")
    derived_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    is_hydrated: bool = True
    source_type: t.Literal["seed"] = "seed"

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Returns the pickling state without the cached seed reader."""
        state = super().__getstate__()
        # Copy before popping so the live instance keeps its cached `_reader`.
        state["__dict__"] = state["__dict__"].copy()
        state["__dict__"].pop("_reader", None)
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        """Returns a copy of the model with the cached seed reader dropped."""
        model = super().copy(**kwargs)
        model.__dict__.pop("_reader", None)
        return model

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Yields the seed content as dataframes; yields nothing when not hydrated."""
        if not self.is_hydrated:
            return
        yield from self.render_seed()

    def render_seed(self) -> t.Iterator[QueryOrDF]:
        """Reads the seed in batches and yields one type-coerced dataframe per batch.

        Columns declared in `columns_to_types_` are coerced by type: date and other
        temporal columns are parsed on a best-effort basis, boolean columns go through
        `str_to_bool`, and non-null values of text columns are cast to `str`.
        NaN values are replaced with None in every yielded dataframe.

        Raises:
            SQLMeshError: If the model is not hydrated.
        """
        import numpy as np
        import pandas as pd

        self._ensure_hydrated()

        date_columns = []
        datetime_columns = []
        bool_columns = []
        string_columns = []

        columns_to_types = self.columns_to_types_ or {}
        column_names_to_check = set(columns_to_types)
        for name, tpe in columns_to_types.items():
            if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
                date_columns.append(name)
            elif tpe.this in exp.DataType.TEMPORAL_TYPES:
                datetime_columns.append(name)
            elif tpe.is_type("boolean"):
                bool_columns.append(name)
            elif tpe.this in exp.DataType.TEXT_TYPES:
                string_columns.append(name)

        for df in self._reader.read(batch_size=self.kind.batch_size):
            # Map dataframe columns that only match after identifier normalization
            # back to the names declared on the model.
            rename_dict = {}
            for column in columns_to_types:
                if column not in df:
                    normalized_name = normalize_identifiers(column, dialect=self.dialect).name
                    if normalized_name in df:
                        rename_dict[normalized_name] = column
            if rename_dict:
                df.rename(columns=rename_dict, inplace=True)
                # These names have already been checked
                # NOTE(review): this subtracts the normalized names (the dict keys),
                # not the declared names (the values) -- confirm which was intended.
                column_names_to_check -= set(rename_dict)

            missing_columns = column_names_to_check - set(df.columns)
            if missing_columns:
                raise_config_error(
                    f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
                )

            # convert all date/time types to native pandas timestamp
            for column in [*date_columns, *datetime_columns]:
                # `infer_datetime_format` and `errors="ignore"` are deprecated in
                # pandas 2.x. Catching the parse failure keeps the old best-effort
                # behaviour: unparseable columns are left untouched.
                try:
                    df[column] = pd.to_datetime(df[column])
                except (ValueError, TypeError, OverflowError):
                    continue

            # extract datetime.date from pandas timestamp for DATE columns
            for column in date_columns:
                try:
                    df[column] = df[column].dt.date
                except Exception as ex:
                    logger.error(
                        "Failed to convert column '%s' to date in seed model '%s': %s",
                        column,
                        self.name,
                        ex,
                    )

            for column in bool_columns:
                df[column] = df[column].apply(lambda i: str_to_bool(str(i)))

            # Cast only the non-null values of text columns to str; nulls stay null.
            df.loc[:, string_columns] = df[string_columns].mask(
                cond=lambda x: x.notna(),  # type: ignore
                other=df[string_columns].astype(str),  # type: ignore
            )
            yield df.replace({np.nan: None})

    @property
    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
        """Column types: explicit first, then derived, then read from the seed if hydrated."""
        if self.columns_to_types_ is not None:
            return self.columns_to_types_
        if self.derived_columns_to_types is not None:
            return self.derived_columns_to_types
        if self.is_hydrated:
            return self._reader.columns_to_types
        return None

    @property
    def column_hashes(self) -> t.Dict[str, str]:
        """Stored column hashes if present, otherwise the ones computed by the seed reader.

        Raises:
            SQLMeshError: If no hashes are stored and the model is not hydrated.
        """
        if self.column_hashes_ is not None:
            return self.column_hashes_
        self._ensure_hydrated()
        return self._reader.column_hashes

    @property
    def is_seed(self) -> bool:
        return True

    @property
    def seed_path(self) -> Path:
        """The seed file path; relative paths are resolved against the model file's directory.

        Raises:
            SQLMeshError: If the seed path is relative and the model has no path.
        """
        seed_path = Path(self.kind.path)
        if not seed_path.is_absolute():
            if self._path is None:
                raise SQLMeshError(f"Seed model '{self.name}' has no path")
            return self._path.parent / seed_path
        return seed_path

    @property
    def depends_on(self) -> t.Set[str]:
        """The explicitly declared dependencies, excluding the model itself."""
        return (self.depends_on_ or set()) - {self.fqn}

    @property
    def depends_on_self(self) -> bool:
        return False

    @property
    def batch_size(self) -> t.Optional[int]:
        # Unlike other model kinds, the batch size provided in the SEED kind represents the
        # maximum number of rows to insert in a single batch.
        # We should never batch intervals for seed models.
        return None

    def to_dehydrated(self) -> SeedModel:
        """Creates a dehydrated copy of this model.

        The dehydrated seed model will not contain the seed content, but will contain
        the column hashes. This is useful for comparing two seed models without
        having to read the seed content from disk.

        Returns:
            A dehydrated copy of this model.
        """
        if not self.is_hydrated:
            return self

        return self.copy(
            update={
                "seed": Seed(content=""),
                "is_hydrated": False,
                "column_hashes_": self.column_hashes,
                "derived_columns_to_types": self.columns_to_types
                if self.columns_to_types_ is None
                else None,
            }
        )

    def to_hydrated(self, content: str) -> SeedModel:
        """Creates a hydrated copy of this model with the given seed content.

        Returns:
            A hydrated copy of this model.
        """
        if self.is_hydrated:
            return self

        return self.copy(
            update={
                "seed": Seed(content=content),
                "is_hydrated": True,
                "column_hashes_": None,
            },
        )

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Classifies the change against a previous model using column hashes.

        Returns:
            False when every previous column still exists with an identical hash,
            None in all other cases (including when `previous` is not a seed model).
        """
        if not isinstance(previous, SeedModel):
            return None

        new_columns = set(self.column_hashes)
        old_columns = set(previous.column_hashes)

        if not new_columns.issuperset(old_columns):
            return None

        for col in old_columns:
            if self.column_hashes[col] != previous.column_hashes[col]:
                return None

        return False

    def _ensure_hydrated(self) -> None:
        """Raises SQLMeshError if the model is not hydrated."""
        if not self.is_hydrated:
            raise SQLMeshError(f"Seed model '{self.name}' is not hydrated.")

    @cached_property
    def _reader(self) -> CsvSeedReader:
        return self.seed.reader(dialect=self.dialect, settings=self.kind.csv_settings)

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        data = super()._data_hash_values_no_sql
        for column_name, column_hash in self.column_hashes.items():
            data.append(column_name)
            data.append(column_hash)

        # Include grants in data hash for seed models to force recreation on grant changes
        # since seed models don't support migration
        data.append(json.dumps(self.grants, sort_keys=True) if self.grants else "")
        data.append(self.grants_target_layer)

        return data
The model definition which uses a pre-built static dataset to source the data from.
Arguments:
- seed: The content of a pre-built static dataset.
def copy(self, **kwargs: t.Any) -> Self:
    """Returns a copy of the model that does not carry over the cached seed reader.

    Args:
        **kwargs: Forwarded unchanged to the parent `copy` implementation.

    Returns:
        The copied model with `_reader` removed from its instance dictionary.
    """
    duplicate = super().copy(**kwargs)
    # `_reader` is a cached value; drop it so the copy builds its own on demand.
    duplicate.__dict__.pop("_reader", None)
    return duplicate
Returns a copy of the model.
!!! warning "Deprecated"
    This method is now deprecated; use `model_copy` instead.

If you need `include` or `exclude`, use:

```python {test="skip" lint="skip"}
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```
Arguments:
- include: Optional set or mapping specifying which fields to include in the copied model.
- exclude: Optional set or mapping specifying which fields to exclude in the copied model.
- update: Optional dictionary of field-value pairs to override field values in the copied model.
- deep: If True, the values of fields that are Pydantic models will be deep-copied.
Returns:
A copy of the model with included, excluded and updated fields as specified.
def render(
    self,
    *,
    context: ExecutionContext,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    **kwargs: t.Any,
) -> t.Iterator[QueryOrDF]:
    """Yields the seed content as dataframes.

    The context, time range and extra keyword arguments are accepted for
    interface compatibility but are not used here.

    Returns:
        A generator over the output of `render_seed`, or an empty generator
        when the model is not hydrated.
    """
    if self.is_hydrated:
        yield from self.render_seed()
Renders the content of this model in a form of either a SELECT query, executing which the data for this model can be fetched, or a dataframe object which contains the data itself.
The type of the returned object (query or dataframe) depends on whether the model was sourced from a SQL query, a Python script or a pre-built dataset (seed).
Arguments:
- context: The execution context used for fetching data.
- start: The start date/time of the run.
- end: The end date/time of the run.
- execution_time: The date/time reference to use for execution time.
Returns:
A generator which yields either a query object or one of the supported dataframe objects.
def render_seed(self) -> t.Iterator[QueryOrDF]:
    """Reads the seed in batches and yields one type-coerced dataframe per batch.

    Columns declared in `columns_to_types_` are coerced by type: date and other
    temporal columns are parsed on a best-effort basis, boolean columns go through
    `str_to_bool`, and non-null values of text columns are cast to `str`.
    NaN values are replaced with None in every yielded dataframe.

    Raises:
        SQLMeshError: If the model is not hydrated.
    """
    import numpy as np
    import pandas as pd

    self._ensure_hydrated()

    date_columns = []
    datetime_columns = []
    bool_columns = []
    string_columns = []

    columns_to_types = self.columns_to_types_ or {}
    column_names_to_check = set(columns_to_types)
    for name, tpe in columns_to_types.items():
        if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
            date_columns.append(name)
        elif tpe.this in exp.DataType.TEMPORAL_TYPES:
            datetime_columns.append(name)
        elif tpe.is_type("boolean"):
            bool_columns.append(name)
        elif tpe.this in exp.DataType.TEXT_TYPES:
            string_columns.append(name)

    for df in self._reader.read(batch_size=self.kind.batch_size):
        # Map dataframe columns that only match after identifier normalization
        # back to the names declared on the model.
        rename_dict = {}
        for column in columns_to_types:
            if column not in df:
                normalized_name = normalize_identifiers(column, dialect=self.dialect).name
                if normalized_name in df:
                    rename_dict[normalized_name] = column
        if rename_dict:
            df.rename(columns=rename_dict, inplace=True)
            # These names have already been checked
            # NOTE(review): this subtracts the normalized names (the dict keys),
            # not the declared names (the values) -- confirm which was intended.
            column_names_to_check -= set(rename_dict)

        missing_columns = column_names_to_check - set(df.columns)
        if missing_columns:
            raise_config_error(
                f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
            )

        # convert all date/time types to native pandas timestamp
        for column in [*date_columns, *datetime_columns]:
            # `infer_datetime_format` and `errors="ignore"` are deprecated in
            # pandas 2.x. Catching the parse failure keeps the old best-effort
            # behaviour: unparseable columns are left untouched.
            try:
                df[column] = pd.to_datetime(df[column])
            except (ValueError, TypeError, OverflowError):
                continue

        # extract datetime.date from pandas timestamp for DATE columns
        for column in date_columns:
            try:
                df[column] = df[column].dt.date
            except Exception as ex:
                logger.error(
                    "Failed to convert column '%s' to date in seed model '%s': %s",
                    column,
                    self.name,
                    ex,
                )

        for column in bool_columns:
            df[column] = df[column].apply(lambda i: str_to_bool(str(i)))

        # Cast only the non-null values of text columns to str; nulls stay null.
        df.loc[:, string_columns] = df[string_columns].mask(
            cond=lambda x: x.notna(),  # type: ignore
            other=df[string_columns].astype(str),  # type: ignore
        )
        yield df.replace({np.nan: None})
@property
def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
    """Returns the mapping of column names to types of this model.

    Explicitly declared types win, then types derived at dehydration time,
    then types inferred by the seed reader when the model is hydrated.
    Returns None when none of these are available.
    """
    declared = self.columns_to_types_
    if declared is not None:
        return declared

    derived = self.derived_columns_to_types
    if derived is not None:
        return derived

    return self._reader.columns_to_types if self.is_hydrated else None
Returns the mapping of column names to types of this model.
@property
def depends_on(self) -> t.Set[str]:
    """The explicitly declared upstream dependencies, excluding self references.

    Returns:
        A set of upstream names; empty when no dependencies were declared.
    """
    declared = self.depends_on_ or set()
    return declared - {self.fqn}
All of the upstream dependencies referenced in the model's query, excluding self references.
Returns:
A set of all the upstream table names.
@property
def batch_size(self) -> t.Optional[int]:
    """Always None: intervals are never batched for seed models.

    Unlike other model kinds, the batch size provided in the SEED kind represents
    the maximum number of rows to insert in a single batch.
    """
    return None
The maximal number of units in a single task for a backfill.
def to_dehydrated(self) -> SeedModel:
    """Builds a copy of this model that carries column hashes instead of seed content.

    Such a copy lets two seed models be compared without reading the seed
    content from disk. An already dehydrated model is returned as-is.

    Returns:
        A dehydrated copy of this model.
    """
    if self.is_hydrated:
        changes: t.Dict[str, t.Any] = {
            "seed": Seed(content=""),
            "is_hydrated": False,
            "column_hashes_": self.column_hashes,
        }
        # Only remember derived types when none were declared explicitly.
        changes["derived_columns_to_types"] = (
            None if self.columns_to_types_ is not None else self.columns_to_types
        )
        return self.copy(update=changes)

    return self
Creates a dehydrated copy of this model.
The dehydrated seed model will not contain the seed content, but will contain the column hashes. This is useful for comparing two seed models without having to read the seed content from disk.
Returns:
A dehydrated copy of this model.
def to_hydrated(self, content: str) -> SeedModel:
    """Builds a copy of this model that holds the given seed content.

    An already hydrated model is returned as-is and `content` is ignored.

    Args:
        content: The seed content to attach to the copy.

    Returns:
        A hydrated copy of this model.
    """
    if not self.is_hydrated:
        changes = {
            "seed": Seed(content=content),
            "is_hydrated": True,
            "column_hashes_": None,
        }
        return self.copy(update=changes)

    return self
Creates a hydrated copy of this model with the given seed content.
Returns:
A hydrated copy of this model.
def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
    """Classifies the change against a previous model by comparing column hashes.

    Args:
        previous: The previous model to compare against.

    Returns:
        False when every column of the previous seed still exists with an
        identical hash; None otherwise, including when `previous` is not a
        seed model.
    """
    if not isinstance(previous, SeedModel):
        return None

    current_hashes = self.column_hashes
    previous_hashes = previous.column_hashes

    # A column that disappeared makes the nature of the change undetermined.
    if any(column not in current_hashes for column in previous_hashes):
        return None

    # So does a surviving column whose content hash changed.
    if any(current_hashes[column] != digest for column, digest in previous_hashes.items()):
        return None

    return False
Determines whether this model is a breaking change in relation to the previous model.
Arguments:
- previous: The previous model to compare against.
Returns:
True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance, if not done already.

    Written to behave like a BaseModel method. `context` is accepted because
    pydantic-core passes it when calling this function; it is not used.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default are left unset.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- _Model
- python_env
- jinja_macros
- audit_definitions
- mapping_schema
- extract_dependencies_from_query
- pre_statements_
- post_statements_
- on_virtual_update_
- render_definition
- render_query
- render_query_or_raise
- render_pre_statements
- render_post_statements
- render_on_virtual_update
- render_audit_query
- pre_statements
- post_statements
- on_virtual_update
- macro_definitions
- render_signals
- render_signal_calls
- render_merge_filter
- render_physical_properties
- render_virtual_properties
- render_session_properties
- ctas_query
- text_diff
- set_time_format
- convert_to_time_column
- set_mapping_schema
- update_schema
- columns_to_types_or_raise
- annotated
- sorted_python_env
- view_name
- schema_name
- physical_schema
- is_sql
- is_python
- forward_only
- disable_restatement
- auto_restatement_intervals
- auto_restatement_cron
- auto_restatement_croniter
- wap_supported
- validate_definition
- is_metadata_only_change
- data_hash
- audit_metadata_hash
- metadata_hash
- is_model
- grants_table_type
- full_depends_on
- partitioned_by
- partition_interval_unit
- audits_with_args
- violated_rules_for_query
- sqlmesh.core.model.meta.ModelMeta
- dialect
- name
- retention
- table_format
- storage_format
- partitioned_by_
- clustered_by
- default_catalog
- depends_on_
- columns_to_types_
- column_descriptions_
- audits
- grains
- references
- physical_schema_override
- physical_properties_
- virtual_properties_
- session_properties_
- allow_partials
- signals
- enabled
- physical_version
- gateway
- optimize_query
- ignored_rules_
- formatting
- virtual_environment_mode
- grants_
- grants_target_layer
- ignored_rules_validator
- session_properties_validator
- time_column
- unique_key
- column_descriptions
- lookback
- lookback_start
- batch_concurrency
- physical_properties
- virtual_properties
- session_properties
- custom_materialization_properties
- grants
- all_references
- on
- managed_columns
- when_matched
- merge_filter
- catalog
- fully_qualified_table
- fqn
- on_destructive_change
- on_additive_change
- ignored_rules
class PythonModel(_Model):
    """The model definition which relies on a Python script to fetch the data.

    Args:
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
    """

    kind: ModelKind = FullKind()
    entrypoint: str
    source_type: t.Literal["python"] = "python"

    def validate_definition(self) -> None:
        """Validates the model and rejects kinds that don't support Python models."""
        super().validate_definition()

        if self.kind and not self.kind.supports_python_models:
            raise_config_error(
                f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
                self._path,
            )

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Calls the entrypoint function and yields whatever it produces.

        Args:
            context: The execution context passed to the entrypoint.
            start: The start of the interval; defaults to the epoch.
            end: The end of the interval; defaults to the epoch.
            execution_time: The execution time reference; defaults to the epoch.
            **kwargs: Extra keyword arguments for the entrypoint. A `variables`
                entry is merged over the variables stored in the Python environment.

        Returns:
            A generator over the entrypoint's output. A non-generator result is
            yielded as a single item.

        Raises:
            PythonModelEvalError: If calling the entrypoint or iterating over its
                output raises.
        """
        env = prepare_env(self.python_env)
        start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
        execution_time = to_datetime(execution_time or c.EPOCH)

        variables = {
            **env.get(c.SQLMESH_VARS, {}),
            **env.get(c.SQLMESH_VARS_METADATA, {}),
            **kwargs.pop("variables", {}),
        }
        blueprint_variables = {
            k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
            for k, v in {
                **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
                **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
            }.items()
        }
        try:
            # Later entries win: caller kwargs override variables, and the
            # time arguments override both.
            kwargs = {
                **variables,
                **kwargs,
                "start": start,
                "end": end,
                "execution_time": execution_time,
                "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
            }
            df_or_iter = env[self.entrypoint](
                context=context.with_variables(variables, blueprint_variables=blueprint_variables),
                **kwargs,
            )

            if not isinstance(df_or_iter, types.GeneratorType):
                df_or_iter = [df_or_iter]

            for df in df_or_iter:
                yield df
        except Exception as e:
            # Chain explicitly so the original error is kept as the cause.
            raise PythonModelEvalError(format_evaluated_code_exception(e, self.python_env)) from e

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expr]:
        """Renders the model definition, always including the Python code."""
        # Ignore the provided value for the include_python flag, since the Python model's
        # definition without Python code is meaningless.
        return super().render_definition(
            include_python=True, include_defaults=include_defaults, render_query=render_query
        )

    @property
    def is_python(self) -> bool:
        return True

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Always returns None: the change is never classified for Python models."""
        return None

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        data = super()._data_hash_values_no_sql
        data.append(self.entrypoint)
        return data
The model definition which relies on a Python script to fetch the data.
Arguments:
- entrypoint: The name of a Python function which contains the data fetching / transformation logic.
def validate_definition(self) -> None:
    """Runs the base validation, then checks that the kind supports Python models.

    A configuration error is reported through `raise_config_error` when the
    model's kind does not support Python models.
    """
    super().validate_definition()

    if not self.kind or self.kind.supports_python_models:
        return

    raise_config_error(
        f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
        self._path,
    )
Validates the model's definition.
Raises:
- ConfigError
def render(
    self,
    *,
    context: ExecutionContext,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    **kwargs: t.Any,
) -> t.Iterator[QueryOrDF]:
    """Calls the entrypoint function and yields whatever it produces.

    Args:
        context: The execution context passed to the entrypoint.
        start: The start of the interval; defaults to the epoch.
        end: The end of the interval; defaults to the epoch.
        execution_time: The execution time reference; defaults to the epoch.
        **kwargs: Extra keyword arguments for the entrypoint. A `variables`
            entry is merged over the variables stored in the Python environment.

    Returns:
        A generator over the entrypoint's output. A non-generator result is
        yielded as a single item.

    Raises:
        PythonModelEvalError: If calling the entrypoint or iterating over its
            output raises.
    """
    env = prepare_env(self.python_env)
    start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
    execution_time = to_datetime(execution_time or c.EPOCH)

    variables = {
        **env.get(c.SQLMESH_VARS, {}),
        **env.get(c.SQLMESH_VARS_METADATA, {}),
        **kwargs.pop("variables", {}),
    }
    blueprint_variables = {
        k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
        for k, v in {
            **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
            **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
        }.items()
    }
    try:
        # Later entries win: caller kwargs override variables, and the
        # time arguments override both.
        kwargs = {
            **variables,
            **kwargs,
            "start": start,
            "end": end,
            "execution_time": execution_time,
            "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
        }
        df_or_iter = env[self.entrypoint](
            context=context.with_variables(variables, blueprint_variables=blueprint_variables),
            **kwargs,
        )

        if not isinstance(df_or_iter, types.GeneratorType):
            df_or_iter = [df_or_iter]

        for df in df_or_iter:
            yield df
    except Exception as e:
        # Chain explicitly so the original error is kept as the cause.
        raise PythonModelEvalError(format_evaluated_code_exception(e, self.python_env)) from e
Renders the content of this model in a form of either a SELECT query, executing which the data for this model can be fetched, or a dataframe object which contains the data itself.
The type of the returned object (query or dataframe) depends on whether the model was sourced from a SQL query, a Python script or a pre-built dataset (seed).
Arguments:
- context: The execution context used for fetching data.
- start: The start date/time of the run.
- end: The end date/time of the run.
- execution_time: The date/time reference to use for execution time.
Returns:
A generator which yields either a query object or one of the supported dataframe objects.
def render_definition(
    self,
    include_python: bool = True,
    include_defaults: bool = False,
    render_query: bool = False,
) -> t.List[exp.Expr]:
    """Renders the model definition, always including the Python code.

    Args:
        include_python: Ignored; Python code is always included.
        include_defaults: Forwarded to the parent implementation.
        render_query: Forwarded to the parent implementation.

    Returns:
        The result of the parent `render_definition` call.
    """
    # The Python model's definition without Python code is meaningless, so the
    # caller's include_python value is deliberately overridden.
    forwarded = {
        "include_python": True,
        "include_defaults": include_defaults,
        "render_query": render_query,
    }
    return super().render_definition(**forwarded)
Returns the original list of sql expressions comprising the model definition.
Arguments:
- include_python: Whether or not to include Python code in the rendered definition.
Determines whether this model is a breaking change in relation to the previous model.
Arguments:
- previous: The previous model to compare against.
Returns:
True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance, if not done already.

    Written to behave like a BaseModel method. `context` is accepted because
    pydantic-core passes it when calling this function; it is not used.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default are left unset.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- _Model
- python_env
- jinja_macros
- audit_definitions
- mapping_schema
- extract_dependencies_from_query
- pre_statements_
- post_statements_
- on_virtual_update_
- copy
- render_query
- render_query_or_raise
- render_pre_statements
- render_post_statements
- render_on_virtual_update
- render_audit_query
- pre_statements
- post_statements
- on_virtual_update
- macro_definitions
- render_signals
- render_signal_calls
- render_merge_filter
- render_physical_properties
- render_virtual_properties
- render_session_properties
- ctas_query
- text_diff
- set_time_format
- convert_to_time_column
- set_mapping_schema
- update_schema
- depends_on
- columns_to_types
- columns_to_types_or_raise
- annotated
- sorted_python_env
- view_name
- schema_name
- physical_schema
- is_sql
- is_seed
- depends_on_self
- forward_only
- disable_restatement
- auto_restatement_intervals
- auto_restatement_cron
- auto_restatement_croniter
- wap_supported
- is_metadata_only_change
- data_hash
- audit_metadata_hash
- metadata_hash
- is_model
- grants_table_type
- full_depends_on
- partitioned_by
- partition_interval_unit
- audits_with_args
- violated_rules_for_query
- sqlmesh.core.model.meta.ModelMeta
- dialect
- name
- retention
- table_format
- storage_format
- partitioned_by_
- clustered_by
- default_catalog
- depends_on_
- columns_to_types_
- column_descriptions_
- audits
- grains
- references
- physical_schema_override
- physical_properties_
- virtual_properties_
- session_properties_
- allow_partials
- signals
- enabled
- physical_version
- gateway
- optimize_query
- ignored_rules_
- formatting
- virtual_environment_mode
- grants_
- grants_target_layer
- ignored_rules_validator
- session_properties_validator
- time_column
- unique_key
- column_descriptions
- lookback
- lookback_start
- batch_size
- batch_concurrency
- physical_properties
- virtual_properties
- session_properties
- custom_materialization_properties
- grants
- all_references
- on
- managed_columns
- when_matched
- merge_filter
- catalog
- fully_qualified_table
- fqn
- on_destructive_change
- on_additive_change
- ignored_rules
class ExternalModel(_Model):
    """The model definition which represents an external source/table."""

    kind: ModelKind = ExternalKind()
    source_type: t.Literal["external"] = "external"

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Classifies the change against a previous model by comparing column types.

        Returns:
            False when every (column, type) pair of the previous external model
            is still present in this one; None otherwise, including when
            `previous` is not an external model.
        """
        if not isinstance(previous, ExternalModel):
            return None

        removed_or_changed = (
            previous.columns_to_types_or_raise.items() - self.columns_to_types_or_raise.items()
        )
        return None if removed_or_changed else False

    @property
    def depends_on(self) -> t.Set[str]:
        """Always an empty set for external models."""
        return set()

    @property
    def depends_on_self(self) -> bool:
        """Always False for external models."""
        return False
The model definition which represents an external source/table.
def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
    """Classifies the change against a previous model by comparing column types.

    Args:
        previous: The previous model to compare against.

    Returns:
        False when every (column, type) pair of the previous external model is
        still present in this one; None otherwise, including when `previous`
        is not an external model.
    """
    if not isinstance(previous, ExternalModel):
        return None

    removed_or_changed = (
        previous.columns_to_types_or_raise.items() - self.columns_to_types_or_raise.items()
    )
    return None if removed_or_changed else False
Determines whether this model is a breaking change in relation to the previous model.
Arguments:
- previous: The previous model to compare against.
Returns:
True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.
All of the upstream dependencies referenced in the model's query, excluding self references.
Returns:
A set of all the upstream table names.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a model instance, if not done already.

    Written to behave like a BaseModel method. `context` is accepted because
    pydantic-core passes it when calling this function; it is not used.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default are left unset.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- _Model
- python_env
- jinja_macros
- audit_definitions
- mapping_schema
- extract_dependencies_from_query
- pre_statements_
- post_statements_
- on_virtual_update_
- copy
- render
- render_definition
- render_query
- render_query_or_raise
- render_pre_statements
- render_post_statements
- render_on_virtual_update
- render_audit_query
- pre_statements
- post_statements
- on_virtual_update
- macro_definitions
- render_signals
- render_signal_calls
- render_merge_filter
- render_physical_properties
- render_virtual_properties
- render_session_properties
- ctas_query
- text_diff
- set_time_format
- convert_to_time_column
- set_mapping_schema
- update_schema
- columns_to_types
- columns_to_types_or_raise
- annotated
- sorted_python_env
- view_name
- schema_name
- physical_schema
- is_sql
- is_python
- is_seed
- forward_only
- disable_restatement
- auto_restatement_intervals
- auto_restatement_cron
- auto_restatement_croniter
- wap_supported
- validate_definition
- is_metadata_only_change
- data_hash
- audit_metadata_hash
- metadata_hash
- is_model
- grants_table_type
- full_depends_on
- partitioned_by
- partition_interval_unit
- audits_with_args
- violated_rules_for_query
- sqlmesh.core.model.meta.ModelMeta
- dialect
- name
- retention
- table_format
- storage_format
- partitioned_by_
- clustered_by
- default_catalog
- depends_on_
- columns_to_types_
- column_descriptions_
- audits
- grains
- references
- physical_schema_override
- physical_properties_
- virtual_properties_
- session_properties_
- allow_partials
- signals
- enabled
- physical_version
- gateway
- optimize_query
- ignored_rules_
- formatting
- virtual_environment_mode
- grants_
- grants_target_layer
- ignored_rules_validator
- session_properties_validator
- time_column
- unique_key
- column_descriptions
- lookback
- lookback_start
- batch_size
- batch_concurrency
- physical_properties
- virtual_properties
- session_properties
- custom_materialization_properties
- grants
- all_references
- on
- managed_columns
- when_matched
- merge_filter
- catalog
- fully_qualified_table
- fqn
- on_destructive_change
- on_additive_change
- ignored_rules
class AuditResult(PydanticModel):
    audit: Audit
    """The audit this result is for."""
    audit_args: t.Dict[t.Any, t.Any]
    """Arguments passed to the audit."""
    model: t.Optional[_Model] = None
    """The model this audit is for."""
    count: t.Optional[int] = None
    """The number of records returned by the audit query. This could be None if the audit was skipped."""
    query: t.Optional[exp.Expr] = None
    """The rendered query used by the audit. This could be None if the audit was skipped."""
    skipped: bool = False
    """Whether or not the audit was skipped."""
    blocking: bool = True
    """Whether or not the audit was blocking. This can be overridden by the user."""
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
The audit this result is for.
The number of records returned by the audit query. This could be None if the audit was skipped.
The rendered query used by the audit. This could be None if the audit was skipped.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class EvaluatableSignals(PydanticModel):
    """The signal calls of a model together with the Python environment needed to evaluate them."""

    signals_to_kwargs: t.Dict[str, t.Dict[str, t.Optional[exp.Expr]]]
    """A mapping of signal names to the kwargs passed to the signal."""
    python_env: t.Dict[str, Executable]
    """The Python environment that should be used to evaluate the rendered signal calls."""
    prepared_python_env: t.Dict[str, t.Any]
    """The prepared Python environment that should be used to evaluate the rendered signal calls."""
!!! abstract "Usage Documentation" Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
A mapping of signal names to the kwargs passed to the signal.
The Python environment that should be used to evaluate the rendered signal calls.
The prepared Python environment that should be used to evaluate the rendered signal calls.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def create_models_from_blueprints(
    gateway: t.Optional[str | exp.Expr],
    blueprints: t.Any,
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    loader: t.Callable[..., Model],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Create one model per blueprint by invoking `loader` with that blueprint's variables.

    Args:
        gateway: The (possibly macro-based) gateway of the model. When set, it is rendered
            separately for every blueprint using the blueprint's variables.
        blueprints: The raw blueprints definition, handed to `_extract_blueprints`.
        get_variables: A callable that returns the variables for a given gateway name.
        loader: The callable that builds a single model.
        path: The path to the model definition file.
        module_path: The python module path to serialize macros for.
        dialect: The dialect used to parse and render the gateway expression.
        default_catalog_per_gateway: An optional mapping of gateway names to default catalogs.
            When the rendered gateway has a non-None entry, it overrides `default_catalog`
            for that blueprint only.
        loader_kwargs: Additional kwargs forwarded to `loader`.

    Returns:
        The list of models created, one per extracted blueprint.
    """
    model_blueprints: t.List[Model] = []
    for blueprint in _extract_blueprints(blueprints, path):
        blueprint_variables = _extract_blueprint_variables(blueprint, path)

        gateway_name: t.Optional[str] = None
        if gateway:
            rendered_gateway = render_expression(
                expression=exp.maybe_parse(gateway, dialect=dialect),
                module_path=module_path,
                macros=loader_kwargs.get("macros"),
                jinja_macros=loader_kwargs.get("jinja_macros"),
                path=path,
                dialect=dialect,
                default_catalog=loader_kwargs.get("default_catalog"),
                blueprint_variables=blueprint_variables,
            )
            gateway_name = rendered_gateway[0].name if rendered_gateway else None

        # Work on a per-blueprint copy: writing the gateway's catalog into the shared
        # `loader_kwargs` would leak it into every subsequent blueprint, including ones
        # whose gateway has no catalog override.
        blueprint_loader_kwargs = dict(loader_kwargs)
        if (
            default_catalog_per_gateway
            and gateway_name
            and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
        ):
            blueprint_loader_kwargs["default_catalog"] = catalog

        model_blueprints.append(
            loader(
                path=path,
                module_path=module_path,
                dialect=dialect,
                variables=get_variables(gateway_name),
                blueprint_variables=blueprint_variables,
                **blueprint_loader_kwargs,
            )
        )

    return model_blueprints
def load_sql_based_models(
    expressions: t.List[exp.Expr],
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Load every model defined by a parsed SQL model file, expanding blueprints if present.

    Args:
        expressions: The parsed expressions of the file; the MODEL block is expected first.
        get_variables: A callable that returns the variables for a given gateway name.
        path: The path to the model definition file.
        module_path: The python module path to serialize macros for.
        dialect: The default dialect used for rendering.
        default_catalog_per_gateway: An optional mapping of gateway names to default catalogs.
        loader_kwargs: Additional kwargs forwarded to `load_sql_based_model`.

    Returns:
        The list of loaded models.
    """
    gateway: t.Optional[exp.Expr] = None
    blueprints: t.Optional[exp.Expr] = None

    model_meta = seq_get(expressions, 0)
    # Iterate over a snapshot: `prop.pop()` below detaches the property from the MODEL
    # block, and mutating the list being iterated would skip the property that follows
    # `blueprints` (e.g. a `gateway` declared right after it).
    meta_props = list(model_meta.expressions) if isinstance(model_meta, d.Model) else []
    for prop in meta_props:
        # Lower-cased for consistency with the property matching in `load_sql_based_model`
        prop_name = prop.name.lower()
        if prop_name == "gateway":
            gateway = prop.args["value"]
        elif prop_name == "blueprints":
            # We pop the `blueprints` here to avoid walking large lists when rendering the meta
            blueprints = prop.pop().args["value"]

    if isinstance(blueprints, d.MacroFunc):
        rendered_blueprints = render_expression(
            expression=blueprints,
            module_path=module_path,
            macros=loader_kwargs.get("macros"),
            jinja_macros=loader_kwargs.get("jinja_macros"),
            variables=get_variables(None),
            path=path,
            dialect=dialect,
            default_catalog=loader_kwargs.get("default_catalog"),
        )
        if not rendered_blueprints:
            raise_config_error("Failed to render blueprints property", path)

        # Help mypy see that rendered_blueprints can't be None
        assert rendered_blueprints

        # A macro that expands to several expressions is treated as a tuple of blueprints
        if len(rendered_blueprints) > 1:
            rendered_blueprints = [exp.Tuple(expressions=rendered_blueprints)]

        blueprints = rendered_blueprints[0]

    return create_models_from_blueprints(
        gateway=gateway,
        blueprints=blueprints,
        get_variables=get_variables,
        loader=partial(load_sql_based_model, expressions),
        path=path,
        module_path=module_path,
        dialect=dialect,
        default_catalog_per_gateway=default_catalog_per_gateway,
        **loader_kwargs,
    )
def load_sql_based_model(
    expressions: t.List[exp.Expr],
    *,
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    audits: t.Optional[t.Dict[str, ModelAudit]] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    dialect: t.Optional[str] = None,
    physical_schema_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    infer_names: t.Optional[bool] = False,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> Model:
    """Load a model from a parsed SQLMesh model SQL file.

    Args:
        expressions: Model, *Statements, Query. When the MODEL block is missing and
            `infer_names` is set, a dummy MODEL node is inserted at the front of this list.
        defaults: Definition default values.
        path: An optional path to the model definition file.
        module_path: The python module path to serialize macros for.
        time_column_format: The default time column format to use if no model time column is configured.
            The format must adhere to Python's strftime codes.
        macros: The custom registry of macros. If not provided the default registry will be used.
        jinja_macros: The registry of Jinja macros.
        audits: Accepted as part of the signature but not used by this function.
        python_env: The custom Python environment for macros. If not provided the environment will be constructed
            from the macro registry.
        dialect: The default dialect if no model dialect is configured.
        physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema
        default_catalog: The default catalog if no model catalog is configured.
        variables: The variables to pass to the model.
        infer_names: Whether the model name may be inferred from `path` when the MODEL block
            is missing or does not specify a name.
        blueprint_variables: The blueprint's variables to pass to the model.
        kwargs: Additional kwargs to pass to the loader.

    Returns:
        The loaded SQL model, or a seed model when the model kind is SEED.

    Raises:
        ValueError: If the name has to be inferred but no `path` was provided.
    """
    missing_model_msg = """Please add a MODEL block at the top of the file. Example:

MODEL (
  name sqlmesh_example.full_model, --model name
  kind FULL, --materialization
  cron '@daily', --schedule
);

Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview
"""

    if not expressions:
        raise_config_error(missing_model_msg)

    dialect = dialect or ""
    meta = expressions[0]
    if not isinstance(meta, d.Model):
        if not infer_names:
            raise_config_error(missing_model_msg)
        meta = d.Model(expressions=[])  # Dummy meta node
        expressions.insert(0, meta)

    # We deliberately hold off rendering some properties at load time because there is not enough information available
    # at load time to render them. They will get rendered later at evaluation time
    unrendered_properties = {}
    unrendered_merge_filter = None

    # Computed once rather than on every loop iteration
    runtime_rendered_props = {"signals", "audits"} | PROPERTIES

    for prop in meta.expressions:
        # Macro functions that programmaticaly generate the key-value pair properties should be rendered
        # This is needed in the odd case where a macro shares the name of one of the properties
        # eg `@session_properties()` Test: `test_macros_in_model_statement` Reference PR: #2574
        if isinstance(prop, d.MacroFunc):
            continue

        prop_name = prop.name.lower()
        if prop_name in runtime_rendered_props:
            unrendered_properties[prop_name] = prop.args.get("value")
        elif (
            prop_name == "kind"
            and (value := prop.args.get("value"))
            and value.name.lower() == "incremental_by_unique_key"
        ):
            for kind_prop in value.expressions:
                if kind_prop.name.lower() == "merge_filter":
                    unrendered_merge_filter = kind_prop

    rendered_meta_exprs = render_expression(
        expression=meta,
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        variables=variables,
        path=path,
        dialect=dialect,
        default_catalog=default_catalog,
        blueprint_variables=blueprint_variables,
    )

    if rendered_meta_exprs is None or len(rendered_meta_exprs) != 1:
        raise_config_error(
            f"Invalid MODEL statement:\n{meta.sql(dialect=dialect, pretty=True)}",
            path,
        )
        # Not expected to be reached (raise_config_error is expected to raise);
        # kept so type checkers see that this branch never falls through.
        raise

    rendered_meta = rendered_meta_exprs[0]

    rendered_defaults = (
        render_model_defaults(
            defaults=defaults,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=default_catalog,
        )
        if defaults
        else {}
    )

    rendered_defaults = parse_defaults_properties(rendered_defaults, dialect=dialect)

    # Extract the query and any pre/post statements
    query_or_seed_insert, pre_statements, post_statements, on_virtual_update, inline_audits = (
        _split_sql_model_statements(expressions[1:], path, dialect=dialect)
    )

    meta_fields: t.Dict[str, t.Any] = {
        "dialect": dialect,
        "description": (
            "\n".join(comment.strip() for comment in rendered_meta.comments)
            if rendered_meta.comments
            else None
        ),
        **{prop.name.lower(): prop.args.get("value") for prop in rendered_meta.expressions},
        **kwargs,
    }

    # Discard the potentially half-rendered versions of these properties and replace them with the
    # original unrendered versions. They will get rendered properly at evaluation time
    meta_fields.update(unrendered_properties)

    if unrendered_merge_filter:
        for idx, kind_prop in enumerate(meta_fields["kind"].expressions):
            if kind_prop.name.lower() == "merge_filter":
                meta_fields["kind"].expressions[idx] = unrendered_merge_filter

    # A dialect declared in the MODEL block arrives as an expression; keep only its name
    if isinstance(meta_fields.get("dialect"), exp.Expr):
        meta_fields["dialect"] = meta_fields["dialect"].name

    # The name of the model will be inferred from its path relative to `models/`, if it's not explicitly specified
    name = meta_fields.pop("name", "")
    if not name and infer_names:
        if path is None:
            raise ValueError(f"Model {name} must have a name")
        name = get_model_name(path)

    if not name:
        raise_config_error(
            "Please add the required 'name' field to the MODEL block at the top of the file.\n\n"
            + "Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview"
        )
    if "default_catalog" in meta_fields:
        raise_config_error(
            "`default_catalog` cannot be set on a per-model basis. It must be set at the connection level.",
            path,
        )

    common_kwargs = dict(
        pre_statements=pre_statements,
        post_statements=post_statements,
        on_virtual_update=on_virtual_update,
        defaults=rendered_defaults,
        path=path,
        module_path=module_path,
        macros=macros,
        python_env=python_env,
        jinja_macros=jinja_macros,
        physical_schema_mapping=physical_schema_mapping,
        default_catalog=default_catalog,
        variables=variables,
        inline_audits=inline_audits,
        blueprint_variables=blueprint_variables,
        use_original_sql=True,
        **meta_fields,
    )

    # Fall back to the default kind declared on ModelMeta when the MODEL block has none
    kind = common_kwargs.pop("kind", ModelMeta.all_field_infos()["kind"].default)

    if kind.name != ModelKindName.SEED:
        return create_sql_model(
            name,
            query_or_seed_insert,
            kind=kind,
            time_column_format=time_column_format,
            **common_kwargs,
        )

    seed_properties = {p.name.lower(): p.args.get("value") for p in kind.expressions}
    return create_seed_model(
        name,
        SeedKind(**seed_properties),
        **common_kwargs,
    )
Load a model from a parsed SQLMesh model SQL file.
Arguments:
- expressions: Model, *Statements, Query.
- defaults: Definition default values.
- path: An optional path to the model definition file.
- module_path: The python module path to serialize macros for.
- time_column_format: The default time column format to use if no model time column is configured. The format must adhere to Python's strftime codes.
- macros: The custom registry of macros. If not provided the default registry will be used.
- jinja_macros: The registry of Jinja macros.
- python_env: The custom Python environment for macros. If not provided the environment will be constructed from the macro registry.
- dialect: The default dialect if no model dialect is configured.
- physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema
- default_catalog: The default catalog if no model catalog is configured.
- variables: The variables to pass to the model.
- kwargs: Additional kwargs to pass to the loader.
def create_sql_model(
    name: TableName,
    query: t.Optional[exp.Expr],
    **kwargs: t.Any,
) -> Model:
    """Build a SQL model from its name and query.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        query: The model's logic in a form of a SELECT query.
        kwargs: Additional kwargs forwarded to the model constructor; `path`, if present,
            is used as the location of the error reported for an invalid query.
    """
    accepted_query_types = (exp.Query, d.JinjaQuery, d.MacroFunc)

    if not isinstance(query, accepted_query_types):
        raise_config_error(
            "A query is required and must be a SELECT statement, a UNION statement, or a JINJA_QUERY block",
            kwargs.get("path"),
        )
    # Narrow the type for static checkers
    assert isinstance(query, accepted_query_types)

    return _create_model(SqlModel, name, query=query, **kwargs)
Creates a SQL model.
Arguments:
- name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
- query: The model's logic in a form of a SELECT query.
def create_seed_model(
    name: TableName,
    seed_kind: SeedKind,
    *,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    **kwargs: t.Any,
) -> Model:
    """Creates a Seed model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        seed_kind: The information about the location of a seed and other related configuration.
        path: An optional path to the model definition file. Relative seed paths are
            resolved against it (or against its parent when it is not a directory).
        module_path: The directory that a leading `$root` marker in the seed path resolves to.
        kwargs: Additional kwargs forwarded to the model constructor.
    """
    seed_path = Path(seed_kind.path)
    path_parts = seed_path.parts
    if not path_parts:
        # An empty or "." path has no components; report it as a config error instead of
        # failing on the unpacking below
        raise_config_error(f"Invalid seed path '{seed_kind.path}'", path)

    marker, *subdirs = path_parts
    if marker.lower() == "$root":
        seed_path = module_path.joinpath(*subdirs)
        seed_kind.path = str(seed_path)
    elif not seed_path.is_absolute() and path is not None:
        # Relative seed paths live next to the model definition file
        base_dir = path if path.is_dir() else path.parent
        seed_path = base_dir / seed_path

    seed = create_seed(seed_path)

    return _create_model(
        SeedModel,
        name,
        path=path,
        seed=seed,
        kind=seed_kind,
        depends_on=kwargs.pop("depends_on", None),
        module_path=module_path,
        **kwargs,
    )
Creates a Seed model.
Arguments:
- name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
- seed_kind: The information about the location of a seed and other related configuration.
- path: An optional path to the model definition file.
def create_python_model(
    name: str,
    entrypoint: str,
    python_env: t.Dict[str, Executable],
    *,
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    path: Path = Path(),
    module_path: Path = Path(),
    depends_on: t.Optional[t.Set[str]] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> Model:
    """Build a Python model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
        python_env: The Python environment of all objects referenced by the model implementation.
            The variables referenced by the model are stored in it under `c.SQLMESH_VARS`.
        macros: The custom registry of macros, used when rendering explicit dependencies.
        jinja_macros: The registry of Jinja macros, used when rendering explicit dependencies.
        path: An optional path to the model definition file.
        module_path: The python module path to serialize macros for.
        depends_on: The custom set of model's upstream dependencies.
        variables: The variables to pass to the model.
        blueprint_variables: The blueprint's variables to pass to the model.
        kwargs: Additional kwargs forwarded to the model constructor; `dialect` and
            `default_catalog` are also read when rendering explicit dependencies.
    """
    dialect = kwargs.get("dialect")

    # When no dependencies are given explicitly, they are discovered by parsing the code
    dependencies_unspecified = depends_on is None

    if python_env is not None:
        parsed_depends_on, referenced_variables = parse_dependencies(
            python_env,
            entrypoint,
            strict_resolution=dependencies_unspecified,
            variables=variables,
            blueprint_variables=blueprint_variables,
        )
    else:
        parsed_depends_on, referenced_variables = set(), set()

    if dependencies_unspecified:
        # Drop self-references found while parsing
        depends_on = parsed_depends_on - {name}
    else:
        declared_deps = [exp.maybe_parse(dep, dialect=dialect) for dep in depends_on or []]
        depends_on_rendered = render_expression(
            expression=exp.Array(expressions=declared_deps),
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=kwargs.get("default_catalog"),
        )
        rendered_array = t.cast(t.List[exp.Expr], depends_on_rendered)[0]
        depends_on = {dep.sql(dialect=dialect) for dep in rendered_array.expressions}

    # Only the variables actually referenced by the model are stored in its environment
    used_variables = {
        var_name: var_value
        for var_name, var_value in (variables or {}).items()
        if var_name in referenced_variables
    }
    if used_variables:
        python_env[c.SQLMESH_VARS] = Executable.value(used_variables, sort_root_dict=True)

    return _create_model(
        PythonModel,
        name,
        path=path,
        depends_on=depends_on,
        entrypoint=entrypoint,
        python_env=python_env,
        macros=macros,
        jinja_macros=jinja_macros,
        module_path=module_path,
        variables=variables,
        blueprint_variables=blueprint_variables,
        **kwargs,
    )
Creates a Python model.
Arguments:
- name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
- entrypoint: The name of a Python function which contains the data fetching / transformation logic.
- python_env: The Python environment of all objects referenced by the model implementation.
- path: An optional path to the model definition file.
- depends_on: The custom set of model's upstream dependencies.
- variables: The variables to pass to the model.
- blueprint_variables: The blueprint's variables to pass to the model.
def create_external_model(
    name: TableName,
    *,
    dialect: t.Optional[str] = None,
    path: Path = Path(),
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> ExternalModel:
    """Build an external model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        dialect: The dialect to serialize.
        path: An optional path to the model definition file.
        defaults: Definition default values.
        kwargs: Additional kwargs forwarded to the model constructor.
    """
    external_model = _create_model(
        ExternalModel,
        name,
        defaults=defaults,
        dialect=dialect,
        path=path,
        kind=ModelKindName.EXTERNAL.value,
        **kwargs,
    )
    return t.cast(ExternalModel, external_model)
Creates an external model.
Arguments:
- name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
- dialect: The dialect to serialize.
- path: An optional path to the model definition file.
def render_meta_fields(
    fields: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Render the macro-based values of the model meta fields found in `fields`.

    `fields` is updated in place and returned: rendered values replace the originals and
    fields whose value renders to nothing (None / NULL, or an empty dict / list) are removed.

    Args:
        fields: The mapping of meta field names (or aliases) to their raw values.
        module_path: The python module path to serialize macros for.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros.
        dialect: The dialect used to parse and render values.
        variables: The variables available during rendering.
        default_catalog: The default catalog used during rendering.
        blueprint_variables: The blueprint's variables available during rendering.

    Raises:
        SQLMeshError: If rendering a value yields no expression or more than one expression.
    """

    def is_cron_shortcut(value: t.Any) -> bool:
        return isinstance(value, str) and value.lower() in CRON_SHORTCUTS

    def render_field_value(value: t.Any) -> t.Any:
        needs_rendering = isinstance(value, exp.Expr) or (isinstance(value, str) and "@" in value)
        if not needs_rendering:
            return value

        expression = exp.maybe_parse(value, dialect=dialect)
        rendered_expr = render_expression(
            expression=expression,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=default_catalog,
            blueprint_variables=blueprint_variables,
        )
        if not rendered_expr:
            raise SQLMeshError(
                f"Rendering `{expression.sql(dialect=dialect)}` did not return an expression"
            )

        if len(rendered_expr) != 1:
            raise SQLMeshError(
                f"Rendering `{expression.sql(dialect=dialect)}` must return one result, but got {len(rendered_expr)}"
            )

        result = rendered_expr[0]
        # For cases where a property is conditionally assigned
        if result.sql().lower() in {"none", "null"}:
            return None
        return result

    for field_name, field_info in ModelMeta.all_field_infos().items():
        field = field_info.alias or field_name
        field_value = fields.get(field)

        if field_value is None:
            continue

        # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
        if field == "cron" and is_cron_shortcut(field_value):
            continue

        if field in RUNTIME_RENDERED_MODEL_FIELDS:
            fields[field] = parse_strings_with_macro_refs(field_value, dialect)
            continue

        rendered_value: t.Any
        if isinstance(field_value, dict):
            rendered_value = {}
            for key, value in field_value.items():
                if key in RUNTIME_RENDERED_MODEL_FIELDS:
                    rendered_value[key] = parse_strings_with_macro_refs(value, dialect)
                elif key == "auto_restatement_cron" and is_cron_shortcut(value):
                    # don't parse kind auto_restatement_cron="@..." kwargs (e.g. @daily) into MacroVar
                    rendered_value[key] = value
                else:
                    rendered_item = render_field_value(value)
                    if rendered_item is not None:
                        rendered_value[key] = rendered_item
            keep_field = bool(rendered_value)
        elif isinstance(field_value, list):
            rendered_value = []
            for value in field_value:
                rendered_item = render_field_value(value)
                if rendered_item is not None:
                    rendered_value.append(rendered_item)
            keep_field = bool(rendered_value)
        else:
            rendered_value = render_field_value(field_value)
            keep_field = rendered_value is not None

        if keep_field:
            fields[field] = rendered_value
        else:
            fields.pop(field)

    return fields
def render_model_defaults(
    defaults: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
) -> t.Dict[str, t.Any]:
    """Render the macro-based values of the model defaults and validate the result.

    Args:
        defaults: The model default values. This mapping is not modified.
        module_path: The python module path to serialize macros for.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros.
        dialect: The dialect used to parse and render values.
        variables: The variables available during rendering.
        default_catalog: The default catalog used during rendering.

    Returns:
        A new mapping holding the rendered defaults.

    Raises:
        ConfigError: If a boolean default does not render to a boolean, or if
            `interval_unit` is not a valid interval unit.
    """
    rendered_defaults = render_meta_fields(
        # `render_meta_fields` updates its input in place. Render a shallow copy so the
        # caller's defaults (which may be shared across several models / blueprints)
        # keep their unrendered values.
        fields=dict(defaults),
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        variables=variables,
        path=path,
        dialect=dialect,
        default_catalog=default_catalog,
    )

    # Validate defaults that have macros are rendered to boolean
    # (checked in a fixed order so the reported error is deterministic)
    for boolean in ("optimize_query", "allow_partials", "enabled"):
        var = rendered_defaults.get(boolean)
        if var is not None and not isinstance(var, (exp.Boolean, bool)):
            raise ConfigError(f"Expected boolean for '{var}', got '{type(var)}' instead")

    # Validate the 'interval_unit' if present is an Interval Unit
    var = rendered_defaults.get("interval_unit")
    if isinstance(var, str):
        try:
            rendered_defaults["interval_unit"] = IntervalUnit(var)
        except ValueError as e:
            raise ConfigError(f"Invalid interval unit: {var}") from e

    return rendered_defaults
def parse_defaults_properties(
    defaults: t.Dict[str, t.Any], dialect: DialectType
) -> t.Dict[str, t.Any]:
    """Parse macro-bearing values of the property defaults into expressions.

    For every entry of `PROPERTIES` present in `defaults`, each value whose string form
    contains the SQLMesh macro prefix (and whose key is a string) is replaced, in place,
    by the result of `exp.maybe_parse`.

    Args:
        defaults: The model default values; updated in place.
        dialect: The dialect used for parsing.

    Returns:
        The same `defaults` mapping.
    """
    macro_prefix = d.SQLMESH_MACRO_PREFIX

    for prop in PROPERTIES:
        property_defaults = defaults.get(prop)
        if not property_defaults:
            continue

        for key, value in property_defaults.items():
            if not isinstance(key, str):
                continue
            if macro_prefix in str(value):
                property_defaults[key] = exp.maybe_parse(value, dialect=dialect)

    return defaults
def render_expression(
    expression: exp.Expr,
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    macros: t.Optional[MacroRegistry] = None,
    dialect: DialectType = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    default_catalog: t.Optional[str] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Optional[t.List[exp.Expr]]:
    """Render an expression using a Python environment built for it.

    Args:
        expression: The expression to render.
        module_path: The python module path to serialize macros for.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros. If not provided the default registry is used.
        dialect: The dialect used for rendering.
        variables: The variables available during rendering.
        default_catalog: The default catalog used during rendering.
        blueprint_variables: The blueprint's variables available during rendering.

    Returns:
        Whatever `ExpressionRenderer.render()` returns: the rendered expressions, or None.
    """
    macro_registry = macros or macro.get_registry()

    python_env = make_python_env(
        expressions=expression,
        jinja_macro_references=None,
        module_path=module_path,
        macros=macro_registry,
        variables=variables,
        path=path,
        blueprint_variables=blueprint_variables,
    )

    # Identifiers are left as written: neither quoted nor normalized
    renderer = ExpressionRenderer(
        expression,
        dialect,
        [],
        path=path,
        jinja_macro_registry=jinja_macros,
        python_env=python_env,
        default_catalog=default_catalog,
        quote_identifiers=False,
        normalize_identifiers=False,
    )
    return renderer.render()
def clickhouse_partition_func(
    column: exp.Expr, columns_to_types: t.Optional[t.Dict[str, exp.DataType]]
) -> exp.Expr:
    """Build the ClickHouse `toMonday(...)` partitioning expression for a column.

    Args:
        column: The column to partition by.
        columns_to_types: An optional mapping of column names to their types, used to look
            up the type of `column`.

    Returns:
        `toMonday(column)` when the column type is a date / datetime type, `toMonday` of the
        column cast to DateTime64 when the type is unknown, and otherwise that same
        expression cast back to the column's original type.
    """
    # `toMonday()` function accepts a Date or DateTime type column

    def to_monday(arg: exp.Expr) -> exp.Expr:
        return exp.func("toMonday", arg, dialect="clickhouse")

    def column_as_datetime64() -> exp.Expr:
        return exp.cast(column, exp.DataType.build("DateTime64(9, 'UTC')", dialect="clickhouse"))

    known_type = columns_to_types.get(column.name) if columns_to_types else None
    col_type = known_type or exp.DataType.build("UNKNOWN")

    # if input column is already a conformable type, just pass the column
    if col_type.is_type(
        exp.DataType.Type.DATE,
        exp.DataType.Type.DATE32,
        exp.DataType.Type.DATETIME,
        exp.DataType.Type.DATETIME64,
    ):
        return to_monday(column)

    # if input column type is not known, cast input to DateTime64
    if col_type.is_type(exp.DataType.Type.UNKNOWN):
        return to_monday(column_as_datetime64())

    # if input column type is known but not conformable, cast input to DateTime64 and cast output back to original type
    return exp.cast(to_monday(column_as_datetime64()), col_type)