Edit on GitHub

sqlmesh.core.model.definition

   1from __future__ import annotations
   2
   3import json
   4import logging
   5import types
   6import re
   7import typing as t
   8from functools import cached_property, partial
   9from pathlib import Path
  10
  11from pydantic import Field
  12from sqlglot import diff, exp
  13from sqlglot.diff import Insert
  14from sqlglot.helper import seq_get
  15from sqlglot.optimizer.qualify_columns import quote_identifiers
  16from sqlglot.optimizer.simplify import gen
  17from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
  18from sqlglot.schema import MappingSchema, nested_set
  19from sqlglot.time import format_time
  20
  21from sqlmesh.core import constants as c
  22from sqlmesh.core import dialect as d
  23from sqlmesh.core.audit import Audit, ModelAudit
  24from sqlmesh.core.node import IntervalUnit
  25from sqlmesh.core.macros import MacroRegistry, macro
  26from sqlmesh.core.model.common import (
  27    ParsableSql,
  28    make_python_env,
  29    parse_dependencies,
  30    parse_strings_with_macro_refs,
  31    single_value_or_tuple,
  32    sorted_python_env_payloads,
  33    validate_extra_and_required_fields,
  34)
  35from sqlmesh.core.model.meta import ModelMeta
  36from sqlmesh.core.model.kind import (
  37    ExternalKind,
  38    ModelKindName,
  39    SeedKind,
  40    ModelKind,
  41    FullKind,
  42    create_model_kind,
  43    CustomKind,
  44)
  45from sqlmesh.core.model.seed import CsvSeedReader, Seed, create_seed
  46from sqlmesh.core.renderer import ExpressionRenderer, QueryRenderer
  47from sqlmesh.core.signal import SignalRegistry
  48from sqlmesh.utils import columns_to_types_all_known, str_to_bool, UniqueKeyDict
  49from sqlmesh.utils.cron import CroniterCache
  50from sqlmesh.utils.date import TimeLike, make_inclusive, to_datetime, to_time_column
  51from sqlmesh.utils.errors import ConfigError, SQLMeshError, raise_config_error, PythonModelEvalError
  52from sqlmesh.utils.hashing import hash_data
  53from sqlmesh.utils.jinja import JinjaMacroRegistry, extract_macro_references_and_variables
  54from sqlmesh.utils.pydantic import PydanticModel, PRIVATE_FIELDS
  55from sqlmesh.utils.metaprogramming import (
  56    Executable,
  57    SqlValue,
  58    build_env,
  59    prepare_env,
  60    serialize_env,
  61    format_evaluated_code_exception,
  62)
  63
  64if t.TYPE_CHECKING:
  65    from sqlglot.dialects.dialect import DialectType
  66    from sqlmesh.core.node import _Node
  67    from sqlmesh.core._typing import Self, TableName, SessionProperties
  68    from sqlmesh.core.context import ExecutionContext
  69    from sqlmesh.core.engine_adapter import EngineAdapter
  70    from sqlmesh.core.engine_adapter._typing import QueryOrDF
  71    from sqlmesh.core.engine_adapter.shared import DataObjectType
  72    from sqlmesh.core.linter.rule import Rule
  73    from sqlmesh.core.snapshot import DeployabilityIndex, Node, Snapshot
  74    from sqlmesh.utils.jinja import MacroReference
  75
  76
  77logger = logging.getLogger(__name__)
  78
  79
  80PROPERTIES = {"physical_properties", "session_properties", "virtual_properties"}
  81
  82RUNTIME_RENDERED_MODEL_FIELDS = {
  83    "audits",
  84    "signals",
  85    "merge_filter",
  86} | PROPERTIES
  87
  88CRON_SHORTCUTS = {
  89    "@midnight",
  90    "@hourly",
  91    "@daily",
  92    "@weekly",
  93    "@monthly",
  94    "@yearly",
  95    "@annually",
  96}
  97
  98
  99class _Model(ModelMeta, frozen=True):
 100    """Model is the core abstraction for user defined datasets.
 101
 102    A model consists of logic that fetches the data (a SQL query, a Python script or a seed) and metadata
 103    associated with it. Models can be run on arbitrary cadences and support incremental or full refreshes.
 104    Models can also be materialized into physical tables or shared across other models as temporary views.
 105
 106    Example:
 107        MODEL (
 108            name           sushi.order_items,
 109            owner          jen,
 110            cron           '@daily',
 111            start          '2020-01-01',
 112            partitioned_by ds
 113        );
 114
 115        @DEF(var, 'my_var');
 116
 117        SELECT
 118          1 AS column_a # my first column,
 119          @var AS my_column # my second column,
 120        ;
 121
 122    Args:
 123        name: The name of the model, which is of the form [catalog].[db].table.
 124            The catalog and db are optional.
 125        dialect: The SQL dialect that the model's query is written in. By default,
 126            this is assumed to be the dialect of the context.
 127        owner: The owner of the model.
 128        cron: A cron string specifying how often the model should be refreshed, leveraging the
 129            [croniter](https://github.com/kiorky/croniter) library.
 130        description: The optional model description.
 131        stamp: An optional arbitrary string sequence used to create new model versions without making
 132            changes to any of the functional components of the definition.
 133        start: The earliest date that the model will be backfilled for. If this is None,
 134            then the date is inferred by taking the most recent start date of its ancestors.
 135            The start date can be a static datetime or a relative datetime like "1 year ago"
 136        end: The date that the model will be backfilled up until. Follows the same syntax as 'start',
 137            should be omitted if there is no end date.
 138        lookback: The number of previous incremental intervals in the lookback window.
 139        table_format: The table format used to manage the physical table files defined by `storage_format`, only applicable in certain engines.
 140            (eg, 'iceberg', 'delta', 'hudi')
 141        storage_format: The storage format used to store the physical table, only applicable in certain engines.
 142            (eg. 'parquet', 'orc')
 143        partitioned_by: The partition columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
 144        clustered_by: The cluster columns or engine specific expressions, only applicable in certain engines. (eg. (ds, hour))
 145        python_env: Dictionary containing all global variables needed to render the model's macros.
 146        mapping_schema: The schema of table names to column and types.
 147        extract_dependencies_from_query: Whether to extract additional dependencies from the rendered model's query.
 148        physical_schema_override: The desired physical schema name override.
 149    """
 150
    # Global variables needed to render the model's macros.
    python_env: t.Dict[str, Executable] = {}
    # Jinja macro registry handed to the expression renderers built by this class.
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    # NOTE(review): presumably keyed by audit name — confirm against callers.
    audit_definitions: t.Dict[str, ModelAudit] = {}
    # Schema mapping table names to columns and their types.
    mapping_schema: t.Dict[str, t.Any] = {}
    # Whether to extract additional dependencies from the rendered model's query.
    extract_dependencies_from_query: bool = True
    # Raw statement lists, populated through the un-suffixed aliases. The `pre_statements`,
    # `post_statements` and `on_virtual_update` properties return their parsed form.
    pre_statements_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="pre_statements")
    post_statements_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="post_statements")
    on_virtual_update_: t.Optional[t.List[ParsableSql]] = Field(
        default=None, alias="on_virtual_update"
    )

    # NOTE(review): looks like a lazily computed dependency set — confirm where it is filled.
    _full_depends_on: t.Optional[t.Set[str]] = None
    # Renderers keyed by `id(expression)`; filled by `_statement_renderer` and reset to an
    # empty dict by `__getstate__` and `copy`.
    _statement_renderer_cache: t.Dict[int, ExpressionRenderer] = {}
    # NOTE(review): not used in the visible part of the class — confirm its key semantics.
    _is_metadata_only_change_cache: t.Dict[int, bool] = {}

    # Validator supplied by ParsableSql.
    _expressions_validator = ParsableSql.validator()
 167
 168    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
 169        state = super().__getstate__()
 170        private = state[PRIVATE_FIELDS]
 171        private["_statement_renderer_cache"] = {}
 172        return state
 173
 174    def copy(self, **kwargs: t.Any) -> Self:
 175        model = super().copy(**kwargs)
 176        model._statement_renderer_cache = {}
 177        return model
 178
 179    def render(
 180        self,
 181        *,
 182        context: ExecutionContext,
 183        start: t.Optional[TimeLike] = None,
 184        end: t.Optional[TimeLike] = None,
 185        execution_time: t.Optional[TimeLike] = None,
 186        **kwargs: t.Any,
 187    ) -> t.Iterator[QueryOrDF]:
 188        """Renders the content of this model in a form of either a SELECT query, executing which the data for this model can
 189        be fetched, or a dataframe object which contains the data itself.
 190
 191        The type of the returned object (query or dataframe) depends on whether the model was sourced from a SQL query,
 192        a Python script or a pre-built dataset (seed).
 193
 194        Args:
 195            context: The execution context used for fetching data.
 196            start: The start date/time of the run.
 197            end: The end date/time of the run.
 198            execution_time: The date/time time reference to use for execution time.
 199
 200        Returns:
 201            A generator which yields either a query object or one of the supported dataframe objects.
 202        """
 203        yield self.render_query_or_raise(
 204            start=start,
 205            end=end,
 206            execution_time=execution_time,
 207            snapshots=context.snapshots,
 208            deployability_index=context.deployability_index,
 209            engine_adapter=context.engine_adapter,
 210            **kwargs,
 211        )
 212
 213    def render_definition(
 214        self,
 215        include_python: bool = True,
 216        include_defaults: bool = False,
 217        render_query: bool = False,
 218    ) -> t.List[exp.Expr]:
 219        """Returns the original list of sql expressions comprising the model definition.
 220
 221        Args:
 222            include_python: Whether or not to include Python code in the rendered definition.
 223        """
 224        expressions = []
 225        comment = None
 226        for field_name, field_info in ModelMeta.all_field_infos().items():
 227            field_value = getattr(self, field_name)
 228
 229            if (include_defaults and field_value) or field_value != field_info.default:
 230                if field_name == "description":
 231                    comment = field_value
 232                elif field_name == "kind":
 233                    expressions.append(
 234                        exp.Property(
 235                            this="kind",
 236                            value=field_value.to_expression(dialect=self.dialect),
 237                        )
 238                    )
 239                elif field_name == "name":
 240                    expressions.append(
 241                        exp.Property(
 242                            this=field_name,
 243                            value=exp.to_table(field_value, dialect=self.dialect),
 244                        )
 245                    )
 246                elif field_name not in ("default_catalog", "enabled", "ignored_rules_"):
 247                    expressions.append(
 248                        exp.Property(
 249                            this=field_info.alias or field_name,
 250                            value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(
 251                                field_value
 252                            ),
 253                        )
 254                    )
 255
 256        model = d.Model(expressions=expressions)
 257        model.comments = [comment] if comment else None
 258
 259        jinja_expressions = []
 260        python_expressions = []
 261        if include_python:
 262            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
 263            if python_env.expressions:
 264                python_expressions.append(python_env)
 265
 266            jinja_expressions = self.jinja_macros.to_expressions()
 267
 268        return [
 269            model,
 270            *python_expressions,
 271            *jinja_expressions,
 272        ]
 273
 274    def render_query(
 275        self,
 276        *,
 277        start: t.Optional[TimeLike] = None,
 278        end: t.Optional[TimeLike] = None,
 279        execution_time: t.Optional[TimeLike] = None,
 280        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 281        table_mapping: t.Optional[t.Dict[str, str]] = None,
 282        expand: t.Iterable[str] = tuple(),
 283        deployability_index: t.Optional[DeployabilityIndex] = None,
 284        engine_adapter: t.Optional[EngineAdapter] = None,
 285        **kwargs: t.Any,
 286    ) -> t.Optional[exp.Query]:
 287        """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.
 288
 289        Args:
 290            start: The start datetime to render. Defaults to epoch start.
 291            end: The end datetime to render. Defaults to epoch start.
 292            execution_time: The date/time time reference to use for execution time.
 293            snapshots: All upstream snapshots (by name) to use for expansion and mapping of physical locations.
 294            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
 295            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
 296                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
 297                end on the fly.
 298            deployability_index: Determines snapshots that are deployable in the context of this render.
 299            kwargs: Additional kwargs to pass to the renderer.
 300
 301        Returns:
 302            The rendered expression.
 303        """
 304        return exp.select(
 305            *(
 306                exp.cast(exp.Null(), column_type, copy=False).as_(name, copy=False, quoted=True)
 307                for name, column_type in (self.columns_to_types or {}).items()
 308            ),
 309            copy=False,
 310        ).from_(exp.values([tuple([1])], alias="t", columns=["dummy"]), copy=False)
 311
 312    def render_query_or_raise(
 313        self,
 314        *,
 315        start: t.Optional[TimeLike] = None,
 316        end: t.Optional[TimeLike] = None,
 317        execution_time: t.Optional[TimeLike] = None,
 318        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 319        table_mapping: t.Optional[t.Dict[str, str]] = None,
 320        expand: t.Iterable[str] = tuple(),
 321        deployability_index: t.Optional[DeployabilityIndex] = None,
 322        engine_adapter: t.Optional[EngineAdapter] = None,
 323        **kwargs: t.Any,
 324    ) -> exp.Query:
 325        """Same as `render_query()` but raises an exception if the query can't be rendered.
 326
 327        Args:
 328            start: The start datetime to render. Defaults to epoch start.
 329            end: The end datetime to render. Defaults to epoch start.
 330            execution_time: The date/time time reference to use for execution time.
 331            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 332            table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
 333            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
 334                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
 335                end on the fly.
 336            deployability_index: Determines snapshots that are deployable in the context of this render.
 337            kwargs: Additional kwargs to pass to the renderer.
 338
 339        Returns:
 340            The rendered expression.
 341        """
 342        query = self.render_query(
 343            start=start,
 344            end=end,
 345            execution_time=execution_time,
 346            snapshots=snapshots,
 347            table_mapping=table_mapping,
 348            expand=expand,
 349            deployability_index=deployability_index,
 350            engine_adapter=engine_adapter,
 351            **kwargs,
 352        )
 353        if query is None:
 354            raise SQLMeshError(f"Failed to render query for model '{self.name}'.")
 355        return query
 356
 357    def render_pre_statements(
 358        self,
 359        *,
 360        start: t.Optional[TimeLike] = None,
 361        end: t.Optional[TimeLike] = None,
 362        execution_time: t.Optional[TimeLike] = None,
 363        snapshots: t.Optional[t.Collection[Snapshot]] = None,
 364        expand: t.Iterable[str] = tuple(),
 365        deployability_index: t.Optional[DeployabilityIndex] = None,
 366        engine_adapter: t.Optional[EngineAdapter] = None,
 367        inside_transaction: t.Optional[bool] = True,
 368        **kwargs: t.Any,
 369    ) -> t.List[exp.Expr]:
 370        """Renders pre-statements for a model.
 371
 372        Pre-statements are statements that preceded the model's SELECT query.
 373
 374        Args:
 375            start: The start datetime to render. Defaults to epoch start.
 376            end: The end datetime to render. Defaults to epoch start.
 377            execution_time: The date/time time reference to use for execution time.
 378            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 379            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
 380                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
 381                end on the fly.
 382            deployability_index: Determines snapshots that are deployable in the context of this render.
 383            kwargs: Additional kwargs to pass to the renderer.
 384
 385        Returns:
 386            The list of rendered expressions.
 387        """
 388        return self._render_statements(
 389            [
 390                stmt
 391                for stmt in self.pre_statements
 392                if stmt.args.get("transaction", True) == inside_transaction
 393            ],
 394            start=start,
 395            end=end,
 396            execution_time=execution_time,
 397            snapshots=snapshots,
 398            expand=expand,
 399            deployability_index=deployability_index,
 400            engine_adapter=engine_adapter,
 401            **kwargs,
 402        )
 403
 404    def render_post_statements(
 405        self,
 406        *,
 407        start: t.Optional[TimeLike] = None,
 408        end: t.Optional[TimeLike] = None,
 409        execution_time: t.Optional[TimeLike] = None,
 410        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 411        expand: t.Iterable[str] = tuple(),
 412        deployability_index: t.Optional[DeployabilityIndex] = None,
 413        engine_adapter: t.Optional[EngineAdapter] = None,
 414        inside_transaction: t.Optional[bool] = True,
 415        **kwargs: t.Any,
 416    ) -> t.List[exp.Expr]:
 417        """Renders post-statements for a model.
 418
 419        Post-statements are statements that follow after the model's SELECT query.
 420
 421        Args:
 422            start: The start datetime to render. Defaults to epoch start.
 423            end: The end datetime to render. Defaults to epoch start.
 424            execution_time: The date/time time reference to use for execution time.
 425            snapshots: All upstream snapshots (by model name) to use for expansion and mapping of physical locations.
 426            expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries
 427                that depend on materialized tables.  Model definitions are inlined and can thus be run end to
 428                end on the fly.
 429            deployability_index: Determines snapshots that are deployable in the context of this render.
 430            inside_transaction: Whether to render hooks with transaction=True (inside) or transaction=False (outside).
 431            kwargs: Additional kwargs to pass to the renderer.
 432
 433        Returns:
 434            The list of rendered expressions.
 435        """
 436        return self._render_statements(
 437            [
 438                stmt
 439                for stmt in self.post_statements
 440                if stmt.args.get("transaction", True) == inside_transaction
 441            ],
 442            start=start,
 443            end=end,
 444            execution_time=execution_time,
 445            snapshots=snapshots,
 446            expand=expand,
 447            deployability_index=deployability_index,
 448            engine_adapter=engine_adapter,
 449            **kwargs,
 450        )
 451
 452    def render_on_virtual_update(
 453        self,
 454        *,
 455        start: t.Optional[TimeLike] = None,
 456        end: t.Optional[TimeLike] = None,
 457        execution_time: t.Optional[TimeLike] = None,
 458        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 459        expand: t.Iterable[str] = tuple(),
 460        deployability_index: t.Optional[DeployabilityIndex] = None,
 461        engine_adapter: t.Optional[EngineAdapter] = None,
 462        **kwargs: t.Any,
 463    ) -> t.List[exp.Expr]:
 464        return self._render_statements(
 465            self.on_virtual_update,
 466            start=start,
 467            end=end,
 468            execution_time=execution_time,
 469            snapshots=snapshots,
 470            expand=expand,
 471            deployability_index=deployability_index,
 472            engine_adapter=engine_adapter,
 473            **kwargs,
 474        )
 475
 476    def render_audit_query(
 477        self,
 478        audit: Audit,
 479        *,
 480        start: t.Optional[TimeLike] = None,
 481        end: t.Optional[TimeLike] = None,
 482        execution_time: t.Optional[TimeLike] = None,
 483        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
 484        deployability_index: t.Optional[DeployabilityIndex] = None,
 485        **kwargs: t.Any,
 486    ) -> exp.Query:
 487        from sqlmesh.core.snapshot import DeployabilityIndex
 488
 489        deployability_index = deployability_index or DeployabilityIndex.all_deployable()
 490        snapshot = (snapshots or {}).get(self.fqn)
 491
 492        this_model = kwargs.pop("this_model", None) or (
 493            snapshot.table_name(deployability_index.is_deployable(snapshot))
 494            if snapshot
 495            else self.fqn
 496        )
 497
 498        columns_to_types: t.Optional[t.Dict[str, t.Any]] = None
 499        if "engine_adapter" in kwargs:
 500            try:
 501                columns_to_types = kwargs["engine_adapter"].columns(this_model)
 502            except Exception:
 503                pass
 504
 505        if self.time_column:
 506            low, high = [
 507                self.convert_to_time_column(dt, columns_to_types)
 508                for dt in make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
 509            ]
 510            where = self.time_column.column.between(low, high)
 511        else:
 512            where = None
 513
 514        # The model's name is already normalized, but in case of snapshots we also prepend a
 515        # case-sensitive physical schema name, so we quote here to ensure that we won't have
 516        # a broken schema reference after the resulting query is normalized in `render`.
 517        quoted_model_name = quote_identifiers(
 518            exp.to_table(this_model, dialect=self.dialect), dialect=self.dialect
 519        )
 520
 521        query_renderer = QueryRenderer(
 522            audit.query,
 523            audit.dialect or self.dialect,
 524            audit.macro_definitions,
 525            path=audit._path or Path(),
 526            jinja_macro_registry=audit.jinja_macros,
 527            python_env=self.python_env,
 528            only_execution_time=self.kind.only_execution_time,
 529            default_catalog=self.default_catalog,
 530        )
 531
 532        rendered_query = query_renderer.render(
 533            start=start,
 534            end=end,
 535            execution_time=execution_time,
 536            snapshots=snapshots,
 537            deployability_index=deployability_index,
 538            **{
 539                **audit.defaults,
 540                "this_model": exp.select("*").from_(quoted_model_name).where(where).subquery()
 541                if where is not None
 542                else quoted_model_name,
 543                **kwargs,
 544            },  # type: ignore
 545        )
 546
 547        if rendered_query is None:
 548            raise SQLMeshError(
 549                f"Failed to render query for audit '{audit.name}', model '{self.name}'."
 550            )
 551
 552        return rendered_query
 553
 554    @property
 555    def pre_statements(self) -> t.List[exp.Expr]:
 556        return self._get_parsed_statements("pre_statements_")
 557
 558    @property
 559    def post_statements(self) -> t.List[exp.Expr]:
 560        return self._get_parsed_statements("post_statements_")
 561
 562    @property
 563    def on_virtual_update(self) -> t.List[exp.Expr]:
 564        return self._get_parsed_statements("on_virtual_update_")
 565
 566    @property
 567    def macro_definitions(self) -> t.List[d.MacroDef]:
 568        """All macro definitions from the list of expressions."""
 569        return [
 570            s
 571            for s in self.pre_statements + self.post_statements + self.on_virtual_update
 572            if isinstance(s, d.MacroDef)
 573        ]
 574
 575    def _get_parsed_statements(self, attr_name: str) -> t.List[exp.Expr]:
 576        value = getattr(self, attr_name)
 577        if not value:
 578            return []
 579        result = []
 580        for v in value:
 581            parsed = v.parse(self.dialect)
 582            if getattr(v, "transaction", None) is not None:
 583                parsed.set("transaction", v.transaction)
 584            if not isinstance(parsed, exp.Semicolon):
 585                result.append(parsed)
 586        return result
 587
 588    def _render_statements(
 589        self,
 590        statements: t.Iterable[exp.Expr],
 591        **kwargs: t.Any,
 592    ) -> t.List[exp.Expr]:
 593        rendered = (
 594            self._statement_renderer(statement).render(**kwargs)
 595            for statement in statements
 596            if not isinstance(statement, d.MacroDef)
 597        )
 598        return [r for expressions in rendered if expressions for r in expressions]
 599
 600    def _statement_renderer(self, expression: exp.Expr) -> ExpressionRenderer:
 601        expression_key = id(expression)
 602        if expression_key not in self._statement_renderer_cache:
 603            self._statement_renderer_cache[expression_key] = ExpressionRenderer(
 604                expression,
 605                self.dialect,
 606                self.macro_definitions,
 607                path=self._path,
 608                jinja_macro_registry=self.jinja_macros,
 609                python_env=self.python_env,
 610                only_execution_time=False,
 611                default_catalog=self.default_catalog,
 612                model=self,
 613            )
 614        return self._statement_renderer_cache[expression_key]
 615
 616    def render_signals(
 617        self,
 618        *,
 619        start: t.Optional[TimeLike] = None,
 620        end: t.Optional[TimeLike] = None,
 621        execution_time: t.Optional[TimeLike] = None,
 622    ) -> t.List[t.Dict[str, str | int | float | bool]]:
 623        """Renders external; signals defined for this model.
 624
 625        Args:
 626            start: The start datetime to render. Defaults to epoch start.
 627            end: The end datetime to render. Defaults to epoch start.
 628            execution_time: The date/time time reference to use for execution time.
 629
 630        Returns:
 631            The list of rendered expressions.
 632        """
 633
 634        def _render(e: exp.Expr) -> str | int | float | bool:
 635            rendered_exprs = (
 636                self._create_renderer(e).render(start=start, end=end, execution_time=execution_time)
 637                or []
 638            )
 639            if len(rendered_exprs) != 1:
 640                raise SQLMeshError(f"Expected one expression but got {len(rendered_exprs)}")
 641
 642            rendered = rendered_exprs[0]
 643            if rendered.is_int:
 644                return int(rendered.this)
 645            if rendered.is_number:
 646                return float(rendered.this)
 647            if isinstance(rendered, (exp.Literal, exp.Boolean)):
 648                return rendered.this
 649            return rendered.sql(dialect=self.dialect)
 650
 651        # airflow only
 652        return [
 653            {k: _render(v) for k, v in signal.items()} for name, signal in self.signals if not name
 654        ]
 655
 656    def render_signal_calls(self) -> EvaluatableSignals:
 657        python_env = self.python_env
 658        env = prepare_env(python_env)
 659        signals_to_kwargs = {
 660            name: {
 661                k: seq_get(self._create_renderer(v).render() or [], 0) for k, v in kwargs.items()
 662            }
 663            for name, kwargs in self.signals
 664            if name
 665        }
 666
 667        return EvaluatableSignals(
 668            signals_to_kwargs=signals_to_kwargs,
 669            python_env=python_env,
 670            prepared_python_env=env,
 671        )
 672
 673    def render_merge_filter(
 674        self,
 675        *,
 676        start: t.Optional[TimeLike] = None,
 677        end: t.Optional[TimeLike] = None,
 678        execution_time: t.Optional[TimeLike] = None,
 679    ) -> t.Optional[exp.Expr]:
 680        if self.merge_filter is None:
 681            return None
 682        rendered_exprs = (
 683            self._create_renderer(self.merge_filter).render(
 684                start=start, end=end, execution_time=execution_time
 685            )
 686            or []
 687        )
 688        if len(rendered_exprs) != 1:
 689            raise SQLMeshError(f"Expected one expression but got {len(rendered_exprs)}")
 690        return rendered_exprs[0].transform(d.replace_merge_table_aliases, dialect=self.dialect)
 691
 692    def _render_properties(
 693        self, properties: t.Dict[str, exp.Expr] | SessionProperties, **render_kwargs: t.Any
 694    ) -> t.Dict[str, t.Any]:
 695        def _render(expression: exp.Expr) -> exp.Expr | None:
 696            # note: we use the _statement_renderer instead of _create_renderer because it sets model_fqn which
 697            # in turn makes @this_model available in the evaluation context
 698            rendered_exprs = self._statement_renderer(expression).render(**render_kwargs)
 699
 700            # Inform instead of raising for cases where a property is conditionally assigned
 701            if not rendered_exprs or rendered_exprs[0].sql().lower() in {"none", "null"}:
 702                logger.info(
 703                    f"Rendering '{expression.sql(dialect=self.dialect)}' did not return an expression"
 704                )
 705                return None
 706
 707            if len(rendered_exprs) != 1:
 708                raise SQLMeshError(
 709                    f"Expected one result when rendering '{expression.sql(dialect=self.dialect)}' but got {len(rendered_exprs)}"
 710                )
 711
 712            return rendered_exprs[0]
 713
 714        return {
 715            k: rendered
 716            for k, v in properties.items()
 717            if (rendered := (_render(v) if isinstance(v, exp.Expr) else v))
 718        }
 719
 720    def render_physical_properties(self, **render_kwargs: t.Any) -> t.Dict[str, t.Any]:
 721        return self._render_properties(properties=self.physical_properties, **render_kwargs)
 722
 723    def render_virtual_properties(self, **render_kwargs: t.Any) -> t.Dict[str, t.Any]:
 724        return self._render_properties(properties=self.virtual_properties, **render_kwargs)
 725
 726    def render_session_properties(self, **render_kwargs: t.Any) -> t.Dict[str, t.Any]:
 727        return self._render_properties(properties=self.session_properties, **render_kwargs)
 728
 729    def _create_renderer(self, expression: exp.Expr) -> ExpressionRenderer:
 730        return ExpressionRenderer(
 731            expression,
 732            self.dialect,
 733            [],
 734            path=self._path,
 735            jinja_macro_registry=self.jinja_macros,
 736            python_env=self.python_env,
 737            only_execution_time=False,
 738            quote_identifiers=False,
 739        )
 740
 741    def ctas_query(self, **render_kwarg: t.Any) -> exp.Query:
 742        """Return a dummy query to do a CTAS.
 743
 744        If a model's column types are unknown, the only way to create the table is to
 745        run the fully expanded query. This can be expensive so we add a WHERE FALSE to all
 746        SELECTS and hopefully the optimizer is smart enough to not do anything.
 747
 748        Args:
 749            render_kwarg: Additional kwargs to pass to the renderer.
 750        Return:
 751            The mocked out ctas query.
 752        """
 753        query = self.render_query_or_raise(**render_kwarg).limit(0)
 754
 755        for select_or_set_op in query.find_all(exp.Select, exp.SetOperation):
 756            if isinstance(select_or_set_op, exp.Select) and select_or_set_op.args.get("from_"):
 757                select_or_set_op.where(exp.false(), copy=False)
 758
 759        if self.managed_columns:
 760            query.select(
 761                *[
 762                    exp.alias_(exp.cast(exp.Null(), to=col_type), col)
 763                    for col, col_type in self.managed_columns.items()
 764                    if col not in query.named_selects
 765                ],
 766                append=True,
 767                copy=False,
 768            )
 769        return query
 770
 771    def text_diff(self, other: Node, rendered: bool = False) -> str:
 772        """Produce a text diff against another node.
 773
 774        Args:
 775            other: The node to diff against.
 776            rendered: Whether the diff should compare raw vs rendered models
 777
 778        Returns:
 779            A unified text diff showing additions and deletions.
 780        """
 781        if not isinstance(other, _Model):
 782            raise SQLMeshError(
 783                f"Cannot diff model '{self.name} against a non-model node '{other.name}'"
 784            )
 785
 786        text_diff = d.text_diff(
 787            self.render_definition(render_query=rendered),
 788            other.render_definition(render_query=rendered),
 789            self.dialect,
 790            other.dialect,
 791        ).strip()
 792
 793        if not text_diff and not rendered:
 794            text_diff = d.text_diff(
 795                self.render_definition(render_query=True),
 796                other.render_definition(render_query=True),
 797                self.dialect,
 798                other.dialect,
 799            ).strip()
 800
 801        return text_diff
 802
 803    def set_time_format(self, default_time_format: str = c.DEFAULT_TIME_COLUMN_FORMAT) -> None:
 804        """Sets the default time format for a model.
 805
 806        Args:
 807            default_time_format: A python time format used as the default format when none is provided.
 808        """
 809        if not self.time_column:
 810            return
 811
 812        if self.time_column.format:
 813            # Transpile the time column format into the generic dialect
 814            formatted_time = format_time(
 815                self.time_column.format,
 816                d.Dialect.get_or_raise(self.dialect).TIME_MAPPING,
 817            )
 818            assert formatted_time is not None
 819            self.time_column.format = formatted_time
 820        else:
 821            self.time_column.format = default_time_format
 822
 823    def convert_to_time_column(
 824        self, time: TimeLike, columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
 825    ) -> exp.Expr:
 826        """Convert a TimeLike object to the same time format and type as the model's time column."""
 827        if self.time_column:
 828            if columns_to_types is None:
 829                columns_to_types = self.columns_to_types_or_raise
 830
 831            if self.time_column.column.name not in columns_to_types:
 832                raise ConfigError(
 833                    f"Time column '{self.time_column.column.sql(dialect=self.dialect)}' not found in model '{self.name}'."
 834                )
 835
 836            time_column_type = columns_to_types[self.time_column.column.name]
 837
 838            return to_time_column(
 839                time,
 840                time_column_type,
 841                self.dialect,
 842                self.time_column.format,
 843            )
 844        return exp.convert(time)
 845
 846    def set_mapping_schema(self, schema: t.Dict) -> None:
 847        self.mapping_schema.clear()
 848        self.mapping_schema.update(schema)
 849
 850    def update_schema(self, schema: MappingSchema) -> None:
 851        """Updates the schema for this model's dependencies based on the given mapping schema."""
 852        for dep in self.depends_on:
 853            table = exp.to_table(dep)
 854            mapping_schema = schema.find(table)
 855
 856            if mapping_schema:
 857                nested_set(
 858                    self.mapping_schema,
 859                    tuple(part.sql(copy=False) for part in table.parts),
 860                    {col: dtype.sql(dialect=self.dialect) for col, dtype in mapping_schema.items()},
 861                )
 862
 863    @property
 864    def depends_on(self) -> t.Set[str]:
 865        """All of the upstream dependencies referenced in the model's query, excluding self references.
 866
 867        Returns:
 868            A list of all the upstream table names.
 869        """
 870        return self.full_depends_on - {self.fqn}
 871
 872    @property
 873    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
 874        """Returns the mapping of column names to types of this model."""
 875        if self.columns_to_types_ is None:
 876            return None
 877        return {**self.columns_to_types_, **self.managed_columns}
 878
 879    @property
 880    def columns_to_types_or_raise(self) -> t.Dict[str, exp.DataType]:
 881        """Returns the mapping of column names to types of this model or raise if not available."""
 882        columns_to_types = self.columns_to_types
 883        if columns_to_types is None:
 884            raise SQLMeshError(f"Column information is not available for model '{self.name}'")
 885        return columns_to_types
 886
 887    @property
 888    def annotated(self) -> bool:
 889        """Checks if all column projection types of this model are known."""
 890        if self.columns_to_types is None:
 891            return False
 892        columns_to_types = {
 893            k: v for k, v in self.columns_to_types.items() if k not in self.managed_columns
 894        }
 895        if not columns_to_types:
 896            return False
 897        return columns_to_types_all_known(columns_to_types)
 898
 899    @property
 900    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
 901        """Returns the python env sorted by executable kind and then var name."""
 902        return sorted(self.python_env.items(), key=lambda x: (x[1].kind, x[0]))
 903
 904    @property
 905    def view_name(self) -> str:
 906        return self.fully_qualified_table.name
 907
 908    @property
 909    def schema_name(self) -> str:
 910        return self.fully_qualified_table.db or c.DEFAULT_SCHEMA
 911
 912    @property
 913    def physical_schema(self) -> str:
 914        return self.physical_schema_override or f"{c.SQLMESH}__{self.schema_name}"
 915
    @property
    def is_sql(self) -> bool:
        """Whether this is a SQL model. The base implementation returns False; SqlModel overrides it."""
        return False
 919
    @property
    def is_python(self) -> bool:
        """Whether this is a Python model. The base implementation returns False."""
        # NOTE(review): presumably overridden by the Python model subclass — not visible here.
        return False
 923
    @property
    def is_seed(self) -> bool:
        """Whether this is a seed model. The base implementation returns False."""
        # NOTE(review): presumably overridden by the seed model subclass — not visible here.
        return False
 927
 928    @property
 929    def depends_on_self(self) -> bool:
 930        return self.fqn in self.full_depends_on
 931
 932    @property
 933    def forward_only(self) -> bool:
 934        return getattr(self.kind, "forward_only", False)
 935
 936    @property
 937    def disable_restatement(self) -> bool:
 938        return getattr(self.kind, "disable_restatement", False)
 939
 940    @property
 941    def auto_restatement_intervals(self) -> t.Optional[int]:
 942        return getattr(self.kind, "auto_restatement_intervals", None)
 943
 944    @property
 945    def auto_restatement_cron(self) -> t.Optional[str]:
 946        return getattr(self.kind, "auto_restatement_cron", None)
 947
 948    def auto_restatement_croniter(self, value: TimeLike) -> CroniterCache:
 949        cron = self.auto_restatement_cron
 950        if cron is None:
 951            raise SQLMeshError("Auto restatement cron is not set.")
 952        return CroniterCache(cron, value)
 953
 954    @property
 955    def wap_supported(self) -> bool:
 956        return self.kind.is_materialized and (self.storage_format or "").lower() == "iceberg"
 957
 958    def validate_definition(self) -> None:
 959        """Validates the model's definition.
 960
 961        Raises:
 962            ConfigError
 963        """
 964
 965        for field in ("partitioned_by", "clustered_by"):
 966            values = getattr(self, field)
 967
 968            if values:
 969                values = [
 970                    col.name
 971                    for expr in values
 972                    for col in t.cast(
 973                        exp.Expr, exp.maybe_parse(expr, dialect=self.dialect)
 974                    ).find_all(exp.Column)
 975                ]
 976
 977                unique_keys = set(values)
 978
 979                if len(values) != len(unique_keys):
 980                    raise_config_error(
 981                        f"All keys in '{field}' must be unique in the model definition",
 982                        self._path,
 983                    )
 984
 985                columns_to_types = self.columns_to_types
 986                if columns_to_types is not None:
 987                    missing_keys = unique_keys - set(columns_to_types)
 988                    if missing_keys:
 989                        missing_keys_str = ", ".join(f"'{k}'" for k in sorted(missing_keys))
 990                        raise_config_error(
 991                            f"{field} keys [{missing_keys_str}] are missing in the model definition",
 992                            self._path,
 993                        )
 994
 995        if self.kind.is_incremental_by_time_range and not self.time_column:
 996            raise_config_error(
 997                "Incremental by time range models must have a time_column field",
 998                self._path,
 999            )
1000
1001        if (
1002            self.kind.is_incremental_unmanaged
1003            and getattr(self.kind, "insert_overwrite", False)
1004            and not self.partitioned_by_
1005        ):
1006            raise_config_error(
1007                "Unmanaged incremental models with insert / overwrite enabled must specify the partitioned_by field",
1008                self._path,
1009            )
1010
1011        if self.kind.is_managed:
1012            # TODO: would this sort of logic be better off moved into the Kind?
1013            if self.dialect == "snowflake" and "target_lag" not in self.physical_properties:
1014                raise_config_error(
1015                    "Snowflake managed tables must specify the 'target_lag' physical property",
1016                    self._path,
1017                )
1018
1019        if self.physical_version is not None and not self.forward_only:
1020            raise_config_error(
1021                "Pinning a physical version is only supported for forward only models",
1022                self._path,
1023            )
1024
1025        # The following attributes should be set only for SQL models
1026        if not self.is_sql:
1027            if self.optimize_query:
1028                raise_config_error(
1029                    "SQLMesh query optimizer can only be enabled for SQL models",
1030                    self._path,
1031                )
1032
1033        if isinstance(self.kind, CustomKind):
1034            from sqlmesh.core.snapshot.evaluator import get_custom_materialization_type_or_raise
1035
1036            # Will raise if the custom materialization points to an invalid class
1037            get_custom_materialization_type_or_raise(self.kind.materialization)
1038
1039        # Embedded model kind shouldn't have audits
1040        if self.kind.name == ModelKindName.EMBEDDED and self.audits:
1041            raise_config_error(
1042                "Audits are not supported for embedded models",
1043                self._path,
1044            )
1045
    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Determines whether this model is a breaking change in relation to the `previous` model.

        Args:
            previous: The previous model to compare against.

        Returns:
            True if this model instance represents a breaking change, False if it's a non-breaking change
            and None if the nature of the change can't be determined.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError
1057
1058    def is_metadata_only_change(self, other: _Node) -> bool:
1059        if self._is_metadata_only_change_cache.get(id(other), None) is not None:
1060            return self._is_metadata_only_change_cache[id(other)]
1061
1062        is_metadata_change = True
1063        if (
1064            not isinstance(other, _Model)
1065            or self.metadata_hash == other.metadata_hash
1066            or self._data_hash_values_no_sql != other._data_hash_values_no_sql
1067        ):
1068            is_metadata_change = False
1069        else:
1070            this_statements = [
1071                s
1072                for s in [*self.pre_statements, *self.post_statements]
1073                if not self._is_metadata_statement(s)
1074            ]
1075            other_statements = [
1076                s
1077                for s in [*other.pre_statements, *other.post_statements]
1078                if not other._is_metadata_statement(s)
1079            ]
1080            if len(this_statements) != len(other_statements):
1081                is_metadata_change = False
1082            else:
1083                for this_statement, other_statement in zip(this_statements, other_statements):
1084                    this_rendered = (
1085                        self._statement_renderer(this_statement).render() or this_statement
1086                    )
1087                    other_rendered = (
1088                        other._statement_renderer(other_statement).render() or other_statement
1089                    )
1090                    if this_rendered != other_rendered:
1091                        is_metadata_change = False
1092                        break
1093
1094        self._is_metadata_only_change_cache[id(other)] = is_metadata_change
1095        return is_metadata_change
1096
1097    @property
1098    def data_hash(self) -> str:
1099        """
1100        Computes the data hash for the node.
1101
1102        Returns:
1103            The data hash for the node.
1104        """
1105        if self._data_hash is None:
1106            self._data_hash = hash_data(self._data_hash_values)
1107        return self._data_hash
1108
1109    @property
1110    def _data_hash_values(self) -> t.List[str]:
1111        return self._data_hash_values_no_sql + self._data_hash_values_sql
1112
1113    @property
1114    def _data_hash_values_sql(self) -> t.List[str]:
1115        data = []
1116
1117        for statements in [self.pre_statements_, self.post_statements_]:
1118            for statement in statements or []:
1119                data.append(statement.sql)
1120
1121        return data
1122
1123    @property
1124    def _data_hash_values_no_sql(self) -> t.List[str]:
1125        data = [
1126            str(  # Exclude metadata only macro funcs
1127                [(k, v) for k, v in self.sorted_python_env if not v.is_metadata]
1128            ),
1129            *self.kind.data_hash_values,
1130            self.table_format,
1131            self.storage_format,
1132            str(self.lookback),
1133            *(gen(expr) for expr in (self.partitioned_by or [])),
1134            *(gen(expr) for expr in (self.clustered_by or [])),
1135            self.stamp,
1136            self.physical_schema,
1137            self.physical_version,
1138            self.gateway,
1139            self.interval_unit.value if self.interval_unit is not None else None,
1140            str(self.optimize_query) if self.optimize_query is not None else None,
1141            self.virtual_environment_mode.value,
1142        ]
1143
1144        for column_name, column_type in (self.columns_to_types_ or {}).items():
1145            data.append(column_name)
1146            data.append(column_type.sql(dialect=self.dialect))
1147
1148        for key, value in (self.physical_properties or {}).items():
1149            data.append(key)
1150            data.append(gen(value))
1151
1152        return data  # type: ignore
1153
1154    def _audit_metadata_hash_values(self) -> t.List[str]:
1155        from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS
1156
1157        metadata = []
1158
1159        for audit_name, audit_args in sorted(self.audits, key=lambda a: a[0]):
1160            metadata.append(audit_name)
1161            if audit_name in BUILT_IN_AUDITS:
1162                for arg_name, arg_value in audit_args.items():
1163                    metadata.append(arg_name)
1164                    metadata.append(gen(arg_value))
1165            else:
1166                audit = self.audit_definitions[audit_name]
1167                metadata.extend(
1168                    [
1169                        audit.query_.sql,
1170                        audit.dialect,
1171                        str(audit.skip),
1172                        str(audit.blocking),
1173                    ]
1174                )
1175
1176        return metadata
1177
1178    def audit_metadata_hash(self) -> str:
1179        return hash_data(self._audit_metadata_hash_values())
1180
1181    @property
1182    def metadata_hash(self) -> str:
1183        """
1184        Computes the metadata hash for the node.
1185
1186        Returns:
1187            The metadata hash for the node.
1188        """
1189        if self._metadata_hash is None:
1190            metadata = [
1191                self.dialect,
1192                self.owner,
1193                self.description,
1194                json.dumps(self.column_descriptions, sort_keys=True),
1195                self.cron,
1196                self.cron_tz.key if self.cron_tz else None,
1197                str(self.start) if self.start else None,
1198                str(self.end) if self.end else None,
1199                str(self.retention) if self.retention else None,
1200                str(self.batch_size) if self.batch_size is not None else None,
1201                str(self.batch_concurrency) if self.batch_concurrency is not None else None,
1202                json.dumps(self.mapping_schema, sort_keys=True),
1203                *sorted(self.tags),
1204                *sorted(ref.json(sort_keys=True) for ref in self.all_references),
1205                *self.kind.metadata_hash_values,
1206                self.project,
1207                str(self.allow_partials),
1208                gen(self.session_properties_) if self.session_properties_ else None,
1209                *[gen(g) for g in self.grains],
1210                *self._audit_metadata_hash_values(),
1211                json.dumps(self.grants, sort_keys=True) if self.grants else None,
1212                self.grants_target_layer,
1213            ]
1214
1215            for key, value in (self.virtual_properties or {}).items():
1216                metadata.append(key)
1217                metadata.append(gen(value))
1218
1219            for signal_name, args in sorted(self.signals, key=lambda x: x[0]):
1220                metadata.append(signal_name)
1221                for k, v in sorted(args.items()):
1222                    metadata.append(f"{k}:{gen(v)}")
1223
1224            if self.dbt_node_info:
1225                metadata.append(self.dbt_node_info.json(sort_keys=True))
1226
1227            metadata.extend(self._additional_metadata)
1228
1229            self._metadata_hash = hash_data(metadata)
1230        return self._metadata_hash
1231
    @property
    def is_model(self) -> bool:
        """Return True if this is a model node. This implementation always returns True."""
        return True
1236
1237    @property
1238    def grants_table_type(self) -> DataObjectType:
1239        """Get the table type for grants application (TABLE, VIEW, MATERIALIZED_VIEW).
1240
1241        Returns:
1242            The DataObjectType that should be used when applying grants to this model.
1243        """
1244        from sqlmesh.core.engine_adapter.shared import DataObjectType
1245
1246        if self.kind.is_view:
1247            if hasattr(self.kind, "materialized") and getattr(self.kind, "materialized", False):
1248                return DataObjectType.MATERIALIZED_VIEW
1249            return DataObjectType.VIEW
1250        if self.kind.is_managed:
1251            return DataObjectType.MANAGED_TABLE
1252        # All other materialized models are tables
1253        return DataObjectType.TABLE
1254
1255    @property
1256    def _additional_metadata(self) -> t.List[str]:
1257        additional_metadata = []
1258
1259        metadata_only_macros = [(k, v) for k, v in self.sorted_python_env if v.is_metadata]
1260        if metadata_only_macros:
1261            additional_metadata.append(str(metadata_only_macros))
1262
1263        for statements in [self.pre_statements_, self.post_statements_, self.on_virtual_update_]:
1264            for statement in statements or []:
1265                additional_metadata.append(statement.sql)
1266
1267        return additional_metadata
1268
1269    def _is_metadata_statement(self, statement: exp.Expr) -> bool:
1270        if isinstance(statement, d.MacroDef):
1271            return True
1272        if isinstance(statement, d.MacroFunc):
1273            target_macro = macro.get_registry().get(statement.name)
1274            if target_macro:
1275                return target_macro.metadata_only
1276            target_macro = self.python_env.get(statement.name)
1277            if target_macro:
1278                return bool(target_macro.is_metadata)
1279        return False
1280
1281    @property
1282    def full_depends_on(self) -> t.Set[str]:
1283        if not self.extract_dependencies_from_query:
1284            return self.depends_on_ or set()
1285        if self._full_depends_on is None:
1286            depends_on = self.depends_on_ or set()
1287
1288            query = self.render_query(needs_optimization=False)
1289            if query is not None:
1290                depends_on |= d.find_tables(
1291                    query, default_catalog=self.default_catalog, dialect=self.dialect
1292                )
1293            self._full_depends_on = depends_on
1294
1295        return self._full_depends_on
1296
1297    @property
1298    def partitioned_by(self) -> t.List[exp.Expr]:
1299        """Columns to partition the model by, including the time column if it is not already included."""
1300        if self.time_column and not self._is_time_column_in_partitioned_by:
1301            # This allows the user to opt out of automatic time_column injection
1302            # by setting `partition_by_time_column false` on the model kind
1303            if (
1304                hasattr(self.kind, "partition_by_time_column")
1305                and self.kind.partition_by_time_column
1306            ):
1307                return [
1308                    TIME_COL_PARTITION_FUNC.get(self.dialect, lambda x, y: x)(
1309                        self.time_column.column, self.columns_to_types
1310                    ),
1311                    *self.partitioned_by_,
1312                ]
1313        return self.partitioned_by_
1314
1315    @property
1316    def partition_interval_unit(self) -> t.Optional[IntervalUnit]:
1317        """The interval unit to use for partitioning if applicable."""
1318        # Only return the interval unit for partitioning if the partitioning
1319        # wasn't explicitly set by the user. Otherwise, the user-provided
1320        # value should always take precedence.
1321        if self.time_column and not self._is_time_column_in_partitioned_by:
1322            return self.interval_unit
1323        return None
1324
1325    @property
1326    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
1327        from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS
1328
1329        audits_by_name = {**BUILT_IN_AUDITS, **self.audit_definitions}
1330        audits_with_args = []
1331        added_audits = set()
1332
1333        for audit_name, audit_args in self.audits:
1334            audits_with_args.append((audits_by_name[audit_name], audit_args.copy()))
1335            added_audits.add(audit_name)
1336
1337        for audit_name in self.audit_definitions:
1338            if audit_name not in added_audits:
1339                audits_with_args.append((audits_by_name[audit_name], {}))
1340
1341        return audits_with_args
1342
1343    @property
1344    def _is_time_column_in_partitioned_by(self) -> bool:
1345        return self.time_column is not None and self.time_column.column in {
1346            col for expr in self.partitioned_by_ for col in expr.find_all(exp.Column)
1347        }
1348
    @property
    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
        """Rules violated by the model's query. This implementation returns an empty mapping."""
        return {}
1352
1353
1354class SqlModel(_Model):
1355    """The model definition which relies on a SQL query to fetch the data.
1356
1357    Args:
1358        query: The main query representing the model.
1359        pre_statements: The list of SQL statements that precede the model's query.
1360        post_statements: The list of SQL statements that follow after the model's query.
1361        on_virtual_update: The list of SQL statements to be executed after the virtual update.
1362    """
1363
1364    query_: ParsableSql = Field(alias="query")
1365    source_type: t.Literal["sql"] = "sql"
1366
1367    _columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
1368
1369    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
1370        state = super().__getstate__()
1371        state["__dict__"] = state["__dict__"].copy()
1372        # query renderer is very expensive to serialize
1373        state["__dict__"].pop("_query_renderer", None)
1374        state["__dict__"].pop("column_descriptions", None)
1375        private = state[PRIVATE_FIELDS]
1376        private["_columns_to_types"] = None
1377        return state
1378
1379    def copy(self, **kwargs: t.Any) -> Self:
1380        model = super().copy(**kwargs)
1381        model.__dict__.pop("_query_renderer", None)
1382        model.__dict__.pop("column_descriptions", None)
1383        model._columns_to_types = None
1384        if kwargs.get("update", {}).keys() & {"depends_on_", "query"}:
1385            model._full_depends_on = None
1386        return model
1387
1388    @property
1389    def query(self) -> t.Union[exp.Query, d.JinjaQuery, d.MacroFunc]:
1390        parsed_query = self.query_.parse(self.dialect)
1391        return t.cast(t.Union[exp.Query, d.JinjaQuery, d.MacroFunc], parsed_query)
1392
1393    def render_query(
1394        self,
1395        *,
1396        start: t.Optional[TimeLike] = None,
1397        end: t.Optional[TimeLike] = None,
1398        execution_time: t.Optional[TimeLike] = None,
1399        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1400        table_mapping: t.Optional[t.Dict[str, str]] = None,
1401        expand: t.Iterable[str] = tuple(),
1402        deployability_index: t.Optional[DeployabilityIndex] = None,
1403        engine_adapter: t.Optional[EngineAdapter] = None,
1404        **kwargs: t.Any,
1405    ) -> t.Optional[exp.Query]:
1406        query = self._query_renderer.render(
1407            start=start,
1408            end=end,
1409            execution_time=execution_time,
1410            snapshots=snapshots,
1411            table_mapping=table_mapping,
1412            expand=expand,
1413            deployability_index=deployability_index,
1414            engine_adapter=engine_adapter,
1415            **kwargs,
1416        )
1417
1418        return query
1419
1420    def render_definition(
1421        self,
1422        include_python: bool = True,
1423        include_defaults: bool = False,
1424        render_query: bool = False,
1425    ) -> t.List[exp.Expr]:
1426        result: t.List[exp.Expr] = super().render_definition(
1427            include_python=include_python, include_defaults=include_defaults
1428        )
1429
1430        if render_query:
1431            result.extend(self.render_pre_statements())
1432            result.append(self.render_query() or self.query)
1433            result.extend(self.render_post_statements())
1434            if virtual_update := self.render_on_virtual_update():
1435                result.append(d.VirtualUpdateStatement(expressions=virtual_update))
1436        else:
1437            result.extend(self.pre_statements)
1438            result.append(self.query)
1439            result.extend(self.post_statements)
1440            if self.on_virtual_update:
1441                result.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update))
1442
1443        return result
1444
    @property
    def is_sql(self) -> bool:
        """Whether this is a SQL model. Always True for this class."""
        return True
1448
1449    @property
1450    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
1451        if self.columns_to_types_ is not None:
1452            self._columns_to_types = self.columns_to_types_
1453        elif self._columns_to_types is None:
1454            try:
1455                query = self._query_renderer.render()
1456            except Exception:
1457                logger.exception("Failed to render query for model %s", self.fqn)
1458                return None
1459
1460            if query is None:
1461                return None
1462
1463            unknown = exp.DataType.build("unknown")
1464
1465            columns_to_types = {}
1466            for select in query.selects:
1467                output_name = select.output_name
1468
1469                # If model validation is disabled, we cannot assume that projections
1470                # will have inferrable output names or even that they will be unique
1471                if not output_name or output_name in columns_to_types:
1472                    return None
1473
1474                # copy data type because it is used in the engine to build CTAS and other queries
1475                # this can change the parent which will mess up the diffing algo
1476                columns_to_types[output_name] = (select.type or unknown).copy()
1477
1478            self._columns_to_types = columns_to_types
1479
1480        if "*" in self._columns_to_types:
1481            return None
1482
1483        return {**self._columns_to_types, **self.managed_columns}
1484
1485    @cached_property
1486    def column_descriptions(self) -> t.Dict[str, str]:
1487        if self.column_descriptions_ is not None:
1488            return self.column_descriptions_
1489
1490        query = self.render_query()
1491        if query is None:
1492            return {}
1493
1494        return {
1495            select.alias_or_name: select.comments[-1].strip()
1496            for select in query.selects
1497            if select.comments
1498        }
1499
    def set_mapping_schema(self, schema: t.Dict) -> None:
        """Set the mapping schema through the parent class, then refresh schema-dependent state.

        Args:
            schema: The mapping schema, forwarded unchanged to the parent implementation.
        """
        super().set_mapping_schema(schema)
        # Drop cached column types and hand the new schema to the query renderer
        self._on_mapping_schema_set()
1503
    def update_schema(self, schema: MappingSchema) -> None:
        """Update the schema through the parent class, then refresh schema-dependent state.

        Args:
            schema: The mapping schema, forwarded unchanged to the parent implementation.
        """
        super().update_schema(schema)
        # Drop cached column types and hand the new schema to the query renderer
        self._on_mapping_schema_set()
1507
    def _on_mapping_schema_set(self) -> None:
        """Reset the cached column types and pass the current mapping schema to the query renderer."""
        # Cleared so that `columns_to_types` re-infers the types on its next access
        self._columns_to_types = None
        self._query_renderer.update_schema(self.mapping_schema)
1511
1512    def validate_definition(self) -> None:
1513        query = self._query_renderer.render()
1514        if query is None:
1515            if self.depends_on_ is None:
1516                raise_config_error(
1517                    "Dependencies must be provided explicitly for models that can be rendered only at runtime",
1518                    self._path,
1519                )
1520            return
1521
1522        if not isinstance(query, exp.Query):
1523            raise_config_error("Missing SELECT query in the model definition", self._path)
1524
1525        projection_list = query.selects
1526        if not projection_list:
1527            raise_config_error("Query missing select statements", self._path)
1528
1529        if self.depends_on_self and not self.annotated:
1530            raise_config_error(
1531                "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
1532                self._path,
1533            )
1534
1535        super().validate_definition()
1536
1537    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1538        if not isinstance(previous, SqlModel):
1539            return None
1540
1541        if self.lookback != previous.lookback:
1542            return None
1543
1544        try:
1545            # the previous model which comes from disk could be unrenderable
1546            previous_query = previous.render_query()
1547        except Exception:
1548            previous_query = None
1549        this_query = self.render_query()
1550
1551        if previous_query is None or this_query is None:
1552            # Can't determine if there's a breaking change if we can't render the query.
1553            return None
1554
1555        if previous_query is this_query:
1556            edits = []
1557        else:
1558            edits = diff(
1559                previous_query,
1560                this_query,
1561                matchings=[(previous_query, this_query)],
1562                delta_only=True,
1563                dialect=self.dialect if self.dialect == previous.dialect else None,
1564            )
1565        inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}
1566
1567        for edit in edits:
1568            if not isinstance(edit, Insert):
1569                return None
1570
1571            expr = edit.expression
1572            if isinstance(expr, exp.UDTF):
1573                # projection subqueries do not change cardinality, engines don't allow these to return
1574                # more than one row of data
1575                parent = expr.find_ancestor(exp.Subquery)
1576
1577                if not parent:
1578                    return None
1579
1580                expr = parent
1581
1582            if not _is_projection(expr) and expr.parent not in inserted_expressions:
1583                return None
1584
1585        return False
1586
1587    def is_metadata_only_change(self, previous: _Node) -> bool:
1588        if self._is_metadata_only_change_cache.get(id(previous), None) is not None:
1589            return self._is_metadata_only_change_cache[id(previous)]
1590
1591        if not super().is_metadata_only_change(previous):
1592            return False
1593
1594        if not isinstance(previous, SqlModel):
1595            self._is_metadata_only_change_cache[id(previous)] = False
1596            return False
1597
1598        this_rendered_query = self.render_query() or self.query
1599        previous_rendered_query = previous.render_query() or previous.query
1600        is_metadata_change = this_rendered_query == previous_rendered_query
1601
1602        self._is_metadata_only_change_cache[id(previous)] = is_metadata_change
1603        return is_metadata_change
1604
1605    @cached_property
1606    def _query_renderer(self) -> QueryRenderer:
1607        no_quote_identifiers = self.kind.is_view and self.dialect in ("trino", "spark")
1608        return QueryRenderer(
1609            self.query,
1610            self.dialect,
1611            self.macro_definitions,
1612            schema=self.mapping_schema,
1613            path=self._path,
1614            jinja_macro_registry=self.jinja_macros,
1615            python_env=self.python_env,
1616            only_execution_time=self.kind.only_execution_time,
1617            default_catalog=self.default_catalog,
1618            quote_identifiers=not no_quote_identifiers,
1619            optimize_query=self.optimize_query,
1620            model=self,
1621        )
1622
1623    @property
1624    def _data_hash_values_no_sql(self) -> t.List[str]:
1625        return [
1626            *super()._data_hash_values_no_sql,
1627            *self.jinja_macros.data_hash_values,
1628        ]
1629
1630    @property
1631    def _data_hash_values_sql(self) -> t.List[str]:
1632        return [
1633            *super()._data_hash_values_sql,
1634            self.query_.sql,
1635        ]
1636
1637    @property
1638    def _additional_metadata(self) -> t.List[str]:
1639        return [*super()._additional_metadata, self.query_.sql]
1640
    @property
    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
        """Render the query and return the rule violations recorded on the query renderer."""
        # The render result is discarded; presumably rendering is what populates
        # `_violated_rules` on the renderer - TODO confirm against QueryRenderer
        self.render_query()
        return self._query_renderer._violated_rules
1645
1646
class SeedModel(_Model):
    """The model definition which uses a pre-built static dataset to source the data from.

    Args:
        seed: The content of a pre-built static dataset.
        column_hashes: Optional per-column hashes. When not set, they are obtained from the seed reader.
        derived_columns_to_types: Column types captured by `to_dehydrated` when no columns were declared explicitly.
        is_hydrated: Whether the seed content is available on this instance.
    """

    kind: SeedKind
    seed: Seed
    column_hashes_: t.Optional[t.Dict[str, str]] = Field(default=None, alias="column_hashes")
    derived_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
    is_hydrated: bool = True
    source_type: t.Literal["seed"] = "seed"

    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
        """Return the pickling state without the cached `_reader` entry."""
        state = super().__getstate__()
        # Copy before popping so the live instance's `__dict__` is left untouched
        state["__dict__"] = state["__dict__"].copy()
        state["__dict__"].pop("_reader", None)
        return state

    def copy(self, **kwargs: t.Any) -> Self:
        """Copy the model, removing any cached `_reader` from the copy."""
        model = super().copy(**kwargs)
        # The reader is derived from the seed content, which the copy may have replaced
        model.__dict__.pop("_reader", None)
        return model

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Yield the seed data in batches, or nothing at all when the model is not hydrated.

        The context, time range and extra keyword arguments are accepted but not used here.
        """
        if not self.is_hydrated:
            return
        yield from self.render_seed()

    def render_seed(self) -> t.Iterator[QueryOrDF]:
        """Read the seed in batches and yield each batch with its columns coerced.

        Columns are coerced according to the explicitly declared column types
        (``columns_to_types_``): date/time columns are parsed, DATE columns are reduced to
        ``datetime.date``, boolean columns go through ``str_to_bool`` and non-null values of
        text columns are converted to strings. NaN values are replaced with None.

        Raises:
            SQLMeshError: If the model is not hydrated.
        """
        import numpy as np
        import pandas as pd

        self._ensure_hydrated()

        date_columns = []
        datetime_columns = []
        bool_columns = []
        string_columns = []

        columns_to_types = self.columns_to_types_ or {}
        column_names_to_check = set(columns_to_types)
        for name, tpe in columns_to_types.items():
            if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
                date_columns.append(name)
            elif tpe.this in exp.DataType.TEMPORAL_TYPES:
                datetime_columns.append(name)
            elif tpe.is_type("boolean"):
                bool_columns.append(name)
            elif tpe.this in exp.DataType.TEXT_TYPES:
                string_columns.append(name)

        for df in self._reader.read(batch_size=self.kind.batch_size):
            # Map columns that only exist under their dialect-normalized name back to the declared name
            rename_dict = {}
            for column in columns_to_types:
                if column not in df:
                    normalized_name = normalize_identifiers(column, dialect=self.dialect).name
                    if normalized_name in df:
                        rename_dict[normalized_name] = column
            if rename_dict:
                df.rename(columns=rename_dict, inplace=True)
                # These names have already been checked
                column_names_to_check -= set(rename_dict)

            missing_columns = column_names_to_check - set(df.columns)
            if missing_columns:
                raise_config_error(
                    f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
                )

            # convert all date/time types to native pandas timestamp
            for column in [*date_columns, *datetime_columns]:
                df[column] = pd.to_datetime(df[column], infer_datetime_format=True, errors="ignore")  # type: ignore

            # extract datetime.date from pandas timestamp for DATE columns
            for column in date_columns:
                try:
                    df[column] = df[column].dt.date
                except Exception as ex:
                    # Best effort: the column is left as-is when it could not be converted
                    logger.error(
                        "Failed to convert column '%s' to date in seed model '%s': %s",
                        column,
                        self.name,
                        ex,
                    )

            for column in bool_columns:
                df[column] = df[column].apply(lambda i: str_to_bool(str(i)))

            # Stringify the non-null values of text columns while leaving nulls untouched
            df.loc[:, string_columns] = df[string_columns].mask(
                cond=lambda x: x.notna(),  # type: ignore
                other=df[string_columns].astype(str),  # type: ignore
            )
            yield df.replace({np.nan: None})

    @property
    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
        """Column types, in order of precedence: declared, derived on dehydration, read from the seed.

        Returns None for a dehydrated model that has neither declared nor derived types.
        """
        if self.columns_to_types_ is not None:
            return self.columns_to_types_
        if self.derived_columns_to_types is not None:
            return self.derived_columns_to_types
        if self.is_hydrated:
            return self._reader.columns_to_types
        return None

    @property
    def column_hashes(self) -> t.Dict[str, str]:
        """Per-column hashes, taken from ``column_hashes_`` when set and from the seed reader otherwise.

        Raises:
            SQLMeshError: If no hashes are stored and the model is not hydrated.
        """
        if self.column_hashes_ is not None:
            return self.column_hashes_
        self._ensure_hydrated()
        return self._reader.column_hashes

    @property
    def is_seed(self) -> bool:
        """Always True for seed models."""
        return True

    @property
    def seed_path(self) -> Path:
        """The path of the seed file; relative paths are resolved against the model file's directory.

        Raises:
            SQLMeshError: If the seed path is relative and the model has no path.
        """
        seed_path = Path(self.kind.path)
        if not seed_path.is_absolute():
            if self._path is None:
                raise SQLMeshError(f"Seed model '{self.name}' has no path")
            return self._path.parent / seed_path
        return seed_path

    @property
    def depends_on(self) -> t.Set[str]:
        """The explicitly declared dependencies, excluding the model itself."""
        return (self.depends_on_ or set()) - {self.fqn}

    @property
    def depends_on_self(self) -> bool:
        """Always False for seed models."""
        return False

    @property
    def batch_size(self) -> t.Optional[int]:
        """Always None, so that intervals are never batched for seed models."""
        # Unlike other model kinds, the batch size provided in the SEED kind represents the
        # maximum number of rows to insert in a single batch.
        # We should never batch intervals for seed models.
        return None

    def to_dehydrated(self) -> SeedModel:
        """Creates a dehydrated copy of this model.

        The dehydrated seed model will not contain the seed content, but will contain
        the column hashes. This is useful for comparing two seed models without
        having to read the seed content from disk.

        Returns:
            A dehydrated copy of this model.
        """
        if not self.is_hydrated:
            return self

        return self.copy(
            update={
                "seed": Seed(content=""),
                "is_hydrated": False,
                "column_hashes_": self.column_hashes,
                # Preserve the inferred types only when none were declared explicitly
                "derived_columns_to_types": self.columns_to_types
                if self.columns_to_types_ is None
                else None,
            }
        )

    def to_hydrated(self, content: str) -> SeedModel:
        """Creates a hydrated copy of this model with the given seed content.

        Args:
            content: The seed content to attach to the copy.

        Returns:
            A hydrated copy of this model.
        """
        if self.is_hydrated:
            return self

        return self.copy(
            update={
                "seed": Seed(content=content),
                "is_hydrated": True,
                "column_hashes_": None,
            },
        )

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Decide whether this seed is a breaking change relative to ``previous``.

        Returns:
            False when every previously existing column is still present with an unchanged
            hash. None when ``previous`` is not a seed model, a column was removed, or the
            hash of an existing column changed.
        """
        if not isinstance(previous, SeedModel):
            return None

        new_columns = set(self.column_hashes)
        old_columns = set(previous.column_hashes)

        if not new_columns.issuperset(old_columns):
            return None

        for col in old_columns:
            if self.column_hashes[col] != previous.column_hashes[col]:
                return None

        return False

    def _ensure_hydrated(self) -> None:
        """Raise SQLMeshError when the model is not hydrated."""
        if not self.is_hydrated:
            raise SQLMeshError(f"Seed model '{self.name}' is not hydrated.")

    @cached_property
    def _reader(self) -> CsvSeedReader:
        """The seed reader for this model's dialect and CSV settings, created once per instance."""
        return self.seed.reader(dialect=self.dialect, settings=self.kind.csv_settings)

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        """The parent's non-SQL data hash values plus column hashes and grant settings."""
        data = super()._data_hash_values_no_sql
        for column_name, column_hash in self.column_hashes.items():
            data.append(column_name)
            data.append(column_hash)

        # Include grants in data hash for seed models to force recreation on grant changes
        # since seed models don't support migration
        data.append(json.dumps(self.grants, sort_keys=True) if self.grants else "")
        data.append(self.grants_target_layer)

        return data
1875
1876
class PythonModel(_Model):
    """The model definition which relies on a Python script to fetch the data.

    Args:
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
    """

    kind: ModelKind = FullKind()
    entrypoint: str
    source_type: t.Literal["python"] = "python"

    def validate_definition(self) -> None:
        """Run the parent validation, then reject model kinds that don't support Python models."""
        super().validate_definition()

        if self.kind and not self.kind.supports_python_models:
            raise_config_error(
                f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
                self._path,
            )

    def render(
        self,
        *,
        context: ExecutionContext,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        **kwargs: t.Any,
    ) -> t.Iterator[QueryOrDF]:
        """Call the model's entrypoint function and yield the data it produces.

        Args:
            context: The execution context; a copy carrying the model's variables is passed to the entrypoint.
            start: The start of the interval. Defaults to the epoch.
            end: The end of the interval. Defaults to the epoch.
            execution_time: The execution time. Defaults to the epoch.
            kwargs: Extra keyword arguments for the entrypoint. A ``variables`` entry, if present,
                is merged over the variables found in the Python environment.

        Yields:
            Each item produced by the entrypoint when it is a generator, otherwise its single return value.

        Raises:
            PythonModelEvalError: If the entrypoint, or iterating over its output, raises an exception.
        """
        env = prepare_env(self.python_env)
        start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
        execution_time = to_datetime(execution_time or c.EPOCH)

        variables = {
            **env.get(c.SQLMESH_VARS, {}),
            **env.get(c.SQLMESH_VARS_METADATA, {}),
            **kwargs.pop("variables", {}),
        }
        # SQL-valued blueprint variables are parsed into expressions; other values pass through as-is
        blueprint_variables = {
            k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
            for k, v in {
                **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
                **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
            }.items()
        }
        try:
            kwargs = {
                **variables,
                **kwargs,
                "start": start,
                "end": end,
                "execution_time": execution_time,
                "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
            }
            df_or_iter = env[self.entrypoint](
                context=context.with_variables(variables, blueprint_variables=blueprint_variables),
                **kwargs,
            )

            if not isinstance(df_or_iter, types.GeneratorType):
                df_or_iter = [df_or_iter]

            for df in df_or_iter:
                yield df
        except Exception as e:
            # Chain explicitly so the original exception is kept as the cause of the wrapper
            raise PythonModelEvalError(
                format_evaluated_code_exception(e, self.python_env)
            ) from e

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expr]:
        """Render the model definition, always including the Python code.

        Args:
            include_python: Ignored; the Python code is always included.
            include_defaults: Forwarded to the parent implementation.
            render_query: Forwarded to the parent implementation.
        """
        # Ignore the provided value for the include_python flag, since the Python model's
        # definition without Python code is meaningless.
        return super().render_definition(
            include_python=True, include_defaults=include_defaults, render_query=render_query
        )

    @property
    def is_python(self) -> bool:
        """Always True for Python models."""
        return True

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Always None: no attempt is made to categorize changes to Python models."""
        return None

    @property
    def _data_hash_values_no_sql(self) -> t.List[str]:
        """The parent's non-SQL data hash values plus the entrypoint name."""
        data = super()._data_hash_values_no_sql
        data.append(self.entrypoint)
        return data
1968
1969
class ExternalModel(_Model):
    """The model definition which represents an external source/table."""

    kind: ModelKind = ExternalKind()
    source_type: t.Literal["external"] = "external"

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Decide whether this external model is a breaking change relative to ``previous``.

        Returns:
            False when every (column, type) pair of the previous model is still present.
            None when ``previous`` is not an external model or any such pair is gone.
        """
        if not isinstance(previous, ExternalModel):
            return None

        earlier_columns = previous.columns_to_types_or_raise.items()
        current_columns = self.columns_to_types_or_raise.items()

        # Pairs that existed before but are absent now, i.e. removed or retyped columns
        removed_or_retyped = earlier_columns - current_columns
        return None if removed_or_retyped else False

    @property
    def depends_on(self) -> t.Set[str]:
        """External models have no dependencies; always an empty set."""
        return set()

    @property
    def depends_on_self(self) -> bool:
        """Always False for external models."""
        return False
1990
1991
# Union of the concrete model classes; used as the "any model" type in annotations.
Model = t.Union[SqlModel, SeedModel, PythonModel, ExternalModel]
1993
1994
# Holds the outcome of a single audit; each field is described by the string that follows it.
class AuditResult(PydanticModel):
    audit: Audit
    """The audit this result is for."""
    audit_args: t.Dict[t.Any, t.Any]
    """Arguments passed to the audit."""
    model: t.Optional[_Model] = None
    """The model this audit is for."""
    count: t.Optional[int] = None
    """The number of records returned by the audit query. This could be None if the audit was skipped."""
    query: t.Optional[exp.Expr] = None
    """The rendered query used by the audit. This could be None if the audit was skipped."""
    skipped: bool = False
    """Whether or not the audit was skipped."""
    blocking: bool = True
    """Whether or not the audit was blocking. This can be overridden by the user."""
2009
2010
# Bundles rendered signal calls with the Python environment they are evaluated in.
class EvaluatableSignals(PydanticModel):
    signals_to_kwargs: t.Dict[str, t.Dict[str, t.Optional[exp.Expr]]]
    """A mapping of signal names to the kwargs passed to the signal."""
    python_env: t.Dict[str, Executable]
    """The Python environment that should be used to evaluate the rendered signal calls."""
    prepared_python_env: t.Dict[str, t.Any]
    """The prepared Python environment that should be used to evaluate the rendered signal calls."""
2018
2019
def _extract_blueprints(blueprints: t.Any, path: Path) -> t.List[t.Any]:
    """Normalize the value of the `blueprints` property into a list of blueprints.

    Args:
        blueprints: A parenthesized expression, a tuple/array expression, a Python list,
            or any falsy value when no blueprints are defined.
        path: The path of the model file, used when reporting an error.

    Returns:
        The list of blueprints. A falsy input yields ``[None]``, i.e. a single blueprint
        without any variables.
    """
    if not blueprints:
        return [None]

    if isinstance(blueprints, exp.Paren):
        # A single parenthesized blueprint
        extracted = [blueprints.unnest()]
    elif isinstance(blueprints, (exp.Tuple, exp.Array)):
        extracted = blueprints.expressions
    elif isinstance(blueprints, list):
        extracted = blueprints
    else:
        raise_config_error(
            "Expected a list or tuple consisting of key-value mappings for "
            f"the 'blueprints' property, got '{blueprints}' instead",
            path,
        )
        extracted = []  # This is unreachable, but is done to satisfy mypy

    return extracted
2036
2037
def _extract_blueprint_variables(blueprint: t.Any, path: Path) -> t.Dict[str, t.Any]:
    """Convert a single blueprint into a mapping of lowercased variable names to values.

    Args:
        blueprint: A single key-value expression (optionally parenthesized), a tuple/array
            of key-value expressions, a Python dict, or any falsy value.
        path: The path of the model file, used when reporting an error.

    Returns:
        The blueprint variables; empty for a falsy blueprint.
    """
    if not blueprint:
        return {}

    if isinstance(blueprint, (exp.Paren, exp.PropertyEQ)):
        pair = blueprint.unnest()
        return {pair.left.name.lower(): pair.right}

    variables: t.Dict[str, t.Any] = {}

    if isinstance(blueprint, (exp.Tuple, exp.Array)):
        for pair in blueprint.expressions:
            variables[pair.left.name.lower()] = pair.right
        return variables

    if isinstance(blueprint, dict):
        for key, value in blueprint.items():
            variables[key.lower()] = value
        return variables

    raise_config_error(
        f"Expected a key-value mapping for the blueprint value, got '{blueprint}' instead",
        path,
    )
    return {}  # This is unreachable, but is done to satisfy mypy
2054
2055
def create_models_from_blueprints(
    gateway: t.Optional[str | exp.Expr],
    blueprints: t.Any,
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    loader: t.Callable[..., Model],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Create one model per blueprint by calling the loader with that blueprint's variables.

    Args:
        gateway: The gateway of the model, as a string or an expression. It is rendered
            separately for every blueprint, since it may reference blueprint variables.
        blueprints: The value of the `blueprints` property; see `_extract_blueprints`.
        get_variables: Returns the variables for the given gateway name (or None).
        loader: The callable that builds a model from the prepared keyword arguments.
        path: The path of the model file.
        module_path: The Python module path, forwarded to rendering and to the loader.
        dialect: The dialect used to parse and render the gateway; also forwarded to the loader.
        default_catalog_per_gateway: Optional mapping of gateway names to default catalogs.
            When the rendered gateway has an entry, it overrides `default_catalog` for
            that blueprint only.
        loader_kwargs: Additional keyword arguments forwarded to the loader.

    Returns:
        The models created from the blueprints, in blueprint order.
    """
    model_blueprints: t.List[Model] = []
    for blueprint in _extract_blueprints(blueprints, path):
        blueprint_variables = _extract_blueprint_variables(blueprint, path)

        # Work on a per-blueprint copy so that a catalog override applied for one gateway
        # doesn't leak into subsequent blueprints whose gateway has no catalog entry
        blueprint_kwargs = dict(loader_kwargs)

        if gateway:
            rendered_gateway = render_expression(
                expression=exp.maybe_parse(gateway, dialect=dialect),
                module_path=module_path,
                macros=blueprint_kwargs.get("macros"),
                jinja_macros=blueprint_kwargs.get("jinja_macros"),
                path=path,
                dialect=dialect,
                default_catalog=blueprint_kwargs.get("default_catalog"),
                blueprint_variables=blueprint_variables,
            )
            gateway_name = rendered_gateway[0].name if rendered_gateway else None
        else:
            gateway_name = None

        if (
            default_catalog_per_gateway
            and gateway_name
            and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
        ):
            blueprint_kwargs["default_catalog"] = catalog

        model_blueprints.append(
            loader(
                path=path,
                module_path=module_path,
                dialect=dialect,
                variables=get_variables(gateway_name),
                blueprint_variables=blueprint_variables,
                **blueprint_kwargs,
            )
        )

    return model_blueprints
2105
2106
def load_sql_based_models(
    expressions: t.List[exp.Expr],
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Load all models defined by a parsed SQL model file, one per blueprint.

    The `gateway` and `blueprints` properties are read from the leading MODEL statement,
    if there is one. The `blueprints` property is removed from that statement in the
    process, and is rendered first when it is given as a macro function.

    Args:
        expressions: The parsed expressions of the model file.
        get_variables: Returns the variables for the given gateway name (or None).
        path: The path of the model file.
        module_path: The Python module path, forwarded to rendering and to the loader.
        dialect: The dialect used for rendering; also forwarded to the loader.
        default_catalog_per_gateway: Optional mapping of gateway names to default catalogs.
        loader_kwargs: Additional keyword arguments forwarded to `load_sql_based_model`.

    Returns:
        The models created from the file.
    """
    gateway: t.Optional[exp.Expr] = None
    blueprints: t.Optional[exp.Expr] = None

    model_meta = seq_get(expressions, 0)
    meta_props = (isinstance(model_meta, d.Model) and model_meta.expressions) or []

    # Iterate over a snapshot: popping `blueprints` removes it from the MODEL statement's
    # property list, and mutating that list while iterating it could skip the property
    # that follows (e.g. `gateway`)
    for prop in list(meta_props):
        if prop.name == "gateway":
            gateway = prop.args["value"]
        elif prop.name == "blueprints":
            # We pop the `blueprints` here to avoid walking large lists when rendering the meta
            blueprints = prop.pop().args["value"]

    if isinstance(blueprints, d.MacroFunc):
        rendered_blueprints = render_expression(
            expression=blueprints,
            module_path=module_path,
            macros=loader_kwargs.get("macros"),
            jinja_macros=loader_kwargs.get("jinja_macros"),
            variables=get_variables(None),
            path=path,
            dialect=dialect,
            default_catalog=loader_kwargs.get("default_catalog"),
        )
        if not rendered_blueprints:
            raise_config_error("Failed to render blueprints property", path)

        # Help mypy see that rendered_blueprints can't be None
        assert rendered_blueprints

        # Several rendered expressions are treated as a tuple of blueprints
        if len(rendered_blueprints) > 1:
            rendered_blueprints = [exp.Tuple(expressions=rendered_blueprints)]

        blueprints = rendered_blueprints[0]

    return create_models_from_blueprints(
        gateway=gateway,
        blueprints=blueprints,
        get_variables=get_variables,
        loader=partial(load_sql_based_model, expressions),
        path=path,
        module_path=module_path,
        dialect=dialect,
        default_catalog_per_gateway=default_catalog_per_gateway,
        **loader_kwargs,
    )
2160
2161
2162def load_sql_based_model(
2163    expressions: t.List[exp.Expr],
2164    *,
2165    defaults: t.Optional[t.Dict[str, t.Any]] = None,
2166    path: t.Optional[Path] = None,
2167    module_path: Path = Path(),
2168    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
2169    macros: t.Optional[MacroRegistry] = None,
2170    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
2171    audits: t.Optional[t.Dict[str, ModelAudit]] = None,
2172    python_env: t.Optional[t.Dict[str, Executable]] = None,
2173    dialect: t.Optional[str] = None,
2174    physical_schema_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
2175    default_catalog: t.Optional[str] = None,
2176    variables: t.Optional[t.Dict[str, t.Any]] = None,
2177    infer_names: t.Optional[bool] = False,
2178    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
2179    **kwargs: t.Any,
2180) -> Model:
2181    """Load a model from a parsed SQLMesh model SQL file.
2182
2183    Args:
2184        expressions: Model, *Statements, Query.
2185        defaults: Definition default values.
2186        path: An optional path to the model definition file.
2187        module_path: The python module path to serialize macros for.
2188        time_column_format: The default time column format to use if no model time column is configured.
2189        macros: The custom registry of macros. If not provided the default registry will be used.
2190        jinja_macros: The registry of Jinja macros.
2191        python_env: The custom Python environment for macros. If not provided the environment will be constructed
2192            from the macro registry.
2193        dialect: The default dialect if no model dialect is configured.
2194            The format must adhere to Python's strftime codes.
2195        physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema
2196        default_catalog: The default catalog if no model catalog is configured.
2197        variables: The variables to pass to the model.
2198        kwargs: Additional kwargs to pass to the loader.
2199    """
2200    missing_model_msg = f"""Please add a MODEL block at the top of the file. Example:
2201
2202MODEL (
2203  name sqlmesh_example.full_model, --model name
2204  kind FULL, --materialization
2205  cron '@daily', --schedule
2206);
2207
2208Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview
2209"""
2210
2211    if not expressions:
2212        raise_config_error(missing_model_msg)
2213
2214    dialect = dialect or ""
2215    meta = expressions[0]
2216    if not isinstance(meta, d.Model):
2217        if not infer_names:
2218            raise_config_error(missing_model_msg)
2219        meta = d.Model(expressions=[])  # Dummy meta node
2220        expressions.insert(0, meta)
2221
2222    # We deliberately hold off rendering some properties at load time because there is not enough information available
2223    # at load time to render them. They will get rendered later at evaluation time
2224    unrendered_properties = {}
2225    unrendered_merge_filter = None
2226
2227    for prop in meta.expressions:
2228        # Macro functions that programmaticaly generate the key-value pair properties should be rendered
2229        # This is needed in the odd case where a macro shares the name of one of the properties
2230        # eg `@session_properties()` Test: `test_macros_in_model_statement` Reference PR: #2574
2231        if isinstance(prop, d.MacroFunc):
2232            continue
2233
2234        prop_name = prop.name.lower()
2235        if prop_name in {"signals", "audits"} | PROPERTIES:
2236            unrendered_properties[prop_name] = prop.args.get("value")
2237        elif (
2238            prop.name.lower() == "kind"
2239            and (value := prop.args.get("value"))
2240            and value.name.lower() == "incremental_by_unique_key"
2241        ):
2242            for kind_prop in value.expressions:
2243                if kind_prop.name.lower() == "merge_filter":
2244                    unrendered_merge_filter = kind_prop
2245
2246    rendered_meta_exprs = render_expression(
2247        expression=meta,
2248        module_path=module_path,
2249        macros=macros,
2250        jinja_macros=jinja_macros,
2251        variables=variables,
2252        path=path,
2253        dialect=dialect,
2254        default_catalog=default_catalog,
2255        blueprint_variables=blueprint_variables,
2256    )
2257
2258    if rendered_meta_exprs is None or len(rendered_meta_exprs) != 1:
2259        raise_config_error(
2260            f"Invalid MODEL statement:\n{meta.sql(dialect=dialect, pretty=True)}",
2261            path,
2262        )
2263        raise
2264
2265    rendered_meta = rendered_meta_exprs[0]
2266
2267    rendered_defaults = (
2268        render_model_defaults(
2269            defaults=defaults,
2270            module_path=module_path,
2271            macros=macros,
2272            jinja_macros=jinja_macros,
2273            variables=variables,
2274            path=path,
2275            dialect=dialect,
2276            default_catalog=default_catalog,
2277        )
2278        if defaults
2279        else {}
2280    )
2281
2282    rendered_defaults = parse_defaults_properties(rendered_defaults, dialect=dialect)
2283
2284    # Extract the query and any pre/post statements
2285    query_or_seed_insert, pre_statements, post_statements, on_virtual_update, inline_audits = (
2286        _split_sql_model_statements(expressions[1:], path, dialect=dialect)
2287    )
2288
2289    meta_fields: t.Dict[str, t.Any] = {
2290        "dialect": dialect,
2291        "description": (
2292            "\n".join(comment.strip() for comment in rendered_meta.comments)
2293            if rendered_meta.comments
2294            else None
2295        ),
2296        **{prop.name.lower(): prop.args.get("value") for prop in rendered_meta.expressions},
2297        **kwargs,
2298    }
2299
2300    # Discard the potentially half-rendered versions of these properties and replace them with the
2301    # original unrendered versions. They will get rendered properly at evaluation time
2302    meta_fields.update(unrendered_properties)
2303
2304    if unrendered_merge_filter:
2305        for idx, kind_prop in enumerate(meta_fields["kind"].expressions):
2306            if kind_prop.name.lower() == "merge_filter":
2307                meta_fields["kind"].expressions[idx] = unrendered_merge_filter
2308
2309    if isinstance(meta_fields.get("dialect"), exp.Expr):
2310        meta_fields["dialect"] = meta_fields["dialect"].name
2311
2312    # The name of the model will be inferred from its path relative to `models/`, if it's not explicitly specified
2313    name = meta_fields.pop("name", "")
2314    if not name and infer_names:
2315        if path is None:
2316            raise ValueError(f"Model {name} must have a name")
2317        name = get_model_name(path)
2318
2319    if not name:
2320        raise_config_error(
2321            "Please add the required 'name' field to the MODEL block at the top of the file.\n\n"
2322            + "Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview"
2323        )
2324    if "default_catalog" in meta_fields:
2325        raise_config_error(
2326            "`default_catalog` cannot be set on a per-model basis. It must be set at the connection level.",
2327            path,
2328        )
2329
2330    common_kwargs = dict(
2331        pre_statements=pre_statements,
2332        post_statements=post_statements,
2333        on_virtual_update=on_virtual_update,
2334        defaults=rendered_defaults,
2335        path=path,
2336        module_path=module_path,
2337        macros=macros,
2338        python_env=python_env,
2339        jinja_macros=jinja_macros,
2340        physical_schema_mapping=physical_schema_mapping,
2341        default_catalog=default_catalog,
2342        variables=variables,
2343        inline_audits=inline_audits,
2344        blueprint_variables=blueprint_variables,
2345        use_original_sql=True,
2346        **meta_fields,
2347    )
2348
2349    kind = common_kwargs.pop("kind", ModelMeta.all_field_infos()["kind"].default)
2350
2351    if kind.name != ModelKindName.SEED:
2352        return create_sql_model(
2353            name,
2354            query_or_seed_insert,
2355            kind=kind,
2356            time_column_format=time_column_format,
2357            **common_kwargs,
2358        )
2359
2360    seed_properties = {p.name.lower(): p.args.get("value") for p in kind.expressions}
2361    return create_seed_model(
2362        name,
2363        SeedKind(**seed_properties),
2364        **common_kwargs,
2365    )
2366
2367
def create_sql_model(
    name: TableName,
    query: t.Optional[exp.Expr],
    **kwargs: t.Any,
) -> Model:
    """Build a SQL model from an already parsed query.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        query: The model's logic in a form of a SELECT query.
        **kwargs: Forwarded unchanged to `_create_model`. The `path` entry, if present,
            is also reported as the location of the error raised for an invalid query.

    Returns:
        The result of `_create_model` for the `SqlModel` class.
    """
    supported_query_types = (exp.Query, d.JinjaQuery, d.MacroFunc)

    if not isinstance(query, supported_query_types):
        raise_config_error(
            "A query is required and must be a SELECT statement, a UNION statement, or a JINJA_QUERY block",
            kwargs.get("path"),
        )
        # Kept after the error helper so the query type is narrowed for type checkers
        assert isinstance(query, supported_query_types)

    return _create_model(SqlModel, name, query=query, **kwargs)
2388
2389
def create_seed_model(
    name: TableName,
    seed_kind: SeedKind,
    *,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    **kwargs: t.Any,
) -> Model:
    """Creates a Seed model.

    The seed file location is taken from `seed_kind.path` and resolved as follows:
    a path whose first part is `$root` (case-insensitive) is re-rooted under `module_path`
    and written back to `seed_kind.path`; any other relative path is resolved against `path`
    (or against its parent when `path` is not a directory); absolute paths, and relative
    paths when `path` is None, are used as is.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        seed_kind: The information about the location of a seed and other related configuration.
        path: An optional path to the model definition file.
        module_path: The directory that `$root`-prefixed seed paths are resolved against.
        **kwargs: Remaining arguments forwarded to `_create_model`.
    """
    seed_path = Path(seed_kind.path)
    first_part, *remaining_parts = seed_path.parts

    if first_part.lower() == "$root":
        seed_path = module_path.joinpath(*remaining_parts)
        seed_kind.path = str(seed_path)
    elif not seed_path.is_absolute() and path is not None:
        base_dir = path if path.is_dir() else path.parent
        seed_path = base_dir / seed_path

    seed = create_seed(seed_path)
    explicit_depends_on = kwargs.pop("depends_on", None)

    return _create_model(
        SeedModel,
        name,
        path=path,
        seed=seed,
        kind=seed_kind,
        depends_on=explicit_depends_on,
        module_path=module_path,
        **kwargs,
    )
2432
2433
def create_python_model(
    name: str,
    entrypoint: str,
    python_env: t.Dict[str, Executable],
    *,
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    path: Path = Path(),
    module_path: Path = Path(),
    depends_on: t.Optional[t.Set[str]] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> Model:
    """Creates a Python model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
        python_env: The Python environment of all objects referenced by the model implementation.
        macros: The macro registry used when rendering explicit dependencies.
        jinja_macros: The Jinja macro registry used when rendering explicit dependencies.
        path: An optional path to the model definition file.
        module_path: The path of the module the model belongs to.
        depends_on: The custom set of model's upstream dependencies. When None, the
            dependencies are derived by parsing the code in `python_env`.
        variables: The variables to pass to the model.
        blueprint_variables: The blueprint's variables to pass to the model.
        **kwargs: Remaining arguments forwarded to `_create_model`; `dialect` and
            `default_catalog` are also read here to render explicit dependencies.
    """
    # Find dependencies for python models by parsing code if they are not explicitly defined
    # Also remove self-references that are found

    dialect = kwargs.get("dialect")

    dependencies_unspecified = depends_on is None

    parsed_depends_on, referenced_variables = (
        parse_dependencies(
            python_env,
            entrypoint,
            strict_resolution=dependencies_unspecified,
            variables=variables,
            blueprint_variables=blueprint_variables,
        )
        if python_env is not None
        else (set(), set())
    )
    if dependencies_unspecified:
        depends_on = parsed_depends_on - {name}
    else:
        # Explicit dependencies may contain macro references, so render them first.
        # Blueprint variables are passed along so that references to them resolve the
        # same way they do for the parsed dependencies above.
        depends_on_rendered = render_expression(
            expression=exp.Array(
                expressions=[exp.maybe_parse(dep, dialect=dialect) for dep in depends_on or []]
            ),
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=kwargs.get("default_catalog"),
            blueprint_variables=blueprint_variables,
        )
        depends_on = {
            dep.sql(dialect=dialect)
            for dep in t.cast(t.List[exp.Expr], depends_on_rendered)[0].expressions
        }

    # Only the variables actually referenced by the model code are stored in its environment
    used_variables = {k: v for k, v in (variables or {}).items() if k in referenced_variables}
    if used_variables:
        python_env[c.SQLMESH_VARS] = Executable.value(used_variables, sort_root_dict=True)

    return _create_model(
        PythonModel,
        name,
        path=path,
        depends_on=depends_on,
        entrypoint=entrypoint,
        python_env=python_env,
        macros=macros,
        jinja_macros=jinja_macros,
        module_path=module_path,
        variables=variables,
        blueprint_variables=blueprint_variables,
        **kwargs,
    )
2516
2517
def create_external_model(
    name: TableName,
    *,
    dialect: t.Optional[str] = None,
    path: Path = Path(),
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> ExternalModel:
    """Creates an external model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        dialect: The dialect to serialize.
        path: An optional path to the model definition file.
        defaults: Default values forwarded to `_create_model`.
        **kwargs: Remaining arguments forwarded to `_create_model`.
    """
    external_model = _create_model(
        ExternalModel,
        name,
        defaults=defaults,
        dialect=dialect,
        path=path,
        kind=ModelKindName.EXTERNAL.value,
        **kwargs,
    )
    # `_create_model` is annotated with the generic `Model` type
    return t.cast(ExternalModel, external_model)
2546
2547
def _create_model(
    klass: t.Type[_Model],
    name: TableName,
    *,
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    path: t.Optional[Path] = None,
    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    jinja_macro_references: t.Optional[t.Set[MacroReference]] = None,
    depends_on: t.Optional[t.Set[str]] = None,
    dialect: t.Optional[str] = None,
    physical_schema_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    audit_definitions: t.Optional[t.Dict[str, ModelAudit]] = None,
    inline_audits: t.Optional[t.Dict[str, ModelAudit]] = None,
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    signal_definitions: t.Optional[SignalRegistry] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    use_original_sql: bool = False,
    **kwargs: t.Any,
) -> Model:
    """Instantiates `klass` from defaults and model fields, then attaches audits and the python env.

    Args:
        klass: The model class to instantiate.
        name: The model name. Its schema (db) part is matched against `physical_schema_mapping`.
        defaults: Default values. Property defaults are merged through `_resolve_properties`,
            default statements and audits are placed before the model's own, and the
            remaining entries that are fields of `klass` act as fallback constructor arguments.
        path: Used as the location in config errors and stored on the model as `_path`.
        time_column_format: Passed to the model's `set_time_format`.
        jinja_macros: The Jinja macro registry. It is trimmed to the referenced macros unless
            it is already trimmed; an empty registry is used when none is given.
        jinja_macro_references: NOTE: the passed value is never read; it is overwritten with
            the references extracted from the collected statements.
        depends_on: Passed to the model as is. When None, the python env is built with
            `strict_resolution=True`.
        dialect: The model dialect; falls back to an empty string.
        physical_schema_mapping: Regex patterns mapped to schema names. The first pattern
            matching the model's schema name provides `physical_schema_override`.
        python_env: An existing python env passed to `make_python_env`.
        audit_definitions: Audit definitions by name.
        inline_audits: Audit definitions by name; these take precedence over `audit_definitions`
            on a name clash. Only definitions referenced by the model are kept.
        module_path: Passed to `make_python_env` and used when building / serializing the
            environment of signal functions.
        macros: The macro registry; `macro.get_registry()` is used when falsy.
        signal_definitions: Signals by name, used to validate the model's signal references.
        variables: Variables passed to `make_python_env`.
        blueprint_variables: Blueprint variables passed to `make_python_env`.
        use_original_sql: Passed as `use_meta_sql` when wrapping parsed expressions in `ParsableSql`.
        **kwargs: The remaining model fields. They are validated against `klass` and take
            precedence over `defaults` when the model is constructed.

    Returns:
        The created model.

    Raises:
        A config error (through `raise_config_error`) when the model references an audit
        or a signal that is not defined.
    """
    validate_extra_and_required_fields(
        klass,
        {"name", *kwargs} - {"grain", "table_properties"},
        "MODEL block",
        path,
    )

    # Combine the default and the model-specific value of every properties field
    for prop in PROPERTIES:
        kwargs[prop] = _resolve_properties((defaults or {}).get(prop), kwargs.get(prop))

    dialect = dialect or ""

    physical_schema_mapping = physical_schema_mapping or {}
    model_schema_name = exp.to_table(name, dialect=dialect).db
    physical_schema_override: t.Optional[str] = None

    # The first matching pattern wins
    for re_pattern, override_schema in physical_schema_mapping.items():
        if re.match(re_pattern, model_schema_name):
            physical_schema_override = override_schema
            break

    raw_kind = kwargs.pop("kind", None)
    if raw_kind:
        kwargs["kind"] = create_model_kind(raw_kind, dialect, defaults or {})

    # From here on `defaults` only holds entries that are fields of `klass`
    defaults = {k: v for k, v in (defaults or {}).items() if k in klass.all_fields()}
    if not issubclass(klass, SqlModel):
        defaults.pop("optimize_query", None)

    # Expressions to scan for macro / variable references; a tuple entry carries an
    # `is_metadata` flag alongside the expression
    statements: t.List[t.Union[exp.Expr, t.Tuple[exp.Expr, bool]]] = []

    if "query" in kwargs:
        statements.append(kwargs["query"])
        kwargs["query"] = ParsableSql.from_parsed_expression(
            kwargs["query"], dialect, use_meta_sql=use_original_sql
        )

    # Merge default statements with model-specific statements
    for statement_field in ["pre_statements", "post_statements", "on_virtual_update"]:
        if statement_field in defaults:
            kwargs[statement_field] = [
                exp.maybe_parse(stmt, dialect=dialect) for stmt in defaults[statement_field]
            ] + kwargs.get(statement_field, [])
        if statement_field in kwargs:
            # Macros extracted from these statements need to be treated as metadata only
            is_metadata = statement_field == "on_virtual_update"
            for stmt in kwargs[statement_field]:
                # Extract the expression if it's ParsableSql already
                expr = stmt.parse(dialect) if isinstance(stmt, ParsableSql) else stmt
                statements.append((expr, is_metadata))
            kwargs[statement_field] = [
                # this to retain the transaction information
                stmt
                if isinstance(stmt, ParsableSql)
                else ParsableSql.from_parsed_expression(
                    stmt, dialect, use_meta_sql=use_original_sql
                )
                for stmt in kwargs[statement_field]
            ]

    # This is done to allow variables like @gateway to be used in these properties
    # since rendering shifted from load time to run time.
    # Note: we check for Tuple since that's what we expect from _resolve_properties
    for property_name in PROPERTIES:
        property_values = kwargs.get(property_name)
        if isinstance(property_values, exp.Tuple):
            statements.extend(property_values.expressions)

    if isinstance(getattr(kwargs.get("kind"), "merge_filter", None), exp.Expr):
        statements.append(kwargs["kind"].merge_filter)

    # Overwrites the `jinja_macro_references` argument with the references found in the statements
    jinja_macro_references, referenced_variables = extract_macro_references_and_variables(
        *(gen(e if isinstance(e, exp.Expr) else e[0]) for e in statements)
    )

    if jinja_macros:
        jinja_macros = (
            jinja_macros if jinja_macros.trimmed else jinja_macros.trim(jinja_macro_references)
        )
    else:
        jinja_macros = JinjaMacroRegistry()

    # Variables referenced inside the definitions of the root Jinja macros count as referenced too
    for jinja_macro in jinja_macros.root_macros.values():
        referenced_variables.update(
            extract_macro_references_and_variables(jinja_macro.definition)[1]
        )

    # Merge model-specific audits with default audits
    if default_audits := defaults.pop("audits", None):
        kwargs["audits"] = default_audits + d.extract_function_calls(kwargs.pop("audits", []))

    # Later entries win: explicit kwargs override the computed values, which override the defaults
    model = klass(
        name=name,
        **{
            **(defaults or {}),
            "jinja_macros": jinja_macros or JinjaMacroRegistry(),
            "dialect": dialect,
            "depends_on": depends_on,
            "physical_schema_override": physical_schema_override,
            **kwargs,
        },
    )

    # Inline audits take precedence over audit definitions with the same name
    audit_definitions = {
        **(audit_definitions or {}),
        **(inline_audits or {}),
    }

    used_audits: t.Set[str] = {audit_name for audit_name, _ in model.audits}

    # Keep only the definitions of audits that the model actually references
    audit_definitions = {
        audit_name: audit_definitions[audit_name]
        for audit_name in used_audits
        if audit_name in audit_definitions
    }

    model.audit_definitions.update(audit_definitions)

    # Any macro referenced in audits or signals needs to be treated as metadata-only
    statements.extend((audit.query, True) for audit in audit_definitions.values())  # type: ignore[misc]

    # Ensure that all audits referenced in the model are defined
    from sqlmesh.core.audit.builtin import BUILT_IN_AUDITS

    available_audits = BUILT_IN_AUDITS.keys() | model.audit_definitions.keys()
    for referenced_audit, audit_args in model.audits:
        if referenced_audit not in available_audits:
            raise_config_error(f"Audit '{referenced_audit}' is undefined", location=path)

        statements.extend(
            (audit_arg_expression, True) for audit_arg_expression in audit_args.values()
        )

    signal_definitions = signal_definitions or UniqueKeyDict("signals")

    # NOTE: this loop rebinds the `kwargs` name to each signal's arguments; the model
    # fields it held before are not read again below
    for referenced_signal, kwargs in model.signals:
        if referenced_signal and referenced_signal not in signal_definitions:
            raise_config_error(f"Signal '{referenced_signal}' is undefined", location=path)

        statements.extend((signal_kwarg, True) for signal_kwarg in kwargs.values())

    python_env = make_python_env(
        statements,
        jinja_macro_references,
        module_path,
        macros or macro.get_registry(),
        variables=variables,
        referenced_variables=referenced_variables,
        path=path,
        python_env=python_env,
        strict_resolution=depends_on is None,
        blueprint_variables=blueprint_variables,
        dialect=dialect,
    )

    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]] = {}

    # Signal functions are flagged as metadata before being added to the environment
    for signal_name, _ in model.signals:
        if signal_name and signal_name in signal_definitions:
            func = signal_definitions[signal_name].func
            setattr(func, c.SQLMESH_METADATA, True)
            build_env(func, env=env, name=signal_name, path=module_path)

    model.python_env.update(python_env)
    model.python_env.update(serialize_env(env, path=module_path))
    model._path = path
    model.set_time_format(time_column_format)

    return t.cast(Model, model)
2740
2741
# Parsed form of the `@INSERT_SEED()` macro call. `_split_sql_model_statements` compares each
# statement against it to recognise the seed-insert placeholder in a model definition.
INSERT_SEED_MACRO_CALL = d.parse_one("@INSERT_SEED()")
2743
2744
def _split_sql_model_statements(
    expressions: t.List[exp.Expr],
    path: t.Optional[Path],
    dialect: t.Optional[str] = None,
) -> t.Tuple[
    t.Optional[exp.Expr],
    t.List[exp.Expr],
    t.List[exp.Expr],
    t.List[exp.Expr],
    UniqueKeyDict[str, ModelAudit],
]:
    """Extracts the SELECT query from a sequence of expressions.

    Inline audit definitions and virtual-update blocks are pulled out of the sequence; every
    other expression is treated as a regular statement that either is the query or belongs
    before / after it.

    Args:
        expressions: The list of all SQL statements in the model definition.
        path: The path of the model definition file, used as the location of config errors.
        dialect: The dialect passed to `load_audit` for inline audit definitions.

    Returns:
        A tuple containing the extracted SELECT query or the `@INSERT_SEED()` call (None if
        there is no such statement), the statements before it, the statements after it,
        the statements of the virtual-update blocks, and the inline audit definitions.
        When there is no query, all regular statements are returned as the statements before it.

    Raises:
        ConfigError: If the model definition contains more than one SELECT query or `@INSERT_SEED()` call,
            or if an inline audit definition is not followed by another statement.
    """
    from sqlmesh.core.audit import ModelAudit, load_audit

    query_positions: t.List[t.Tuple[exp.Expr, int]] = []
    sql_statements: t.List[exp.Expr] = []
    on_virtual_update: t.List[exp.Expr] = []
    inline_audits: UniqueKeyDict[str, ModelAudit] = UniqueKeyDict("inline_audits")

    idx = 0
    length = len(expressions)
    while idx < length:
        expr = expressions[idx]

        if isinstance(expr, d.Audit):
            # An inline audit spans two expressions: the AUDIT block and the statement after it
            if idx + 1 >= length:
                raise_config_error(
                    "Incomplete inline audit definition: the AUDIT block must be followed by a statement",
                    path,
                )
            loaded_audit = load_audit([expr, expressions[idx + 1]], dialect=dialect)
            assert isinstance(loaded_audit, ModelAudit)
            inline_audits[loaded_audit.name] = loaded_audit
            idx += 2
        elif isinstance(expr, d.VirtualUpdateStatement):
            on_virtual_update.extend(expr.expressions)
            idx += 1
        else:
            if (
                isinstance(expr, (exp.Query, d.JinjaQuery))
                or expr == INSERT_SEED_MACRO_CALL
                or (
                    isinstance(expr, d.MacroFunc)
                    and (expr.this.name.lower() == "union" or length == 1)
                )
            ):
                # Record the position within `sql_statements` (not within `expressions`):
                # the two differ as soon as an audit or a virtual-update block precedes
                # the query, and the slices below are taken from `sql_statements`.
                query_positions.append((expr, len(sql_statements)))
            sql_statements.append(expr)
            idx += 1

    if not query_positions:
        return None, sql_statements, [], on_virtual_update, inline_audits

    if len(query_positions) > 1:
        raise_config_error("Only one SELECT query is allowed per model", path)

    query, pos = query_positions[0]
    return query, sql_statements[:pos], sql_statements[pos + 1 :], on_virtual_update, inline_audits
2810
2811
def _resolve_properties(
    default: t.Optional[t.Dict[str, t.Any]],
    provided: t.Optional[exp.Expr | t.Dict[str, t.Any]],
) -> t.Optional[exp.Expr]:
    """Merge default properties into the provided ones.

    Args:
        default: Default property values by name.
        provided: The model's own properties, either as a dict or as an expression whose
            items are keyed by the name of their `this` node.

    Returns:
        A tuple of the resolved property expressions, or None when nothing is left.
        Provided properties come first. A default is added only when its key was not
        provided; a provided value that renders as `none` / `null` removes a property
        that has a default.
    """
    resolved: t.Dict[str, t.Any]
    if isinstance(provided, dict):
        resolved = {key: exp.Literal.string(key).eq(value) for key, value in provided.items()}
    elif provided:
        # A single parenthesized property is handled like a one-element tuple
        entries = (
            exp.Tuple(expressions=[provided.this])
            if isinstance(provided, exp.Paren)
            else provided
        )
        resolved = {entry.this.name: entry for entry in entries}
    else:
        resolved = {}

    for key, value in (default or {}).items():
        if key not in resolved:
            resolved[key] = exp.Literal.string(key).eq(value)
            continue
        if resolved[key].expression.sql().lower() in {"none", "null"}:
            del resolved[key]

    return exp.Tuple(expressions=list(resolved.values())) if resolved else None
2835
2836
def _list_of_calls_to_exp(value: t.List[t.Tuple[str, t.Dict[str, t.Any]]]) -> exp.Expr:
    """Convert `(name, kwargs)` pairs into a tuple of anonymous function call expressions.

    Every keyword argument becomes an equality expression between the converted key
    and the converted value.
    """
    calls = []
    for call in value:
        call_name, call_kwargs = call[0], call[1]
        arguments = [
            exp.EQ(this=exp.convert(arg_name), expression=exp.convert(arg_value))
            for arg_name, arg_value in call_kwargs.items()
        ]
        calls.append(exp.Anonymous(this=call_name, expressions=arguments))

    return exp.Tuple(expressions=calls)
2850
2851
def _is_projection(expr: exp.Expr) -> bool:
    """Return True when `expr` is held in the `expressions` arg of a SELECT parent."""
    if expr.arg_key != "expressions":
        return False
    return isinstance(expr.parent, exp.Select)
2855
2856
def _single_expr_or_tuple(values: t.Sequence[exp.Expr]) -> exp.Expr | exp.Tuple:
    """Return the only element of `values`, or wrap `values` in a tuple expression otherwise."""
    if len(values) == 1:
        return values[0]
    return exp.Tuple(expressions=values)
2859
2860
def _refs_to_sql(values: t.Any) -> exp.Expr:
    """Wrap `values` in a tuple expression."""
    refs_tuple = exp.Tuple(expressions=values)
    return refs_tuple
2863
2864
def render_meta_fields(
    fields: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Render the macro-bearing values of model meta fields in place.

    Only keys that correspond to a `ModelMeta` field (by alias, or by field name when
    there is no alias) are processed. Fields listed in `RUNTIME_RENDERED_MODEL_FIELDS` are
    only parsed, not rendered. A field whose rendered value is None (or whose rendered
    dict / list ends up empty) is removed from `fields`.

    Args:
        fields: The meta field values by name. This dict is modified and returned.
        module_path: Passed to `render_expression`.
        path: Passed to `render_expression`.
        jinja_macros: Passed to `render_expression`.
        macros: Passed to `render_expression`.
        dialect: The dialect used for parsing and rendering.
        variables: Passed to `render_expression`.
        default_catalog: Passed to `render_expression`.
        blueprint_variables: Passed to `render_expression`.

    Returns:
        The same `fields` dict with rendered values.

    Raises:
        SQLMeshError: If rendering a value yields no expression or more than one.
    """

    def _is_cron_shortcut(value: t.Any) -> bool:
        # Cron shortcuts (e.g. @daily) start with "@" but are not macros
        return isinstance(value, str) and value.lower() in CRON_SHORTCUTS

    def render_field_value(value: t.Any) -> t.Any:
        needs_rendering = isinstance(value, exp.Expr) or (
            isinstance(value, str) and "@" in value
        )
        if not needs_rendering:
            return value

        expression = exp.maybe_parse(value, dialect=dialect)
        rendered_expr = render_expression(
            expression=expression,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=default_catalog,
            blueprint_variables=blueprint_variables,
        )
        if not rendered_expr:
            raise SQLMeshError(
                f"Rendering `{expression.sql(dialect=dialect)}` did not return an expression"
            )

        if len(rendered_expr) != 1:
            raise SQLMeshError(
                f"Rendering `{expression.sql(dialect=dialect)}` must return one result, but got {len(rendered_expr)}"
            )

        single_result = rendered_expr[0]
        # For cases where a property is conditionally assigned
        if single_result.sql().lower() in {"none", "null"}:
            return None
        return single_result

    for field_name, field_info in ModelMeta.all_field_infos().items():
        field = field_info.alias or field_name
        field_value = fields.get(field)

        # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
        if field_value is None or (field == "cron" and _is_cron_shortcut(field_value)):
            continue

        if field in RUNTIME_RENDERED_MODEL_FIELDS:
            fields[field] = parse_strings_with_macro_refs(field_value, dialect)
            continue

        rendered_value: t.Any
        if isinstance(field_value, dict):
            rendered_value = {}
            for key, value in field_value.items():
                if key in RUNTIME_RENDERED_MODEL_FIELDS:
                    rendered_value[key] = parse_strings_with_macro_refs(value, dialect)
                elif key == "auto_restatement_cron" and _is_cron_shortcut(value):
                    # don't parse kind auto_restatement_cron="@..." kwargs (e.g. @daily) into MacroVar
                    rendered_value[key] = value
                else:
                    rendered_item = render_field_value(value)
                    if rendered_item is not None:
                        rendered_value[key] = rendered_item
            keep_field = bool(rendered_value)
        elif isinstance(field_value, list):
            rendered_value = [
                rendered_item
                for rendered_item in map(render_field_value, field_value)
                if rendered_item is not None
            ]
            keep_field = bool(rendered_value)
        else:
            rendered_value = render_field_value(field_value)
            keep_field = rendered_value is not None

        if keep_field:
            fields[field] = rendered_value
        else:
            fields.pop(field)

    return fields
2961
2962
def render_model_defaults(
    defaults: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
) -> t.Dict[str, t.Any]:
    """Render the model defaults and validate the rendered values.

    Args:
        defaults: The model defaults by field name; rendered through `render_meta_fields`.
        module_path: Passed to `render_meta_fields`.
        path: Passed to `render_meta_fields`.
        jinja_macros: Passed to `render_meta_fields`.
        macros: Passed to `render_meta_fields`.
        dialect: Passed to `render_meta_fields`.
        variables: Passed to `render_meta_fields`.
        default_catalog: Passed to `render_meta_fields`.

    Returns:
        The rendered defaults, with a string `interval_unit` converted to `IntervalUnit`.

    Raises:
        ConfigError: If `optimize_query`, `allow_partials` or `enabled` is set to a
            non-boolean value, or if `interval_unit` is not a valid interval unit.
    """
    rendered_defaults = render_meta_fields(
        fields=defaults,
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        variables=variables,
        path=path,
        dialect=dialect,
        default_catalog=default_catalog,
    )

    # Validate defaults that have macros are rendered to boolean
    for boolean in {"optimize_query", "allow_partials", "enabled"}:
        var = rendered_defaults.get(boolean)
        if var is None or isinstance(var, (exp.Boolean, bool)):
            continue
        raise ConfigError(f"Expected boolean for '{var}', got '{type(var)}' instead")

    # Validate the 'interval_unit' if present is an Interval Unit
    var = rendered_defaults.get("interval_unit")
    if not isinstance(var, str):
        return rendered_defaults

    try:
        interval_unit = IntervalUnit(var)
    except ValueError as e:
        raise ConfigError(f"Invalid interval unit: {var}") from e

    rendered_defaults["interval_unit"] = interval_unit
    return rendered_defaults
2999
3000
def parse_defaults_properties(
    defaults: t.Dict[str, t.Any], dialect: DialectType
) -> t.Dict[str, t.Any]:
    """Parse default property values that contain the macro prefix into expressions.

    For every field in `PROPERTIES`, values stored under string keys whose string form
    contains `d.SQLMESH_MACRO_PREFIX` are replaced, in place, by the result of `exp.maybe_parse`.

    Args:
        defaults: The model defaults; modified in place.
        dialect: The dialect used for parsing.

    Returns:
        The same `defaults` dict.
    """
    for prop in PROPERTIES:
        property_defaults = defaults.get(prop)
        if not property_defaults:
            continue

        for key, value in property_defaults.items():
            if not isinstance(key, str) or d.SQLMESH_MACRO_PREFIX not in str(value):
                continue
            defaults[prop][key] = exp.maybe_parse(value, dialect=dialect)

    return defaults
3011
3012
def render_expression(
    expression: exp.Expr,
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    macros: t.Optional[MacroRegistry] = None,
    dialect: DialectType = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    default_catalog: t.Optional[str] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Optional[t.List[exp.Expr]]:
    """Render a single expression with a python env built just for it.

    Args:
        expression: The expression to render.
        module_path: Passed to `make_python_env`.
        path: Passed to `make_python_env` and to the renderer.
        jinja_macros: The Jinja macro registry given to the renderer.
        macros: The macro registry; `macro.get_registry()` is used when falsy.
        dialect: The dialect given to the renderer.
        variables: Passed to `make_python_env`.
        default_catalog: The default catalog given to the renderer.
        blueprint_variables: Passed to `make_python_env`.

    Returns:
        Whatever `ExpressionRenderer.render()` returns for the expression. Identifier
        quoting and normalization are both disabled for this rendering.
    """
    macro_registry = macros or macro.get_registry()
    meta_python_env = make_python_env(
        expressions=expression,
        jinja_macro_references=None,
        module_path=module_path,
        macros=macro_registry,
        variables=variables,
        path=path,
        blueprint_variables=blueprint_variables,
    )

    renderer = ExpressionRenderer(
        expression,
        dialect,
        [],
        path=path,
        jinja_macro_registry=jinja_macros,
        python_env=meta_python_env,
        default_catalog=default_catalog,
        quote_identifiers=False,
        normalize_identifiers=False,
    )
    return renderer.render()
3044
3045
# Converters from a meta field's value to the value used for its expression form, keyed by
# field name. Comprehension variables are spelled out so they do not shadow the module
# aliases `c`, `t` and `d`.
META_FIELD_CONVERTER: t.Dict[str, t.Callable] = {
    # Plain strings become string literals
    "start": lambda value: exp.Literal.string(value),
    "cron": lambda value: exp.Literal.string(value),
    "cron_tz": lambda value: exp.Literal.string(value),
    # One expression stays as is, several are wrapped in a tuple
    "partitioned_by_": _single_expr_or_tuple,
    "clustered_by": _single_expr_or_tuple,
    "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)),
    # Lists of (name, kwargs) calls
    "pre": _list_of_calls_to_exp,
    "post": _list_of_calls_to_exp,
    "audits": _list_of_calls_to_exp,
    "columns_to_types_": lambda value: exp.Schema(
        expressions=[
            exp.ColumnDef(this=exp.to_column(column), kind=data_type)
            for column, data_type in value.items()
        ]
    ),
    "column_descriptions_": lambda value: exp.Schema(
        expressions=[
            exp.to_column(column).eq(description) for column, description in value.items()
        ]
    ),
    "tags": single_value_or_tuple,
    "grains": _refs_to_sql,
    "references": _refs_to_sql,
    # Passed through unchanged
    "physical_properties_": lambda value: value,
    "virtual_properties_": lambda value: value,
    "session_properties_": lambda value: value,
    "allow_partials": exp.convert,
    # Named signals become function calls, unnamed ones a tuple of assignments
    "signals": lambda values: exp.tuple_(
        *(
            exp.func(
                name,
                *(
                    exp.PropertyEQ(this=exp.var(arg_name), expression=arg_value)
                    for arg_name, arg_value in args.items()
                ),
            )
            if name
            else exp.Tuple(
                expressions=[
                    exp.var(arg_name).eq(arg_value) for arg_name, arg_value in args.items()
                ]
            )
            for name, args in values
        )
    ),
    "formatting": str,
    "optimize_query": str,
    "virtual_environment_mode": lambda value: exp.Literal.string(value.value),
    "dbt_node_info_": lambda value: value.to_expression(),
    "grants_": lambda value: value,
    "grants_target_layer": lambda value: exp.Literal.string(value.value),
}
3086
3087
def get_model_name(path: Path) -> str:
    """Infer a model name from the location of its file under a `models` directory.

    The name is made of the directories between the first `models` path component and the
    file, followed by the file's stem, joined with dots. Only the last three parts are kept.

    Args:
        path: The path to the model definition file.

    Returns:
        The inferred model name, e.g. `db.table` for `models/db/table.sql`.

    Raises:
        ValueError: If `path` has no `models` component.
    """
    parts = path.parts
    try:
        models_index = parts.index("models")
    except ValueError:
        # Replace the opaque "x not in tuple" message with one that names the problem
        raise ValueError(
            f"Cannot infer the model name from '{path}': the path is not located under a 'models' directory"
        ) from None

    name_parts = [*parts[models_index + 1 : -1], path.stem]
    return ".".join(name_parts[-3:])
3091
3092
3093# function applied to time column when automatically used for partitioning in INCREMENTAL_BY_TIME_RANGE models
def clickhouse_partition_func(
    column: exp.Expr, columns_to_types: t.Optional[t.Dict[str, exp.DataType]]
) -> exp.Expr:
    """Wrap a time column in ClickHouse's `toMonday()` for use as a partitioning expression.

    Args:
        column: The time column.
        columns_to_types: Known column types by column name, if any.

    Returns:
        `toMonday(column)` when the column is a Date / Date32 / DateTime / DateTime64;
        `toMonday(CAST(column AS DateTime64(9, 'UTC')))` when its type is unknown;
        otherwise that same expression cast back to the column's type.
    """
    # `toMonday()` function accepts a Date or DateTime type column
    known_type = columns_to_types.get(column.name) if columns_to_types else None
    col_type = known_type or exp.DataType.build("UNKNOWN")

    #  if input column is already a conformable type, just pass the column
    if col_type.is_type(
        exp.DataType.Type.DATE,
        exp.DataType.Type.DATE32,
        exp.DataType.Type.DATETIME,
        exp.DataType.Type.DATETIME64,
    ):
        return exp.func("toMonday", column, dialect="clickhouse")

    # every other case needs the input cast to DateTime64 first
    monday_of_cast_column = exp.func(
        "toMonday",
        exp.cast(column, exp.DataType.build("DateTime64(9, 'UTC')", dialect="clickhouse")),
        dialect="clickhouse",
    )

    # if input column type is not known, the DateTime64 result is returned as is
    if col_type.is_type(exp.DataType.Type.UNKNOWN):
        return monday_of_cast_column

    # if input column type is known but not conformable, cast output back to original type
    return exp.cast(monday_of_cast_column, col_type)
3130
3131
# Functions applied to a time column when it is used for partitioning, keyed by dialect name
# (only ClickHouse has an entry here).
TIME_COL_PARTITION_FUNC = {"clickhouse": clickhouse_partition_func}
logger = <Logger sqlmesh.core.model.definition (WARNING)>
PROPERTIES = {'session_properties', 'physical_properties', 'virtual_properties'}
RUNTIME_RENDERED_MODEL_FIELDS = {'signals', 'merge_filter', 'virtual_properties', 'session_properties', 'audits', 'physical_properties'}
CRON_SHORTCUTS = {'@hourly', '@weekly', '@midnight', '@daily', '@monthly', '@yearly', '@annually'}
class SqlModel(_Model):
1355class SqlModel(_Model):
1356    """The model definition which relies on a SQL query to fetch the data.
1357
1358    Args:
1359        query: The main query representing the model.
1360        pre_statements: The list of SQL statements that precede the model's query.
1361        post_statements: The list of SQL statements that follow after the model's query.
1362        on_virtual_update: The list of SQL statements to be executed after the virtual update.
1363    """
1364
1365    query_: ParsableSql = Field(alias="query")
1366    source_type: t.Literal["sql"] = "sql"
1367
1368    _columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
1369
1370    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
1371        state = super().__getstate__()
1372        state["__dict__"] = state["__dict__"].copy()
1373        # query renderer is very expensive to serialize
1374        state["__dict__"].pop("_query_renderer", None)
1375        state["__dict__"].pop("column_descriptions", None)
1376        private = state[PRIVATE_FIELDS]
1377        private["_columns_to_types"] = None
1378        return state
1379
1380    def copy(self, **kwargs: t.Any) -> Self:
1381        model = super().copy(**kwargs)
1382        model.__dict__.pop("_query_renderer", None)
1383        model.__dict__.pop("column_descriptions", None)
1384        model._columns_to_types = None
1385        if kwargs.get("update", {}).keys() & {"depends_on_", "query"}:
1386            model._full_depends_on = None
1387        return model
1388
1389    @property
1390    def query(self) -> t.Union[exp.Query, d.JinjaQuery, d.MacroFunc]:
1391        parsed_query = self.query_.parse(self.dialect)
1392        return t.cast(t.Union[exp.Query, d.JinjaQuery, d.MacroFunc], parsed_query)
1393
1394    def render_query(
1395        self,
1396        *,
1397        start: t.Optional[TimeLike] = None,
1398        end: t.Optional[TimeLike] = None,
1399        execution_time: t.Optional[TimeLike] = None,
1400        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1401        table_mapping: t.Optional[t.Dict[str, str]] = None,
1402        expand: t.Iterable[str] = tuple(),
1403        deployability_index: t.Optional[DeployabilityIndex] = None,
1404        engine_adapter: t.Optional[EngineAdapter] = None,
1405        **kwargs: t.Any,
1406    ) -> t.Optional[exp.Query]:
1407        query = self._query_renderer.render(
1408            start=start,
1409            end=end,
1410            execution_time=execution_time,
1411            snapshots=snapshots,
1412            table_mapping=table_mapping,
1413            expand=expand,
1414            deployability_index=deployability_index,
1415            engine_adapter=engine_adapter,
1416            **kwargs,
1417        )
1418
1419        return query
1420
1421    def render_definition(
1422        self,
1423        include_python: bool = True,
1424        include_defaults: bool = False,
1425        render_query: bool = False,
1426    ) -> t.List[exp.Expr]:
1427        result: t.List[exp.Expr] = super().render_definition(
1428            include_python=include_python, include_defaults=include_defaults
1429        )
1430
1431        if render_query:
1432            result.extend(self.render_pre_statements())
1433            result.append(self.render_query() or self.query)
1434            result.extend(self.render_post_statements())
1435            if virtual_update := self.render_on_virtual_update():
1436                result.append(d.VirtualUpdateStatement(expressions=virtual_update))
1437        else:
1438            result.extend(self.pre_statements)
1439            result.append(self.query)
1440            result.extend(self.post_statements)
1441            if self.on_virtual_update:
1442                result.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update))
1443
1444        return result
1445
1446    @property
1447    def is_sql(self) -> bool:
1448        return True
1449
1450    @property
1451    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
1452        if self.columns_to_types_ is not None:
1453            self._columns_to_types = self.columns_to_types_
1454        elif self._columns_to_types is None:
1455            try:
1456                query = self._query_renderer.render()
1457            except Exception:
1458                logger.exception("Failed to render query for model %s", self.fqn)
1459                return None
1460
1461            if query is None:
1462                return None
1463
1464            unknown = exp.DataType.build("unknown")
1465
1466            columns_to_types = {}
1467            for select in query.selects:
1468                output_name = select.output_name
1469
1470                # If model validation is disabled, we cannot assume that projections
1471                # will have inferrable output names or even that they will be unique
1472                if not output_name or output_name in columns_to_types:
1473                    return None
1474
1475                # copy data type because it is used in the engine to build CTAS and other queries
1476                # this can change the parent which will mess up the diffing algo
1477                columns_to_types[output_name] = (select.type or unknown).copy()
1478
1479            self._columns_to_types = columns_to_types
1480
1481        if "*" in self._columns_to_types:
1482            return None
1483
1484        return {**self._columns_to_types, **self.managed_columns}
1485
1486    @cached_property
1487    def column_descriptions(self) -> t.Dict[str, str]:
1488        if self.column_descriptions_ is not None:
1489            return self.column_descriptions_
1490
1491        query = self.render_query()
1492        if query is None:
1493            return {}
1494
1495        return {
1496            select.alias_or_name: select.comments[-1].strip()
1497            for select in query.selects
1498            if select.comments
1499        }
1500
1501    def set_mapping_schema(self, schema: t.Dict) -> None:
1502        super().set_mapping_schema(schema)
1503        self._on_mapping_schema_set()
1504
1505    def update_schema(self, schema: MappingSchema) -> None:
1506        super().update_schema(schema)
1507        self._on_mapping_schema_set()
1508
1509    def _on_mapping_schema_set(self) -> None:
1510        self._columns_to_types = None
1511        self._query_renderer.update_schema(self.mapping_schema)
1512
1513    def validate_definition(self) -> None:
1514        query = self._query_renderer.render()
1515        if query is None:
1516            if self.depends_on_ is None:
1517                raise_config_error(
1518                    "Dependencies must be provided explicitly for models that can be rendered only at runtime",
1519                    self._path,
1520                )
1521            return
1522
1523        if not isinstance(query, exp.Query):
1524            raise_config_error("Missing SELECT query in the model definition", self._path)
1525
1526        projection_list = query.selects
1527        if not projection_list:
1528            raise_config_error("Query missing select statements", self._path)
1529
1530        if self.depends_on_self and not self.annotated:
1531            raise_config_error(
1532                "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
1533                self._path,
1534            )
1535
1536        super().validate_definition()
1537
1538    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1539        if not isinstance(previous, SqlModel):
1540            return None
1541
1542        if self.lookback != previous.lookback:
1543            return None
1544
1545        try:
1546            # the previous model which comes from disk could be unrenderable
1547            previous_query = previous.render_query()
1548        except Exception:
1549            previous_query = None
1550        this_query = self.render_query()
1551
1552        if previous_query is None or this_query is None:
1553            # Can't determine if there's a breaking change if we can't render the query.
1554            return None
1555
1556        if previous_query is this_query:
1557            edits = []
1558        else:
1559            edits = diff(
1560                previous_query,
1561                this_query,
1562                matchings=[(previous_query, this_query)],
1563                delta_only=True,
1564                dialect=self.dialect if self.dialect == previous.dialect else None,
1565            )
1566        inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}
1567
1568        for edit in edits:
1569            if not isinstance(edit, Insert):
1570                return None
1571
1572            expr = edit.expression
1573            if isinstance(expr, exp.UDTF):
1574                # projection subqueries do not change cardinality, engines don't allow these to return
1575                # more than one row of data
1576                parent = expr.find_ancestor(exp.Subquery)
1577
1578                if not parent:
1579                    return None
1580
1581                expr = parent
1582
1583            if not _is_projection(expr) and expr.parent not in inserted_expressions:
1584                return None
1585
1586        return False
1587
1588    def is_metadata_only_change(self, previous: _Node) -> bool:
1589        if self._is_metadata_only_change_cache.get(id(previous), None) is not None:
1590            return self._is_metadata_only_change_cache[id(previous)]
1591
1592        if not super().is_metadata_only_change(previous):
1593            return False
1594
1595        if not isinstance(previous, SqlModel):
1596            self._is_metadata_only_change_cache[id(previous)] = False
1597            return False
1598
1599        this_rendered_query = self.render_query() or self.query
1600        previous_rendered_query = previous.render_query() or previous.query
1601        is_metadata_change = this_rendered_query == previous_rendered_query
1602
1603        self._is_metadata_only_change_cache[id(previous)] = is_metadata_change
1604        return is_metadata_change
1605
1606    @cached_property
1607    def _query_renderer(self) -> QueryRenderer:
1608        no_quote_identifiers = self.kind.is_view and self.dialect in ("trino", "spark")
1609        return QueryRenderer(
1610            self.query,
1611            self.dialect,
1612            self.macro_definitions,
1613            schema=self.mapping_schema,
1614            path=self._path,
1615            jinja_macro_registry=self.jinja_macros,
1616            python_env=self.python_env,
1617            only_execution_time=self.kind.only_execution_time,
1618            default_catalog=self.default_catalog,
1619            quote_identifiers=not no_quote_identifiers,
1620            optimize_query=self.optimize_query,
1621            model=self,
1622        )
1623
1624    @property
1625    def _data_hash_values_no_sql(self) -> t.List[str]:
1626        return [
1627            *super()._data_hash_values_no_sql,
1628            *self.jinja_macros.data_hash_values,
1629        ]
1630
1631    @property
1632    def _data_hash_values_sql(self) -> t.List[str]:
1633        return [
1634            *super()._data_hash_values_sql,
1635            self.query_.sql,
1636        ]
1637
1638    @property
1639    def _additional_metadata(self) -> t.List[str]:
1640        return [*super()._additional_metadata, self.query_.sql]
1641
1642    @property
1643    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
1644        self.render_query()
1645        return self._query_renderer._violated_rules

The model definition which relies on a SQL query to fetch the data.

Arguments:
  • query: The main query representing the model.
  • pre_statements: The list of SQL statements that precede the model's query.
  • post_statements: The list of SQL statements that follow after the model's query.
  • on_virtual_update: The list of SQL statements to be executed after the virtual update.
source_type: Literal['sql']
def copy(self, **kwargs: Any) -> typing_extensions.Self:
1380    def copy(self, **kwargs: t.Any) -> Self:
1381        model = super().copy(**kwargs)
1382        model.__dict__.pop("_query_renderer", None)
1383        model.__dict__.pop("column_descriptions", None)
1384        model._columns_to_types = None
1385        if kwargs.get("update", {}).keys() & {"depends_on_", "query"}:
1386            model._full_depends_on = None
1387        return model

Returns a copy of the model.

Warning (deprecated): This method is now deprecated; use model_copy instead.

If you need include or exclude, use:

    data = self.model_dump(include=include, exclude=exclude, round_trip=True)
    data = {**data, **(update or {})}
    copied = self.model_validate(data)

Arguments:
  • include: Optional set or mapping specifying which fields to include in the copied model.
  • exclude: Optional set or mapping specifying which fields to exclude in the copied model.
  • update: Optional dictionary of field-value pairs to override field values in the copied model.
  • deep: If True, the values of fields that are Pydantic models will be deep-copied.
Returns:

A copy of the model with included, excluded and updated fields as specified.

query: Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]
1389    @property
1390    def query(self) -> t.Union[exp.Query, d.JinjaQuery, d.MacroFunc]:
1391        parsed_query = self.query_.parse(self.dialect)
1392        return t.cast(t.Union[exp.Query, d.JinjaQuery, d.MacroFunc], parsed_query)
def render_query( self, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, Snapshot]] = None, table_mapping: Optional[Dict[str, str]] = None, expand: Iterable[str] = (), deployability_index: Optional[DeployabilityIndex] = None, engine_adapter: Optional[sqlmesh.core.engine_adapter.base.EngineAdapter] = None, **kwargs: Any) -> Optional[sqlglot.expressions.query.Query]:
1394    def render_query(
1395        self,
1396        *,
1397        start: t.Optional[TimeLike] = None,
1398        end: t.Optional[TimeLike] = None,
1399        execution_time: t.Optional[TimeLike] = None,
1400        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
1401        table_mapping: t.Optional[t.Dict[str, str]] = None,
1402        expand: t.Iterable[str] = tuple(),
1403        deployability_index: t.Optional[DeployabilityIndex] = None,
1404        engine_adapter: t.Optional[EngineAdapter] = None,
1405        **kwargs: t.Any,
1406    ) -> t.Optional[exp.Query]:
1407        query = self._query_renderer.render(
1408            start=start,
1409            end=end,
1410            execution_time=execution_time,
1411            snapshots=snapshots,
1412            table_mapping=table_mapping,
1413            expand=expand,
1414            deployability_index=deployability_index,
1415            engine_adapter=engine_adapter,
1416            **kwargs,
1417        )
1418
1419        return query

Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models.

Arguments:
  • start: The start datetime to render. Defaults to epoch start.
  • end: The end datetime to render. Defaults to epoch start.
  • execution_time: The date/time reference to use for execution time.
  • snapshots: All upstream snapshots (by name) to use for expansion and mapping of physical locations.
  • table_mapping: Table mapping of physical locations. Takes precedence over snapshot mappings.
  • expand: Expand referenced models as subqueries. This is used to bypass backfills when running queries that depend on materialized tables. Model definitions are inlined and can thus be run end to end on the fly.
  • deployability_index: Determines snapshots that are deployable in the context of this render.
  • kwargs: Additional kwargs to pass to the renderer.
Returns:

The rendered expression.

def render_definition( self, include_python: bool = True, include_defaults: bool = False, render_query: bool = False) -> List[sqlglot.expressions.core.Expr]:
1421    def render_definition(
1422        self,
1423        include_python: bool = True,
1424        include_defaults: bool = False,
1425        render_query: bool = False,
1426    ) -> t.List[exp.Expr]:
1427        result: t.List[exp.Expr] = super().render_definition(
1428            include_python=include_python, include_defaults=include_defaults
1429        )
1430
1431        if render_query:
1432            result.extend(self.render_pre_statements())
1433            result.append(self.render_query() or self.query)
1434            result.extend(self.render_post_statements())
1435            if virtual_update := self.render_on_virtual_update():
1436                result.append(d.VirtualUpdateStatement(expressions=virtual_update))
1437        else:
1438            result.extend(self.pre_statements)
1439            result.append(self.query)
1440            result.extend(self.post_statements)
1441            if self.on_virtual_update:
1442                result.append(d.VirtualUpdateStatement(expressions=self.on_virtual_update))
1443
1444        return result

Returns the original list of sql expressions comprising the model definition.

Arguments:
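  • include_python: Whether or not to include Python code in the rendered definition.
  • include_defaults: Forwarded unchanged to the base class's render_definition.
  • render_query: If True, the rendered pre-statements, query (falling back to the unrendered query when rendering yields nothing), post-statements and on-virtual-update statements are appended instead of the original, unrendered ones.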
is_sql: bool
1446    @property
1447    def is_sql(self) -> bool:
1448        return True
columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]]
1450    @property
1451    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
1452        if self.columns_to_types_ is not None:
1453            self._columns_to_types = self.columns_to_types_
1454        elif self._columns_to_types is None:
1455            try:
1456                query = self._query_renderer.render()
1457            except Exception:
1458                logger.exception("Failed to render query for model %s", self.fqn)
1459                return None
1460
1461            if query is None:
1462                return None
1463
1464            unknown = exp.DataType.build("unknown")
1465
1466            columns_to_types = {}
1467            for select in query.selects:
1468                output_name = select.output_name
1469
1470                # If model validation is disabled, we cannot assume that projections
1471                # will have inferrable output names or even that they will be unique
1472                if not output_name or output_name in columns_to_types:
1473                    return None
1474
1475                # copy data type because it is used in the engine to build CTAS and other queries
1476                # this can change the parent which will mess up the diffing algo
1477                columns_to_types[output_name] = (select.type or unknown).copy()
1478
1479            self._columns_to_types = columns_to_types
1480
1481        if "*" in self._columns_to_types:
1482            return None
1483
1484        return {**self._columns_to_types, **self.managed_columns}

Returns the mapping of column names to types of this model.

column_descriptions: Dict[str, str]
1486    @cached_property
1487    def column_descriptions(self) -> t.Dict[str, str]:
1488        if self.column_descriptions_ is not None:
1489            return self.column_descriptions_
1490
1491        query = self.render_query()
1492        if query is None:
1493            return {}
1494
1495        return {
1496            select.alias_or_name: select.comments[-1].strip()
1497            for select in query.selects
1498            if select.comments
1499        }

A dictionary of column names to annotation comments.

def set_mapping_schema(self, schema: Dict) -> None:
1501    def set_mapping_schema(self, schema: t.Dict) -> None:
1502        super().set_mapping_schema(schema)
1503        self._on_mapping_schema_set()
def update_schema(self, schema: sqlglot.schema.MappingSchema) -> None:
1505    def update_schema(self, schema: MappingSchema) -> None:
1506        super().update_schema(schema)
1507        self._on_mapping_schema_set()

Updates the schema for this model's dependencies based on the given mapping schema.

def validate_definition(self) -> None:
1513    def validate_definition(self) -> None:
1514        query = self._query_renderer.render()
1515        if query is None:
1516            if self.depends_on_ is None:
1517                raise_config_error(
1518                    "Dependencies must be provided explicitly for models that can be rendered only at runtime",
1519                    self._path,
1520                )
1521            return
1522
1523        if not isinstance(query, exp.Query):
1524            raise_config_error("Missing SELECT query in the model definition", self._path)
1525
1526        projection_list = query.selects
1527        if not projection_list:
1528            raise_config_error("Query missing select statements", self._path)
1529
1530        if self.depends_on_self and not self.annotated:
1531            raise_config_error(
1532                "Self-referencing models require inferrable column types. There are three options available to mitigate this issue: add explicit types to all projections in the outermost SELECT statement, leverage external models (https://sqlmesh.readthedocs.io/en/stable/concepts/models/external_models/), or use the `columns` model attribute (https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview/#columns).",
1533                self._path,
1534            )
1535
1536        super().validate_definition()

Validates the model's definition.

Raises:
  • ConfigError
def is_breaking_change( self, previous: Union[SqlModel, SeedModel, PythonModel, ExternalModel]) -> Optional[bool]:
1538    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1539        if not isinstance(previous, SqlModel):
1540            return None
1541
1542        if self.lookback != previous.lookback:
1543            return None
1544
1545        try:
1546            # the previous model which comes from disk could be unrenderable
1547            previous_query = previous.render_query()
1548        except Exception:
1549            previous_query = None
1550        this_query = self.render_query()
1551
1552        if previous_query is None or this_query is None:
1553            # Can't determine if there's a breaking change if we can't render the query.
1554            return None
1555
1556        if previous_query is this_query:
1557            edits = []
1558        else:
1559            edits = diff(
1560                previous_query,
1561                this_query,
1562                matchings=[(previous_query, this_query)],
1563                delta_only=True,
1564                dialect=self.dialect if self.dialect == previous.dialect else None,
1565            )
1566        inserted_expressions = {e.expression for e in edits if isinstance(e, Insert)}
1567
1568        for edit in edits:
1569            if not isinstance(edit, Insert):
1570                return None
1571
1572            expr = edit.expression
1573            if isinstance(expr, exp.UDTF):
1574                # projection subqueries do not change cardinality, engines don't allow these to return
1575                # more than one row of data
1576                parent = expr.find_ancestor(exp.Subquery)
1577
1578                if not parent:
1579                    return None
1580
1581                expr = parent
1582
1583            if not _is_projection(expr) and expr.parent not in inserted_expressions:
1584                return None
1585
1586        return False

Determines whether this model is a breaking change in relation to the previous model.

Arguments:
  • previous: The previous model to compare against.
Returns:

True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.

def is_metadata_only_change(self, previous: sqlmesh.core.node._Node) -> bool:
1588    def is_metadata_only_change(self, previous: _Node) -> bool:
1589        if self._is_metadata_only_change_cache.get(id(previous), None) is not None:
1590            return self._is_metadata_only_change_cache[id(previous)]
1591
1592        if not super().is_metadata_only_change(previous):
1593            return False
1594
1595        if not isinstance(previous, SqlModel):
1596            self._is_metadata_only_change_cache[id(previous)] = False
1597            return False
1598
1599        this_rendered_query = self.render_query() or self.query
1600        previous_rendered_query = previous.render_query() or previous.query
1601        is_metadata_change = this_rendered_query == previous_rendered_query
1602
1603        self._is_metadata_only_change_cache[id(previous)] = is_metadata_change
1604        return is_metadata_change

Determines if this node is a metadata only change in relation to the previous node.

Arguments:
  • previous: The previous node to compare against.
Returns:

True if this node is a metadata only change, False otherwise.

violated_rules_for_query: Dict[type[Rule], Any]
1642    @property
1643    def violated_rules_for_query(self) -> t.Dict[type[Rule], t.Any]:
1644        self.render_query()
1645        return self._query_renderer._violated_rules
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
_Model
python_env
jinja_macros
audit_definitions
mapping_schema
extract_dependencies_from_query
pre_statements_
post_statements_
on_virtual_update_
render
render_query_or_raise
render_pre_statements
render_post_statements
render_on_virtual_update
render_audit_query
pre_statements
post_statements
on_virtual_update
macro_definitions
render_signals
render_signal_calls
render_merge_filter
render_physical_properties
render_virtual_properties
render_session_properties
ctas_query
text_diff
set_time_format
convert_to_time_column
depends_on
columns_to_types_or_raise
annotated
sorted_python_env
view_name
schema_name
physical_schema
is_python
is_seed
depends_on_self
forward_only
disable_restatement
auto_restatement_intervals
auto_restatement_cron
auto_restatement_croniter
wap_supported
data_hash
audit_metadata_hash
metadata_hash
is_model
grants_table_type
full_depends_on
partitioned_by
partition_interval_unit
audits_with_args
sqlmesh.core.model.meta.ModelMeta
dialect
name
kind
retention
table_format
storage_format
partitioned_by_
clustered_by
default_catalog
depends_on_
columns_to_types_
column_descriptions_
audits
grains
references
physical_schema_override
physical_properties_
virtual_properties_
session_properties_
allow_partials
signals
enabled
physical_version
gateway
optimize_query
ignored_rules_
formatting
virtual_environment_mode
grants_
grants_target_layer
ignored_rules_validator
session_properties_validator
time_column
unique_key
lookback
lookback_start
batch_size
batch_concurrency
physical_properties
virtual_properties
session_properties
custom_materialization_properties
grants
all_references
on
managed_columns
when_matched
merge_filter
catalog
fully_qualified_table
fqn
on_destructive_change
on_additive_change
ignored_rules
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
interval_unit
is_data_change
croniter
cron_next
cron_prev
cron_floor
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class SeedModel(_Model):
1648class SeedModel(_Model):
1649    """The model definition which uses a pre-built static dataset to source the data from.
1650
1651    Args:
1652        seed: The content of a pre-built static dataset.
1653    """
1654
1655    kind: SeedKind
1656    seed: Seed
1657    column_hashes_: t.Optional[t.Dict[str, str]] = Field(default=None, alias="column_hashes")
1658    derived_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None
1659    is_hydrated: bool = True
1660    source_type: t.Literal["seed"] = "seed"
1661
1662    def __getstate__(self) -> t.Dict[t.Any, t.Any]:
1663        state = super().__getstate__()
1664        state["__dict__"] = state["__dict__"].copy()
1665        state["__dict__"].pop("_reader", None)
1666        return state
1667
1668    def copy(self, **kwargs: t.Any) -> Self:
1669        model = super().copy(**kwargs)
1670        model.__dict__.pop("_reader", None)
1671        return model
1672
1673    def render(
1674        self,
1675        *,
1676        context: ExecutionContext,
1677        start: t.Optional[TimeLike] = None,
1678        end: t.Optional[TimeLike] = None,
1679        execution_time: t.Optional[TimeLike] = None,
1680        **kwargs: t.Any,
1681    ) -> t.Iterator[QueryOrDF]:
1682        if not self.is_hydrated:
1683            return
1684        yield from self.render_seed()
1685
1686    def render_seed(self) -> t.Iterator[QueryOrDF]:
1687        import numpy as np
1688
1689        self._ensure_hydrated()
1690
1691        date_columns = []
1692        datetime_columns = []
1693        bool_columns = []
1694        string_columns = []
1695
1696        columns_to_types = self.columns_to_types_ or {}
1697        column_names_to_check = set(columns_to_types)
1698        for name, tpe in columns_to_types.items():
1699            if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
1700                date_columns.append(name)
1701            elif tpe.this in exp.DataType.TEMPORAL_TYPES:
1702                datetime_columns.append(name)
1703            elif tpe.is_type("boolean"):
1704                bool_columns.append(name)
1705            elif tpe.this in exp.DataType.TEXT_TYPES:
1706                string_columns.append(name)
1707
1708        for df in self._reader.read(batch_size=self.kind.batch_size):
1709            rename_dict = {}
1710            for column in columns_to_types:
1711                if column not in df:
1712                    normalized_name = normalize_identifiers(column, dialect=self.dialect).name
1713                    if normalized_name in df:
1714                        rename_dict[normalized_name] = column
1715            if rename_dict:
1716                df.rename(columns=rename_dict, inplace=True)
1717                # These names have already been checked
1718                column_names_to_check -= set(rename_dict)
1719
1720            missing_columns = column_names_to_check - set(df.columns)
1721            if missing_columns:
1722                raise_config_error(
1723                    f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
1724                )
1725
1726            # convert all date/time types to native pandas timestamp
1727            for column in [*date_columns, *datetime_columns]:
1728                import pandas as pd
1729
1730                df[column] = pd.to_datetime(df[column], infer_datetime_format=True, errors="ignore")  # type: ignore
1731
1732            # extract datetime.date from pandas timestamp for DATE columns
1733            for column in date_columns:
1734                try:
1735                    df[column] = df[column].dt.date
1736                except Exception as ex:
1737                    logger.error(
1738                        "Failed to convert column '%s' to date in seed model '%s': %s",
1739                        column,
1740                        self.name,
1741                        ex,
1742                    )
1743
1744            for column in bool_columns:
1745                df[column] = df[column].apply(lambda i: str_to_bool(str(i)))
1746
1747            df.loc[:, string_columns] = df[string_columns].mask(
1748                cond=lambda x: x.notna(),  # type: ignore
1749                other=df[string_columns].astype(str),  # type: ignore
1750            )
1751            yield df.replace({np.nan: None})
1752
1753    @property
1754    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
1755        if self.columns_to_types_ is not None:
1756            return self.columns_to_types_
1757        if self.derived_columns_to_types is not None:
1758            return self.derived_columns_to_types
1759        if self.is_hydrated:
1760            return self._reader.columns_to_types
1761        return None
1762
1763    @property
1764    def column_hashes(self) -> t.Dict[str, str]:
1765        if self.column_hashes_ is not None:
1766            return self.column_hashes_
1767        self._ensure_hydrated()
1768        return self._reader.column_hashes
1769
1770    @property
1771    def is_seed(self) -> bool:
1772        return True
1773
1774    @property
1775    def seed_path(self) -> Path:
1776        seed_path = Path(self.kind.path)
1777        if not seed_path.is_absolute():
1778            if self._path is None:
1779                raise SQLMeshError(f"Seed model '{self.name}' has no path")
1780            return self._path.parent / seed_path
1781        return seed_path
1782
1783    @property
1784    def depends_on(self) -> t.Set[str]:
1785        return (self.depends_on_ or set()) - {self.fqn}
1786
1787    @property
1788    def depends_on_self(self) -> bool:
1789        return False
1790
1791    @property
1792    def batch_size(self) -> t.Optional[int]:
1793        # Unlike other model kinds, the batch size provided in the SEED kind represents the
1794        # maximum number of rows to insert in a single batch.
1795        # We should never batch intervals for seed models.
1796        return None
1797
1798    def to_dehydrated(self) -> SeedModel:
1799        """Creates a dehydrated copy of this model.
1800
1801        The dehydrated seed model will not contain the seed content, but will contain
1802        the column hashes. This is useful for comparing two seed models without
1803        having to read the seed content from disk.
1804
1805        Returns:
1806            A dehydrated copy of this model.
1807        """
1808        if not self.is_hydrated:
1809            return self
1810
1811        return self.copy(
1812            update={
1813                "seed": Seed(content=""),
1814                "is_hydrated": False,
1815                "column_hashes_": self.column_hashes,
1816                "derived_columns_to_types": self.columns_to_types
1817                if self.columns_to_types_ is None
1818                else None,
1819            }
1820        )
1821
1822    def to_hydrated(self, content: str) -> SeedModel:
1823        """Creates a hydrated copy of this model with the given seed content.
1824
1825        Returns:
1826            A hydrated copy of this model.
1827        """
1828        if self.is_hydrated:
1829            return self
1830
1831        return self.copy(
1832            update={
1833                "seed": Seed(content=content),
1834                "is_hydrated": True,
1835                "column_hashes_": None,
1836            },
1837        )
1838
1839    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1840        if not isinstance(previous, SeedModel):
1841            return None
1842
1843        new_columns = set(self.column_hashes)
1844        old_columns = set(previous.column_hashes)
1845
1846        if not new_columns.issuperset(old_columns):
1847            return None
1848
1849        for col in old_columns:
1850            if self.column_hashes[col] != previous.column_hashes[col]:
1851                return None
1852
1853        return False
1854
1855    def _ensure_hydrated(self) -> None:
1856        if not self.is_hydrated:
1857            raise SQLMeshError(f"Seed model '{self.name}' is not hydrated.")
1858
1859    @cached_property
1860    def _reader(self) -> CsvSeedReader:
1861        return self.seed.reader(dialect=self.dialect, settings=self.kind.csv_settings)
1862
1863    @property
1864    def _data_hash_values_no_sql(self) -> t.List[str]:
1865        data = super()._data_hash_values_no_sql
1866        for column_name, column_hash in self.column_hashes.items():
1867            data.append(column_name)
1868            data.append(column_hash)
1869
1870        # Include grants in data hash for seed models to force recreation on grant changes
1871        # since seed models don't support migration
1872        data.append(json.dumps(self.grants, sort_keys=True) if self.grants else "")
1873        data.append(self.grants_target_layer)
1874
1875        return data

The model definition which uses a pre-built static dataset to source the data from.

Arguments:
  • seed: The content of a pre-built static dataset.
column_hashes_: Optional[Dict[str, str]]
derived_columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]]
is_hydrated: bool
source_type: Literal['seed']
def copy(self, **kwargs: Any) -> typing_extensions.Self:
1668    def copy(self, **kwargs: t.Any) -> Self:
1669        model = super().copy(**kwargs)
1670        model.__dict__.pop("_reader", None)
1671        return model

Returns a copy of the model.

**Warning (deprecated):** This method is now deprecated; use `model_copy` instead.

If you need include or exclude, use:

```python {test="skip" lint="skip"}
data = self.model_dump(include=include, exclude=exclude, round_trip=True)
data = {**data, **(update or {})}
copied = self.model_validate(data)
```

Arguments:
  • include: Optional set or mapping specifying which fields to include in the copied model.
  • exclude: Optional set or mapping specifying which fields to exclude in the copied model.
  • update: Optional dictionary of field-value pairs to override field values in the copied model.
  • deep: If True, the values of fields that are Pydantic models will be deep-copied.
Returns:

A copy of the model with included, excluded and updated fields as specified.

def render( self, *, context: sqlmesh.core.context.ExecutionContext, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, **kwargs: Any) -> Iterator[QueryOrDF]:
1673    def render(
1674        self,
1675        *,
1676        context: ExecutionContext,
1677        start: t.Optional[TimeLike] = None,
1678        end: t.Optional[TimeLike] = None,
1679        execution_time: t.Optional[TimeLike] = None,
1680        **kwargs: t.Any,
1681    ) -> t.Iterator[QueryOrDF]:
1682        if not self.is_hydrated:
1683            return
1684        yield from self.render_seed()

Renders the content of this model as either a SELECT query that, when executed, fetches the data for this model, or a dataframe object that contains the data itself.

The type of the returned object (query or dataframe) depends on whether the model was sourced from a SQL query, a Python script or a pre-built dataset (seed).

Arguments:
  • context: The execution context used for fetching data.
  • start: The start date/time of the run.
  • end: The end date/time of the run.
  • execution_time: The date/time reference to use for execution time.
Returns:

A generator which yields either a query object or one of the supported dataframe objects.

def render_seed(self) -> Iterator[QueryOrDF]:
1686    def render_seed(self) -> t.Iterator[QueryOrDF]:
1687        import numpy as np
1688
1689        self._ensure_hydrated()
1690
1691        date_columns = []
1692        datetime_columns = []
1693        bool_columns = []
1694        string_columns = []
1695
1696        columns_to_types = self.columns_to_types_ or {}
1697        column_names_to_check = set(columns_to_types)
1698        for name, tpe in columns_to_types.items():
1699            if tpe.this in (exp.DataType.Type.DATE, exp.DataType.Type.DATE32):
1700                date_columns.append(name)
1701            elif tpe.this in exp.DataType.TEMPORAL_TYPES:
1702                datetime_columns.append(name)
1703            elif tpe.is_type("boolean"):
1704                bool_columns.append(name)
1705            elif tpe.this in exp.DataType.TEXT_TYPES:
1706                string_columns.append(name)
1707
1708        for df in self._reader.read(batch_size=self.kind.batch_size):
1709            rename_dict = {}
1710            for column in columns_to_types:
1711                if column not in df:
1712                    normalized_name = normalize_identifiers(column, dialect=self.dialect).name
1713                    if normalized_name in df:
1714                        rename_dict[normalized_name] = column
1715            if rename_dict:
1716                df.rename(columns=rename_dict, inplace=True)
1717                # These names have already been checked
1718                column_names_to_check -= set(rename_dict)
1719
1720            missing_columns = column_names_to_check - set(df.columns)
1721            if missing_columns:
1722                raise_config_error(
1723                    f"Seed model '{self.name}' has missing columns: {missing_columns}", self._path
1724                )
1725
1726            # convert all date/time types to native pandas timestamp
1727            for column in [*date_columns, *datetime_columns]:
1728                import pandas as pd
1729
1730                df[column] = pd.to_datetime(df[column], infer_datetime_format=True, errors="ignore")  # type: ignore
1731
1732            # extract datetime.date from pandas timestamp for DATE columns
1733            for column in date_columns:
1734                try:
1735                    df[column] = df[column].dt.date
1736                except Exception as ex:
1737                    logger.error(
1738                        "Failed to convert column '%s' to date in seed model '%s': %s",
1739                        column,
1740                        self.name,
1741                        ex,
1742                    )
1743
1744            for column in bool_columns:
1745                df[column] = df[column].apply(lambda i: str_to_bool(str(i)))
1746
1747            df.loc[:, string_columns] = df[string_columns].mask(
1748                cond=lambda x: x.notna(),  # type: ignore
1749                other=df[string_columns].astype(str),  # type: ignore
1750            )
1751            yield df.replace({np.nan: None})
columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]]
1753    @property
1754    def columns_to_types(self) -> t.Optional[t.Dict[str, exp.DataType]]:
1755        if self.columns_to_types_ is not None:
1756            return self.columns_to_types_
1757        if self.derived_columns_to_types is not None:
1758            return self.derived_columns_to_types
1759        if self.is_hydrated:
1760            return self._reader.columns_to_types
1761        return None

Returns the mapping of column names to types of this model.

column_hashes: Dict[str, str]
1763    @property
1764    def column_hashes(self) -> t.Dict[str, str]:
1765        if self.column_hashes_ is not None:
1766            return self.column_hashes_
1767        self._ensure_hydrated()
1768        return self._reader.column_hashes
is_seed: bool
1770    @property
1771    def is_seed(self) -> bool:
1772        return True
seed_path: pathlib.Path
1774    @property
1775    def seed_path(self) -> Path:
1776        seed_path = Path(self.kind.path)
1777        if not seed_path.is_absolute():
1778            if self._path is None:
1779                raise SQLMeshError(f"Seed model '{self.name}' has no path")
1780            return self._path.parent / seed_path
1781        return seed_path
depends_on: Set[str]
1783    @property
1784    def depends_on(self) -> t.Set[str]:
1785        return (self.depends_on_ or set()) - {self.fqn}

All of the upstream dependencies referenced in the model's query, excluding self references.

Returns:

A set of all the upstream table names.

depends_on_self: bool
1787    @property
1788    def depends_on_self(self) -> bool:
1789        return False
batch_size: Optional[int]
1791    @property
1792    def batch_size(self) -> t.Optional[int]:
1793        # Unlike other model kinds, the batch size provided in the SEED kind represents the
1794        # maximum number of rows to insert in a single batch.
1795        # We should never batch intervals for seed models.
1796        return None

The maximal number of units in a single task for a backfill.

def to_dehydrated(self) -> SeedModel:
1798    def to_dehydrated(self) -> SeedModel:
1799        """Creates a dehydrated copy of this model.
1800
1801        The dehydrated seed model will not contain the seed content, but will contain
1802        the column hashes. This is useful for comparing two seed models without
1803        having to read the seed content from disk.
1804
1805        Returns:
1806            A dehydrated copy of this model.
1807        """
1808        if not self.is_hydrated:
1809            return self
1810
1811        return self.copy(
1812            update={
1813                "seed": Seed(content=""),
1814                "is_hydrated": False,
1815                "column_hashes_": self.column_hashes,
1816                "derived_columns_to_types": self.columns_to_types
1817                if self.columns_to_types_ is None
1818                else None,
1819            }
1820        )

Creates a dehydrated copy of this model.

The dehydrated seed model will not contain the seed content, but will contain the column hashes. This is useful for comparing two seed models without having to read the seed content from disk.

Returns:

A dehydrated copy of this model.

def to_hydrated(self, content: str) -> SeedModel:
1822    def to_hydrated(self, content: str) -> SeedModel:
1823        """Creates a hydrated copy of this model with the given seed content.
1824
1825        Returns:
1826            A hydrated copy of this model.
1827        """
1828        if self.is_hydrated:
1829            return self
1830
1831        return self.copy(
1832            update={
1833                "seed": Seed(content=content),
1834                "is_hydrated": True,
1835                "column_hashes_": None,
1836            },
1837        )

Creates a hydrated copy of this model with the given seed content.

Returns:

A hydrated copy of this model.

def is_breaking_change( self, previous: Union[SqlModel, SeedModel, PythonModel, ExternalModel]) -> Optional[bool]:
1839    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1840        if not isinstance(previous, SeedModel):
1841            return None
1842
1843        new_columns = set(self.column_hashes)
1844        old_columns = set(previous.column_hashes)
1845
1846        if not new_columns.issuperset(old_columns):
1847            return None
1848
1849        for col in old_columns:
1850            if self.column_hashes[col] != previous.column_hashes[col]:
1851                return None
1852
1853        return False

Determines whether this model is a breaking change in relation to the previous model.

Arguments:
  • previous: The previous model to compare against.
Returns:

True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
_Model
python_env
jinja_macros
audit_definitions
mapping_schema
extract_dependencies_from_query
pre_statements_
post_statements_
on_virtual_update_
render_definition
render_query
render_query_or_raise
render_pre_statements
render_post_statements
render_on_virtual_update
render_audit_query
pre_statements
post_statements
on_virtual_update
macro_definitions
render_signals
render_signal_calls
render_merge_filter
render_physical_properties
render_virtual_properties
render_session_properties
ctas_query
text_diff
set_time_format
convert_to_time_column
set_mapping_schema
update_schema
columns_to_types_or_raise
annotated
sorted_python_env
view_name
schema_name
physical_schema
is_sql
is_python
forward_only
disable_restatement
auto_restatement_intervals
auto_restatement_cron
auto_restatement_croniter
wap_supported
validate_definition
is_metadata_only_change
data_hash
audit_metadata_hash
metadata_hash
is_model
grants_table_type
full_depends_on
partitioned_by
partition_interval_unit
audits_with_args
violated_rules_for_query
sqlmesh.core.model.meta.ModelMeta
dialect
name
retention
table_format
storage_format
partitioned_by_
clustered_by
default_catalog
depends_on_
columns_to_types_
column_descriptions_
audits
grains
references
physical_schema_override
physical_properties_
virtual_properties_
session_properties_
allow_partials
signals
enabled
physical_version
gateway
optimize_query
ignored_rules_
formatting
virtual_environment_mode
grants_
grants_target_layer
ignored_rules_validator
session_properties_validator
time_column
unique_key
column_descriptions
lookback
lookback_start
batch_concurrency
physical_properties
virtual_properties
session_properties
custom_materialization_properties
grants
all_references
on
managed_columns
when_matched
merge_filter
catalog
fully_qualified_table
fqn
on_destructive_change
on_additive_change
ignored_rules
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
interval_unit
is_data_change
croniter
cron_next
cron_prev
cron_floor
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class PythonModel(_Model):
1878class PythonModel(_Model):
1879    """The model definition which relies on a Python script to fetch the data.
1880
1881    Args:
1882        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
1883    """
1884
1885    kind: ModelKind = FullKind()
1886    entrypoint: str
1887    source_type: t.Literal["python"] = "python"
1888
1889    def validate_definition(self) -> None:
1890        super().validate_definition()
1891
1892        if self.kind and not self.kind.supports_python_models:
1893            raise_config_error(
1894                f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
1895                self._path,
1896            )
1897
1898    def render(
1899        self,
1900        *,
1901        context: ExecutionContext,
1902        start: t.Optional[TimeLike] = None,
1903        end: t.Optional[TimeLike] = None,
1904        execution_time: t.Optional[TimeLike] = None,
1905        **kwargs: t.Any,
1906    ) -> t.Iterator[QueryOrDF]:
1907        env = prepare_env(self.python_env)
1908        start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
1909        execution_time = to_datetime(execution_time or c.EPOCH)
1910
1911        variables = {
1912            **env.get(c.SQLMESH_VARS, {}),
1913            **env.get(c.SQLMESH_VARS_METADATA, {}),
1914            **kwargs.pop("variables", {}),
1915        }
1916        blueprint_variables = {
1917            k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
1918            for k, v in {
1919                **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
1920                **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
1921            }.items()
1922        }
1923        try:
1924            kwargs = {
1925                **variables,
1926                **kwargs,
1927                "start": start,
1928                "end": end,
1929                "execution_time": execution_time,
1930                "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
1931            }
1932            df_or_iter = env[self.entrypoint](
1933                context=context.with_variables(variables, blueprint_variables=blueprint_variables),
1934                **kwargs,
1935            )
1936
1937            if not isinstance(df_or_iter, types.GeneratorType):
1938                df_or_iter = [df_or_iter]
1939
1940            for df in df_or_iter:
1941                yield df
1942        except Exception as e:
1943            raise PythonModelEvalError(format_evaluated_code_exception(e, self.python_env))
1944
1945    def render_definition(
1946        self,
1947        include_python: bool = True,
1948        include_defaults: bool = False,
1949        render_query: bool = False,
1950    ) -> t.List[exp.Expr]:
1951        # Ignore the provided value for the include_python flag, since the Python model's
1952        # definition without Python code is meaningless.
1953        return super().render_definition(
1954            include_python=True, include_defaults=include_defaults, render_query=render_query
1955        )
1956
1957    @property
1958    def is_python(self) -> bool:
1959        return True
1960
1961    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1962        return None
1963
1964    @property
1965    def _data_hash_values_no_sql(self) -> t.List[str]:
1966        data = super()._data_hash_values_no_sql
1967        data.append(self.entrypoint)
1968        return data

The model definition which relies on a Python script to fetch the data.

Arguments:
  • entrypoint: The name of a Python function which contains the data fetching / transformation logic.
entrypoint: str
source_type: Literal['python']
def validate_definition(self) -> None:
1889    def validate_definition(self) -> None:
1890        super().validate_definition()
1891
1892        if self.kind and not self.kind.supports_python_models:
1893            raise_config_error(
1894                f"Cannot create Python model '{self.name}' as the '{self.kind.name}' kind doesn't support Python models",
1895                self._path,
1896            )

Validates the model's definition.

Raises:
  • ConfigError
def render( self, *, context: sqlmesh.core.context.ExecutionContext, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, **kwargs: Any) -> Iterator[QueryOrDF]:
1898    def render(
1899        self,
1900        *,
1901        context: ExecutionContext,
1902        start: t.Optional[TimeLike] = None,
1903        end: t.Optional[TimeLike] = None,
1904        execution_time: t.Optional[TimeLike] = None,
1905        **kwargs: t.Any,
1906    ) -> t.Iterator[QueryOrDF]:
1907        env = prepare_env(self.python_env)
1908        start, end = make_inclusive(start or c.EPOCH, end or c.EPOCH, self.dialect)
1909        execution_time = to_datetime(execution_time or c.EPOCH)
1910
1911        variables = {
1912            **env.get(c.SQLMESH_VARS, {}),
1913            **env.get(c.SQLMESH_VARS_METADATA, {}),
1914            **kwargs.pop("variables", {}),
1915        }
1916        blueprint_variables = {
1917            k: d.parse_one(v.sql, dialect=self.dialect) if isinstance(v, SqlValue) else v
1918            for k, v in {
1919                **env.get(c.SQLMESH_BLUEPRINT_VARS, {}),
1920                **env.get(c.SQLMESH_BLUEPRINT_VARS_METADATA, {}),
1921            }.items()
1922        }
1923        try:
1924            kwargs = {
1925                **variables,
1926                **kwargs,
1927                "start": start,
1928                "end": end,
1929                "execution_time": execution_time,
1930                "latest": execution_time,  # TODO: Preserved for backward compatibility. Remove in 1.0.0.
1931            }
1932            df_or_iter = env[self.entrypoint](
1933                context=context.with_variables(variables, blueprint_variables=blueprint_variables),
1934                **kwargs,
1935            )
1936
1937            if not isinstance(df_or_iter, types.GeneratorType):
1938                df_or_iter = [df_or_iter]
1939
1940            for df in df_or_iter:
1941                yield df
1942        except Exception as e:
1943            raise PythonModelEvalError(format_evaluated_code_exception(e, self.python_env))

Renders the content of this model as either a SELECT query that, when executed, fetches the data for this model, or a dataframe object that contains the data itself.

The type of the returned object (query or dataframe) depends on whether the model was sourced from a SQL query, a Python script or a pre-built dataset (seed).

Arguments:
  • context: The execution context used for fetching data.
  • start: The start date/time of the run.
  • end: The end date/time of the run.
  • execution_time: The date/time reference to use for execution time.
Returns:

A generator which yields either a query object or one of the supported dataframe objects.

def render_definition( self, include_python: bool = True, include_defaults: bool = False, render_query: bool = False) -> List[sqlglot.expressions.core.Expr]:
1945    def render_definition(
1946        self,
1947        include_python: bool = True,
1948        include_defaults: bool = False,
1949        render_query: bool = False,
1950    ) -> t.List[exp.Expr]:
1951        # Ignore the provided value for the include_python flag, since the Pyhon model's
1952        # definition without Python code is meaningless.
1953        return super().render_definition(
1954            include_python=True, include_defaults=include_defaults, render_query=render_query
1955        )

Returns the original list of sql expressions comprising the model definition.

Arguments:
  • include_python: Whether or not to include Python code in the rendered definition.
is_python: bool
    @property
    def is_python(self) -> bool:
        """Always True for this model class."""
        return True
def is_breaking_change( self, previous: Union[SqlModel, SeedModel, PythonModel, ExternalModel]) -> Optional[bool]:
    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Report that the nature of the change can't be determined.

        Args:
            previous: The previous model to compare against (not inspected here).

        Returns:
            Always None.
        """
        return None

Determines whether this model is a breaking change in relation to the previous model.

Arguments:
  • previous: The previous model to compare against.
Returns:

True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a BaseModel instance.

    This function is meant to behave like a BaseModel method. It takes context as
    an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Private attributes have already been set up; leave them untouched.
        return

    # Evaluate each private attribute's default exactly once, in declaration order.
    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default are left out of the private mapping.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
_Model
python_env
jinja_macros
audit_definitions
mapping_schema
extract_dependencies_from_query
pre_statements_
post_statements_
on_virtual_update_
copy
render_query
render_query_or_raise
render_pre_statements
render_post_statements
render_on_virtual_update
render_audit_query
pre_statements
post_statements
on_virtual_update
macro_definitions
render_signals
render_signal_calls
render_merge_filter
render_physical_properties
render_virtual_properties
render_session_properties
ctas_query
text_diff
set_time_format
convert_to_time_column
set_mapping_schema
update_schema
depends_on
columns_to_types
columns_to_types_or_raise
annotated
sorted_python_env
view_name
schema_name
physical_schema
is_sql
is_seed
depends_on_self
forward_only
disable_restatement
auto_restatement_intervals
auto_restatement_cron
auto_restatement_croniter
wap_supported
is_metadata_only_change
data_hash
audit_metadata_hash
metadata_hash
is_model
grants_table_type
full_depends_on
partitioned_by
partition_interval_unit
audits_with_args
violated_rules_for_query
sqlmesh.core.model.meta.ModelMeta
dialect
name
retention
table_format
storage_format
partitioned_by_
clustered_by
default_catalog
depends_on_
columns_to_types_
column_descriptions_
audits
grains
references
physical_schema_override
physical_properties_
virtual_properties_
session_properties_
allow_partials
signals
enabled
physical_version
gateway
optimize_query
ignored_rules_
formatting
virtual_environment_mode
grants_
grants_target_layer
ignored_rules_validator
session_properties_validator
time_column
unique_key
column_descriptions
lookback
lookback_start
batch_size
batch_concurrency
physical_properties
virtual_properties
session_properties
custom_materialization_properties
grants
all_references
on
managed_columns
when_matched
merge_filter
catalog
fully_qualified_table
fqn
on_destructive_change
on_additive_change
ignored_rules
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
interval_unit
is_data_change
croniter
cron_next
cron_prev
cron_floor
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ExternalModel(_Model):
class ExternalModel(_Model):
    """The model definition which represents an external source/table."""

    kind: ModelKind = ExternalKind()
    source_type: t.Literal["external"] = "external"

    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
        """Classify the change between ``previous`` and this external model.

        Args:
            previous: The previous model to compare against.

        Returns:
            False when ``previous`` is also an external model and every one of its
            (column, type) pairs is still present on this model; None otherwise.
        """
        if isinstance(previous, ExternalModel):
            old_columns = previous.columns_to_types_or_raise.items()
            new_columns = self.columns_to_types_or_raise.items()
            # Pairs that existed before but are now missing or have a different type.
            dropped_or_retyped = old_columns - new_columns
            if not dropped_or_retyped:
                return False
        return None

    @property
    def depends_on(self) -> t.Set[str]:
        """Upstream dependencies of this model; always an empty set here."""
        return set()

    @property
    def depends_on_self(self) -> bool:
        """Whether this model references itself; always False here."""
        return False

The model definition which represents an external source/table.

source_type: Literal['external']
def is_breaking_change( self, previous: Union[SqlModel, SeedModel, PythonModel, ExternalModel]) -> Optional[bool]:
1977    def is_breaking_change(self, previous: Model) -> t.Optional[bool]:
1978        if not isinstance(previous, ExternalModel):
1979            return None
1980        if not previous.columns_to_types_or_raise.items() - self.columns_to_types_or_raise.items():
1981            return False
1982        return None

Determines whether this model is a breaking change in relation to the previous model.

Arguments:
  • previous: The previous model to compare against.
Returns:

True if this model instance represents a breaking change, False if it's a non-breaking change and None if the nature of the change can't be determined.

depends_on: Set[str]
    @property
    def depends_on(self) -> t.Set[str]:
        """Upstream dependencies of this model; always an empty set here."""
        return set()

All of the upstream dependencies referenced in the model's query, excluding self references.

Returns:

A set of all the upstream table names.

depends_on_self: bool
    @property
    def depends_on_self(self) -> bool:
        """Whether this model references itself; always False here."""
        return False
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise the private attributes of a BaseModel instance.

    This function is meant to behave like a BaseModel method. It takes context as
    an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Private attributes have already been set up; leave them untouched.
        return

    # Evaluate each private attribute's default exactly once, in declaration order.
    resolved_defaults = (
        (attr_name, attr.get_default())
        for attr_name, attr in self.__private_attributes__.items()
    )
    # Attributes without a default are left out of the private mapping.
    private_values = {
        attr_name: value
        for attr_name, value in resolved_defaults
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_values)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
_Model
python_env
jinja_macros
audit_definitions
mapping_schema
extract_dependencies_from_query
pre_statements_
post_statements_
on_virtual_update_
copy
render
render_definition
render_query
render_query_or_raise
render_pre_statements
render_post_statements
render_on_virtual_update
render_audit_query
pre_statements
post_statements
on_virtual_update
macro_definitions
render_signals
render_signal_calls
render_merge_filter
render_physical_properties
render_virtual_properties
render_session_properties
ctas_query
text_diff
set_time_format
convert_to_time_column
set_mapping_schema
update_schema
columns_to_types
columns_to_types_or_raise
annotated
sorted_python_env
view_name
schema_name
physical_schema
is_sql
is_python
is_seed
forward_only
disable_restatement
auto_restatement_intervals
auto_restatement_cron
auto_restatement_croniter
wap_supported
validate_definition
is_metadata_only_change
data_hash
audit_metadata_hash
metadata_hash
is_model
grants_table_type
full_depends_on
partitioned_by
partition_interval_unit
audits_with_args
violated_rules_for_query
sqlmesh.core.model.meta.ModelMeta
dialect
name
retention
table_format
storage_format
partitioned_by_
clustered_by
default_catalog
depends_on_
columns_to_types_
column_descriptions_
audits
grains
references
physical_schema_override
physical_properties_
virtual_properties_
session_properties_
allow_partials
signals
enabled
physical_version
gateway
optimize_query
ignored_rules_
formatting
virtual_environment_mode
grants_
grants_target_layer
ignored_rules_validator
session_properties_validator
time_column
unique_key
column_descriptions
lookback
lookback_start
batch_size
batch_concurrency
physical_properties
virtual_properties
session_properties
custom_materialization_properties
grants
all_references
on
managed_columns
when_matched
merge_filter
catalog
fully_qualified_table
fqn
on_destructive_change
on_additive_change
ignored_rules
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
interval_unit
is_data_change
croniter
cron_next
cron_prev
cron_floor
is_audit
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
Model = typing.Union[SqlModel, SeedModel, PythonModel, ExternalModel]
class AuditResult(sqlmesh.utils.pydantic.PydanticModel):
class AuditResult(PydanticModel):
    """Holds the result of a single audit, including audits that were skipped."""

    audit: Audit
    """The audit this result is for."""
    audit_args: t.Dict[t.Any, t.Any]
    """Arguments passed to the audit."""
    model: t.Optional[_Model] = None
    """The model this audit is for."""
    count: t.Optional[int] = None
    """The number of records returned by the audit query. This could be None if the audit was skipped."""
    query: t.Optional[exp.Expr] = None
    """The rendered query used by the audit. This could be None if the audit was skipped."""
    skipped: bool = False
    """Whether or not the audit was skipped."""
    blocking: bool = True
    """Whether or not the audit was blocking. This can be overridden by the user."""

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
audit_args: Dict[Any, Any]

Arguments passed to the audit.

model: Optional[sqlmesh.core.model.definition._Model]

The model this audit is for.

count: Optional[int]

The number of records returned by the audit query. This could be None if the audit was skipped.

query: Optional[sqlglot.expressions.core.Expr]

The rendered query used by the audit. This could be None if the audit was skipped.

skipped: bool

Whether or not the audit was skipped.

blocking: bool

Whether or not the audit was blocking. This can be overridden by the user.
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class EvaluatableSignals(sqlmesh.utils.pydantic.PydanticModel):
class EvaluatableSignals(PydanticModel):
    """Signal call arguments bundled with the Python environment used to evaluate them."""

    signals_to_kwargs: t.Dict[str, t.Dict[str, t.Optional[exp.Expr]]]
    """A mapping of signal names to the kwargs passed to the signal."""
    python_env: t.Dict[str, Executable]
    """The Python environment that should be used to evaluate the rendered signal calls."""
    prepared_python_env: t.Dict[str, t.Any]
    """The prepared Python environment that should be used to evaluate the rendered signal calls."""

Usage documentation: Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
signals_to_kwargs: Dict[str, Dict[str, Optional[sqlglot.expressions.core.Expr]]]

A mapping of signal names to the kwargs passed to the signal.

The Python environment that should be used to evaluate the rendered signal calls.

prepared_python_env: Dict[str, Any]

The prepared Python environment that should be used to evaluate the rendered signal calls.

model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def create_models_from_blueprints( gateway: Union[str, sqlglot.expressions.core.Expr, NoneType], blueprints: Any, get_variables: Callable[[Optional[str]], Dict[str, str]], loader: Callable[..., Union[SqlModel, SeedModel, PythonModel, ExternalModel]], path: pathlib.Path = PosixPath('.'), module_path: pathlib.Path = PosixPath('.'), dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, default_catalog_per_gateway: Optional[Dict[str, str]] = None, **loader_kwargs: Any) -> List[Union[SqlModel, SeedModel, PythonModel, ExternalModel]]:
def create_models_from_blueprints(
    gateway: t.Optional[str | exp.Expr],
    blueprints: t.Any,
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    loader: t.Callable[..., Model],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Create one model per blueprint by invoking ``loader`` for each of them.

    Args:
        gateway: The gateway as a string or expression. When truthy it is parsed and
            rendered once per blueprint with that blueprint's variables, and the
            name of the first rendered expression is used as the gateway name.
        blueprints: The raw blueprints value, handed to ``_extract_blueprints``.
        get_variables: Called with the resolved gateway name (or None); its result
            is passed to ``loader`` as ``variables``.
        loader: Callable that builds a model. It is called with ``path``,
            ``module_path``, ``dialect``, ``variables``, ``blueprint_variables`` and
            the remaining ``loader_kwargs`` as keyword arguments.
        path: Forwarded to the blueprint helpers, the gateway rendering and ``loader``.
        module_path: Forwarded to the gateway rendering and ``loader``.
        dialect: Dialect used to parse and render the gateway; forwarded to ``loader``.
        default_catalog_per_gateway: Optional mapping of gateway name to default
            catalog. When it has a non-None entry for the resolved gateway name,
            that catalog is passed to ``loader`` as ``default_catalog``.
        **loader_kwargs: Extra keyword arguments forwarded to ``loader``. The keys
            ``macros``, ``jinja_macros`` and ``default_catalog`` are also read when
            rendering the gateway.

    Returns:
        The models returned by ``loader``, one per extracted blueprint, in order.
    """
    model_blueprints: t.List[Model] = []
    for blueprint in _extract_blueprints(blueprints, path):
        blueprint_variables = _extract_blueprint_variables(blueprint, path)

        if gateway:
            rendered_gateway = render_expression(
                expression=exp.maybe_parse(gateway, dialect=dialect),
                module_path=module_path,
                macros=loader_kwargs.get("macros"),
                jinja_macros=loader_kwargs.get("jinja_macros"),
                path=path,
                dialect=dialect,
                default_catalog=loader_kwargs.get("default_catalog"),
                blueprint_variables=blueprint_variables,
            )
            gateway_name = rendered_gateway[0].name if rendered_gateway else None
        else:
            gateway_name = None

        # Apply the gateway-specific catalog to this blueprint only. Writing it into
        # the shared `loader_kwargs` would leak one blueprint's catalog into later
        # blueprints whose gateway has no catalog mapping.
        blueprint_loader_kwargs = loader_kwargs
        if (
            default_catalog_per_gateway
            and gateway_name
            and (catalog := default_catalog_per_gateway.get(gateway_name)) is not None
        ):
            blueprint_loader_kwargs = {**loader_kwargs, "default_catalog": catalog}

        model_blueprints.append(
            loader(
                path=path,
                module_path=module_path,
                dialect=dialect,
                variables=get_variables(gateway_name),
                blueprint_variables=blueprint_variables,
                **blueprint_loader_kwargs,
            )
        )

    return model_blueprints
def load_sql_based_models( expressions: List[sqlglot.expressions.core.Expr], get_variables: Callable[[Optional[str]], Dict[str, str]], path: pathlib.Path = PosixPath('.'), module_path: pathlib.Path = PosixPath('.'), dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, default_catalog_per_gateway: Optional[Dict[str, str]] = None, **loader_kwargs: Any) -> List[Union[SqlModel, SeedModel, PythonModel, ExternalModel]]:
def load_sql_based_models(
    expressions: t.List[exp.Expr],
    get_variables: t.Callable[[t.Optional[str]], t.Dict[str, str]],
    path: Path = Path(),
    module_path: Path = Path(),
    dialect: DialectType = None,
    default_catalog_per_gateway: t.Optional[t.Dict[str, str]] = None,
    **loader_kwargs: t.Any,
) -> t.List[Model]:
    """Load the models defined by a parsed SQL model definition, one per blueprint.

    The ``gateway`` and ``blueprints`` properties are read from the first expression
    when it is a ``d.Model``. The ``blueprints`` property is removed from that
    expression as a side effect. A ``blueprints`` value that is a macro call is
    rendered first; multiple rendered expressions are wrapped into a single tuple.

    Args:
        expressions: The parsed expressions; the first one is inspected for the
            ``gateway`` and ``blueprints`` properties.
        get_variables: Called with a gateway name (or None) to obtain variables.
        path: Used in error reporting and forwarded to rendering and loading.
        module_path: Forwarded to rendering and loading.
        dialect: Forwarded to rendering and loading.
        default_catalog_per_gateway: Forwarded to ``create_models_from_blueprints``.
        **loader_kwargs: Forwarded to ``create_models_from_blueprints``. The keys
            ``macros``, ``jinja_macros`` and ``default_catalog`` are also read when
            rendering a macro-based ``blueprints`` value.

    Returns:
        The list of models produced by ``create_models_from_blueprints``.

    Raises:
        A configuration error via ``raise_config_error`` when a macro-based
        ``blueprints`` property renders to nothing.
    """
    gateway: t.Optional[exp.Expr] = None
    blueprints: t.Optional[exp.Expr] = None

    model_meta = seq_get(expressions, 0)
    # Iterate over a snapshot of the properties: popping `blueprints` below may
    # remove it from the underlying list in place, and mutating a list while
    # iterating over it would skip the property that follows (e.g. `gateway`).
    meta_properties = list(model_meta.expressions) if isinstance(model_meta, d.Model) else []
    for prop in meta_properties:
        if prop.name == "gateway":
            gateway = prop.args["value"]
        elif prop.name == "blueprints":
            # We pop the `blueprints` here to avoid walking large lists when rendering the meta
            blueprints = prop.pop().args["value"]

    if isinstance(blueprints, d.MacroFunc):
        rendered_blueprints = render_expression(
            expression=blueprints,
            module_path=module_path,
            macros=loader_kwargs.get("macros"),
            jinja_macros=loader_kwargs.get("jinja_macros"),
            variables=get_variables(None),
            path=path,
            dialect=dialect,
            default_catalog=loader_kwargs.get("default_catalog"),
        )
        if not rendered_blueprints:
            raise_config_error("Failed to render blueprints property", path)

        # Help mypy see that rendered_blueprints can't be None
        assert rendered_blueprints

        if len(rendered_blueprints) > 1:
            rendered_blueprints = [exp.Tuple(expressions=rendered_blueprints)]

        blueprints = rendered_blueprints[0]

    return create_models_from_blueprints(
        gateway=gateway,
        blueprints=blueprints,
        get_variables=get_variables,
        loader=partial(load_sql_based_model, expressions),
        path=path,
        module_path=module_path,
        dialect=dialect,
        default_catalog_per_gateway=default_catalog_per_gateway,
        **loader_kwargs,
    )
def load_sql_based_model( expressions: List[sqlglot.expressions.core.Expr], *, defaults: Optional[Dict[str, Any]] = None, path: Optional[pathlib.Path] = None, module_path: pathlib.Path = PosixPath('.'), time_column_format: str = '%Y-%m-%d', macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, audits: Optional[Dict[str, sqlmesh.core.audit.definition.ModelAudit]] = None, python_env: Optional[Dict[str, sqlmesh.utils.metaprogramming.Executable]] = None, dialect: Optional[str] = None, physical_schema_mapping: Optional[Dict[re.Pattern, str]] = None, default_catalog: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, infer_names: Optional[bool] = False, blueprint_variables: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Union[SqlModel, SeedModel, PythonModel, ExternalModel]:
def load_sql_based_model(
    expressions: t.List[exp.Expr],
    *,
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    time_column_format: str = c.DEFAULT_TIME_COLUMN_FORMAT,
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    audits: t.Optional[t.Dict[str, ModelAudit]] = None,
    python_env: t.Optional[t.Dict[str, Executable]] = None,
    dialect: t.Optional[str] = None,
    physical_schema_mapping: t.Optional[t.Dict[re.Pattern, str]] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    infer_names: t.Optional[bool] = False,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> Model:
    """Load a model from a parsed SQLMesh model SQL file.

    The MODEL block is rendered, the remaining statements are split into the query (or seed
    insert), pre/post statements, on-virtual-update statements and inline audits, and the
    result is handed to `create_sql_model` or, for SEED kinds, `create_seed_model`.

    Args:
        expressions: Model, *Statements, Query. NOTE: when `infer_names` is set and the first
            expression is not a MODEL block, a dummy MODEL node is inserted at index 0 of this
            list in place.
        defaults: Definition default values.
        path: An optional path to the model definition file.
        module_path: The python module path to serialize macros for.
        time_column_format: The default time column format to use if no model time column is configured.
            The format must adhere to Python's strftime codes.
        macros: The custom registry of macros. If not provided the default registry will be used.
        jinja_macros: The registry of Jinja macros.
        audits: Accepted as part of the signature; it is not referenced in the body of this function.
        python_env: The custom Python environment for macros. If not provided the environment will be constructed
            from the macro registry.
        dialect: The default dialect if no model dialect is configured.
        physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema
        default_catalog: The default catalog if no model catalog is configured.
        variables: The variables to pass to the model.
        infer_names: When truthy, a missing MODEL block is tolerated and a missing model name is
            derived from `path` via `get_model_name`.
        blueprint_variables: The blueprint's variables, used when rendering the MODEL block and
            forwarded to the created model.
        kwargs: Additional kwargs to pass to the loader. They are merged into the meta fields
            after the rendered MODEL properties, so they override properties of the same name.

    Returns:
        The model produced by `create_sql_model`, or by `create_seed_model` when the kind's name is SEED.

    Raises:
        ValueError: If the name has to be inferred but no `path` was given.

        Configuration problems (no MODEL block, an invalid MODEL statement, a missing name, a
        per-model `default_catalog`) are reported through `raise_config_error`.
    """
    missing_model_msg = f"""Please add a MODEL block at the top of the file. Example:

MODEL (
  name sqlmesh_example.full_model, --model name
  kind FULL, --materialization
  cron '@daily', --schedule
);

Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview
"""

    if not expressions:
        raise_config_error(missing_model_msg)

    dialect = dialect or ""
    meta = expressions[0]
    if not isinstance(meta, d.Model):
        if not infer_names:
            raise_config_error(missing_model_msg)
        meta = d.Model(expressions=[])  # Dummy meta node
        expressions.insert(0, meta)

    # We deliberately hold off rendering some properties at load time because there is not enough information available
    # at load time to render them. They will get rendered later at evaluation time
    unrendered_properties = {}
    unrendered_merge_filter = None

    for prop in meta.expressions:
        # Macro functions that programmatically generate the key-value pair properties should be rendered
        # This is needed in the odd case where a macro shares the name of one of the properties
        # eg `@session_properties()` Test: `test_macros_in_model_statement` Reference PR: #2574
        if isinstance(prop, d.MacroFunc):
            continue

        prop_name = prop.name.lower()
        if prop_name in {"signals", "audits"} | PROPERTIES:
            unrendered_properties[prop_name] = prop.args.get("value")
        elif (
            prop.name.lower() == "kind"
            and (value := prop.args.get("value"))
            and value.name.lower() == "incremental_by_unique_key"
        ):
            # Remember the original merge_filter so it can be restored after the kind is rendered
            for kind_prop in value.expressions:
                if kind_prop.name.lower() == "merge_filter":
                    unrendered_merge_filter = kind_prop

    rendered_meta_exprs = render_expression(
        expression=meta,
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        variables=variables,
        path=path,
        dialect=dialect,
        default_catalog=default_catalog,
        blueprint_variables=blueprint_variables,
    )

    # Rendering the MODEL block must yield exactly one expression
    if rendered_meta_exprs is None or len(rendered_meta_exprs) != 1:
        raise_config_error(
            f"Invalid MODEL statement:\n{meta.sql(dialect=dialect, pretty=True)}",
            path,
        )
        raise

    rendered_meta = rendered_meta_exprs[0]

    rendered_defaults = (
        render_model_defaults(
            defaults=defaults,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=default_catalog,
        )
        if defaults
        else {}
    )

    rendered_defaults = parse_defaults_properties(rendered_defaults, dialect=dialect)

    # Extract the query and any pre/post statements
    query_or_seed_insert, pre_statements, post_statements, on_virtual_update, inline_audits = (
        _split_sql_model_statements(expressions[1:], path, dialect=dialect)
    )

    # Later entries win: rendered MODEL properties override the `dialect` / `description`
    # entries above, and explicit kwargs override the rendered properties
    meta_fields: t.Dict[str, t.Any] = {
        "dialect": dialect,
        "description": (
            "\n".join(comment.strip() for comment in rendered_meta.comments)
            if rendered_meta.comments
            else None
        ),
        **{prop.name.lower(): prop.args.get("value") for prop in rendered_meta.expressions},
        **kwargs,
    }

    # Discard the potentially half-rendered versions of these properties and replace them with the
    # original unrendered versions. They will get rendered properly at evaluation time
    meta_fields.update(unrendered_properties)

    if unrendered_merge_filter:
        for idx, kind_prop in enumerate(meta_fields["kind"].expressions):
            if kind_prop.name.lower() == "merge_filter":
                meta_fields["kind"].expressions[idx] = unrendered_merge_filter

    # A dialect coming from the MODEL block is an expression; only its name is kept
    if isinstance(meta_fields.get("dialect"), exp.Expr):
        meta_fields["dialect"] = meta_fields["dialect"].name

    # The name of the model will be inferred from its path relative to `models/`, if it's not explicitly specified
    name = meta_fields.pop("name", "")
    if not name and infer_names:
        if path is None:
            raise ValueError(f"Model {name} must have a name")
        name = get_model_name(path)

    if not name:
        raise_config_error(
            "Please add the required 'name' field to the MODEL block at the top of the file.\n\n"
            + "Learn more at https://sqlmesh.readthedocs.io/en/stable/concepts/models/overview"
        )
    if "default_catalog" in meta_fields:
        raise_config_error(
            "`default_catalog` cannot be set on a per-model basis. It must be set at the connection level.",
            path,
        )

    common_kwargs = dict(
        pre_statements=pre_statements,
        post_statements=post_statements,
        on_virtual_update=on_virtual_update,
        defaults=rendered_defaults,
        path=path,
        module_path=module_path,
        macros=macros,
        python_env=python_env,
        jinja_macros=jinja_macros,
        physical_schema_mapping=physical_schema_mapping,
        default_catalog=default_catalog,
        variables=variables,
        inline_audits=inline_audits,
        blueprint_variables=blueprint_variables,
        use_original_sql=True,
        **meta_fields,
    )

    # Fall back to the default declared on ModelMeta's `kind` field when the MODEL block has no kind
    kind = common_kwargs.pop("kind", ModelMeta.all_field_infos()["kind"].default)

    if kind.name != ModelKindName.SEED:
        return create_sql_model(
            name,
            query_or_seed_insert,
            kind=kind,
            time_column_format=time_column_format,
            **common_kwargs,
        )

    # Seed models: the kind's nested properties become the SeedKind constructor arguments
    seed_properties = {p.name.lower(): p.args.get("value") for p in kind.expressions}
    return create_seed_model(
        name,
        SeedKind(**seed_properties),
        **common_kwargs,
    )

Load a model from a parsed SQLMesh model SQL file.

Arguments:
  • expressions: Model, *Statements, Query.
  • defaults: Definition default values.
  • path: An optional path to the model definition file.
  • module_path: The python module path to serialize macros for.
  • time_column_format: The default time column format to use if no model time column is configured. The format must adhere to Python's strftime codes.
  • macros: The custom registry of macros. If not provided the default registry will be used.
  • jinja_macros: The registry of Jinja macros.
  • python_env: The custom Python environment for macros. If not provided the environment will be constructed from the macro registry.
  • dialect: The default dialect if no model dialect is configured.
  • physical_schema_mapping: A mapping of regular expressions to match against the model schema to produce the corresponding physical schema
  • default_catalog: The default catalog if no model catalog is configured.
  • variables: The variables to pass to the model.
  • kwargs: Additional kwargs to pass to the loader.
def create_sql_model( name: Union[str, sqlglot.expressions.query.Table], query: Optional[sqlglot.expressions.core.Expr], **kwargs: Any) -> Union[SqlModel, SeedModel, PythonModel, ExternalModel]:
def create_sql_model(
    name: TableName,
    query: t.Optional[exp.Expr],
    **kwargs: t.Any,
) -> Model:
    """Creates a SQL model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        query: The model's logic. It must be a query expression, a JINJA_QUERY block
            or a macro function call; anything else is reported as a config error.
        kwargs: Additional arguments forwarded to the model constructor. The optional
            `path` entry is also used to locate the config error.
    """
    accepted_query_types = (exp.Query, d.JinjaQuery, d.MacroFunc)

    if not isinstance(query, accepted_query_types):
        raise_config_error(
            "A query is required and must be a SELECT statement, a UNION statement, or a JINJA_QUERY block",
            kwargs.get("path"),
        )
        # Kept so the type checker sees the narrowed type past this point
        assert isinstance(query, accepted_query_types)

    return _create_model(SqlModel, name, query=query, **kwargs)

Creates a SQL model.

Arguments:
  • name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
  • query: The model's logic in a form of a SELECT query.
def create_seed_model( name: Union[str, sqlglot.expressions.query.Table], seed_kind: sqlmesh.core.model.kind.SeedKind, *, path: Optional[pathlib.Path] = None, module_path: pathlib.Path = PosixPath('.'), **kwargs: Any) -> Union[SqlModel, SeedModel, PythonModel, ExternalModel]:
def create_seed_model(
    name: TableName,
    seed_kind: SeedKind,
    *,
    path: t.Optional[Path] = None,
    module_path: Path = Path(),
    **kwargs: t.Any,
) -> Model:
    """Creates a Seed model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        seed_kind: The information about the location of a seed and other related configuration.
            When its path starts with `$root`, the path stored on `seed_kind` is rewritten in place.
        path: An optional path to the model definition file. A relative seed path is resolved
            against it (against its parent directory when it is not a directory).
        module_path: The base path that replaces a leading `$root` marker in the seed path.
        kwargs: Additional arguments forwarded to the model constructor. `depends_on` is
            taken out of them and passed explicitly (None when absent).
    """
    seed_path = Path(seed_kind.path)
    marker, *subdirs = seed_path.parts
    if marker.lower() == "$root":
        # `$root/...` is anchored at module_path; persist the resolved location on the kind
        seed_path = module_path.joinpath(*subdirs)
        seed_kind.path = str(seed_path)
    elif not seed_path.is_absolute() and path is not None:
        # Relative seed paths are relative to the model definition's directory.
        # Without a definition path the seed path is used as given.
        base_dir = path if path.is_dir() else path.parent
        seed_path = base_dir / seed_path

    seed = create_seed(seed_path)

    return _create_model(
        SeedModel,
        name,
        path=path,
        seed=seed,
        kind=seed_kind,
        depends_on=kwargs.pop("depends_on", None),
        module_path=module_path,
        **kwargs,
    )

Creates a Seed model.

Arguments:
  • name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
  • seed_kind: The information about the location of a seed and other related configuration.
  • path: An optional path to the model definition file.
def create_python_model( name: str, entrypoint: str, python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable], *, macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, path: pathlib.Path = PosixPath('.'), module_path: pathlib.Path = PosixPath('.'), depends_on: Optional[Set[str]] = None, variables: Optional[Dict[str, Any]] = None, blueprint_variables: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Union[SqlModel, SeedModel, PythonModel, ExternalModel]:
def create_python_model(
    name: str,
    entrypoint: str,
    python_env: t.Dict[str, Executable],
    *,
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    path: Path = Path(),
    module_path: Path = Path(),
    depends_on: t.Optional[t.Set[str]] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> Model:
    """Creates a Python model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        entrypoint: The name of a Python function which contains the data fetching / transformation logic.
        python_env: The Python environment of all objects referenced by the model implementation.
            It is updated in place with the referenced variables when there are any.
        macros: The macro registry used to render explicit dependencies and passed on to the model.
        jinja_macros: The Jinja macro registry used to render explicit dependencies and passed on to the model.
        path: An optional path to the model definition file.
        module_path: The module path used when rendering explicit dependencies and passed on to the model.
        depends_on: The custom set of model's upstream dependencies. When omitted, the
            dependencies are discovered by parsing the Python environment.
        variables: The variables to pass to the model.
        blueprint_variables: The blueprint's variables to pass to the model.
        kwargs: Additional arguments forwarded to the model constructor; `dialect` and
            `default_catalog` are also read from here.
    """
    dialect = kwargs.get("dialect")

    # Dependencies are only discovered from the code when none were given explicitly
    infer_dependencies = depends_on is None

    if python_env is not None:
        discovered_deps, referenced_variables = parse_dependencies(
            python_env,
            entrypoint,
            strict_resolution=infer_dependencies,
            variables=variables,
            blueprint_variables=blueprint_variables,
        )
    else:
        discovered_deps, referenced_variables = set(), set()

    if infer_dependencies:
        # Drop self-references found while parsing
        depends_on = discovered_deps - {name}
    else:
        # Explicit dependencies may contain macros, so render them as a single array expression
        dependency_array = exp.Array(
            expressions=[exp.maybe_parse(dep, dialect=dialect) for dep in depends_on or []]
        )
        rendered_dependencies = render_expression(
            expression=dependency_array,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            variables=variables,
            path=path,
            dialect=dialect,
            default_catalog=kwargs.get("default_catalog"),
        )
        depends_on = {
            dep.sql(dialect=dialect)
            for dep in t.cast(t.List[exp.Expr], rendered_dependencies)[0].expressions
        }

    # Only the variables the implementation actually references are stored in the environment
    used_variables = {
        key: value for key, value in (variables or {}).items() if key in referenced_variables
    }
    if used_variables:
        python_env[c.SQLMESH_VARS] = Executable.value(used_variables, sort_root_dict=True)

    return _create_model(
        PythonModel,
        name,
        path=path,
        depends_on=depends_on,
        entrypoint=entrypoint,
        python_env=python_env,
        macros=macros,
        jinja_macros=jinja_macros,
        module_path=module_path,
        variables=variables,
        blueprint_variables=blueprint_variables,
        **kwargs,
    )

Creates a Python model.

Arguments:
  • name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
  • entrypoint: The name of a Python function which contains the data fetching / transformation logic.
  • python_env: The Python environment of all objects referenced by the model implementation.
  • path: An optional path to the model definition file.
  • depends_on: The custom set of model's upstream dependencies.
  • variables: The variables to pass to the model.
  • blueprint_variables: The blueprint's variables to pass to the model.
def create_external_model( name: Union[str, sqlglot.expressions.query.Table], *, dialect: Optional[str] = None, path: pathlib.Path = PosixPath('.'), defaults: Optional[Dict[str, Any]] = None, **kwargs: Any) -> ExternalModel:
def create_external_model(
    name: TableName,
    *,
    dialect: t.Optional[str] = None,
    path: Path = Path(),
    defaults: t.Optional[t.Dict[str, t.Any]] = None,
    **kwargs: t.Any,
) -> ExternalModel:
    """Creates an external model.

    Args:
        name: The name of the model, which is of the form [catalog].[db].table.
            The catalog and db are optional.
        dialect: The dialect to serialize.
        path: An optional path to the model definition file.
        defaults: Definition default values, forwarded to the model constructor.
        kwargs: Additional arguments forwarded to the model constructor.
    """
    # The kind is always EXTERNAL for this factory
    external_model = _create_model(
        ExternalModel,
        name,
        defaults=defaults,
        dialect=dialect,
        path=path,
        kind=ModelKindName.EXTERNAL.value,
        **kwargs,
    )
    return t.cast(ExternalModel, external_model)

Creates an external model.

Arguments:
  • name: The name of the model, which is of the form [catalog].[db].table. The catalog and db are optional.
  • dialect: The dialect to serialize.
  • path: An optional path to the model definition file.
INSERT_SEED_MACRO_CALL = MacroFunc( this=Anonymous(this=INSERT_SEED))
def render_meta_fields( fields: Dict[str, Any], module_path: pathlib.Path, path: Optional[pathlib.Path], jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry], macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], variables: Optional[Dict[str, Any]], default_catalog: Optional[str], blueprint_variables: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
def render_meta_fields(
    fields: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Render the macro-bearing values of model meta fields.

    Only keys that correspond to a `ModelMeta` field (by alias, else by field name) are
    visited. Expressions and strings containing "@" are rendered; other values are kept
    as they are. Values that render to NONE / NULL are dropped.

    Args:
        fields: The meta field values keyed by field name. This dict is modified in place.
        module_path: The python module path passed to `render_expression`.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros.
        dialect: The dialect used to parse and render values.
        variables: The variables available while rendering.
        default_catalog: The default catalog passed to the renderer.
        blueprint_variables: The blueprint's variables available while rendering.

    Returns:
        The same `fields` dict, after rendering.

    Raises:
        SQLMeshError: If rendering a value yields no expression or more than one expression.
    """

    def render_field_value(value: t.Any) -> t.Any:
        # Only expressions and strings that may contain a macro reference need rendering
        if isinstance(value, exp.Expr) or (isinstance(value, str) and "@" in value):
            expression = exp.maybe_parse(value, dialect=dialect)
            rendered_expr = render_expression(
                expression=expression,
                module_path=module_path,
                macros=macros,
                jinja_macros=jinja_macros,
                variables=variables,
                path=path,
                dialect=dialect,
                default_catalog=default_catalog,
                blueprint_variables=blueprint_variables,
            )
            if not rendered_expr:
                raise SQLMeshError(
                    f"Rendering `{expression.sql(dialect=dialect)}` did not return an expression"
                )

            if len(rendered_expr) != 1:
                raise SQLMeshError(
                    f"Rendering `{expression.sql(dialect=dialect)}` must return one result, but got {len(rendered_expr)}"
                )

            # For cases where a property is conditionally assigned
            if rendered_expr[0].sql().lower() in {"none", "null"}:
                return None

            return rendered_expr[0]

        return value

    for field_name, field_info in ModelMeta.all_field_infos().items():
        # Values are keyed by the field's alias when it has one
        field = field_info.alias or field_name
        field_value = fields.get(field)

        # We don't want to parse python model cron="@..." kwargs (e.g. @daily) into MacroVar
        if (
            field == "cron"
            and isinstance(field_value, str)
            and field_value.lower() in CRON_SHORTCUTS
        ) or field_value is None:
            continue

        # These fields are not rendered here; their strings are only run through
        # `parse_strings_with_macro_refs`
        if field in RUNTIME_RENDERED_MODEL_FIELDS:
            fields[field] = parse_strings_with_macro_refs(field_value, dialect)
            continue

        if isinstance(field_value, dict):
            rendered_dict = {}
            for key, value in field_value.items():
                if key in RUNTIME_RENDERED_MODEL_FIELDS:
                    rendered_dict[key] = parse_strings_with_macro_refs(value, dialect)
                elif (
                    # don't parse kind auto_restatement_cron="@..." kwargs (e.g. @daily) into MacroVar
                    key == "auto_restatement_cron"
                    and isinstance(value, str)
                    and value.lower() in CRON_SHORTCUTS
                ):
                    rendered_dict[key] = value
                elif (rendered := render_field_value(value)) is not None:
                    rendered_dict[key] = rendered

            # A dict whose entries all rendered to nothing is removed altogether
            if rendered_dict:
                fields[field] = rendered_dict
            else:
                fields.pop(field)
        elif isinstance(field_value, list):
            rendered_list = [
                rendered
                for value in field_value
                if (rendered := render_field_value(value)) is not None
            ]
            # Likewise, a list left empty after rendering is removed
            if rendered_list:
                fields[field] = rendered_list
            else:
                fields.pop(field)
        else:
            rendered_field = render_field_value(field_value)
            if rendered_field is not None:
                fields[field] = rendered_field
            else:
                fields.pop(field)

    return fields
def render_model_defaults( defaults: Dict[str, Any], module_path: pathlib.Path, path: Optional[pathlib.Path], jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry], macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType], variables: Optional[Dict[str, Any]], default_catalog: Optional[str]) -> Dict[str, Any]:
def render_model_defaults(
    defaults: t.Dict[str, t.Any],
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    macros: t.Optional[MacroRegistry],
    dialect: DialectType,
    variables: t.Optional[t.Dict[str, t.Any]],
    default_catalog: t.Optional[str],
) -> t.Dict[str, t.Any]:
    """Render the model defaults and validate the rendered values.

    Args:
        defaults: Definition default values; rendered through `render_meta_fields`.
        module_path: The python module path used while rendering.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros.
        dialect: The dialect used while rendering.
        variables: The variables available while rendering.
        default_catalog: The default catalog passed to the renderer.

    Returns:
        The rendered defaults, with a string `interval_unit` converted to `IntervalUnit`.

    Raises:
        ConfigError: If `optimize_query`, `allow_partials` or `enabled` is set to a
            non-boolean, or if `interval_unit` is not a valid interval unit.
    """
    rendered = render_meta_fields(
        fields=defaults,
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        variables=variables,
        path=path,
        dialect=dialect,
        default_catalog=default_catalog,
    )

    # Defaults that may come from macros must have rendered to a boolean
    for boolean in {"optimize_query", "allow_partials", "enabled"}:
        var = rendered.get(boolean)
        if var is None or isinstance(var, (exp.Boolean, bool)):
            continue
        raise ConfigError(f"Expected boolean for '{var}', got '{type(var)}' instead")

    # A textual 'interval_unit' must name a valid IntervalUnit
    var = rendered.get("interval_unit")
    if isinstance(var, str):
        try:
            rendered["interval_unit"] = IntervalUnit(var)
        except ValueError as e:
            raise ConfigError(f"Invalid interval unit: {var}") from e

    return rendered
def parse_defaults_properties( defaults: Dict[str, Any], dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType]) -> Dict[str, Any]:
def parse_defaults_properties(
    defaults: t.Dict[str, t.Any], dialect: DialectType
) -> t.Dict[str, t.Any]:
    """Parse macro-bearing values of the property defaults into expressions.

    For every property group named in `PROPERTIES`, values stored under string keys whose
    text contains the SQLMesh macro prefix are replaced, in place, by their parsed form.

    Args:
        defaults: The default values; modified in place.
        dialect: The dialect used for parsing.

    Returns:
        The same `defaults` dict.
    """
    for prop in PROPERTIES:
        property_values = defaults.get(prop)
        if not property_values:
            continue

        for key, value in property_values.items():
            if not isinstance(key, str):
                continue
            if d.SQLMESH_MACRO_PREFIX in str(value):
                defaults[prop][key] = exp.maybe_parse(value, dialect=dialect)

    return defaults
def render_expression( expression: sqlglot.expressions.core.Expr, module_path: pathlib.Path, path: Optional[pathlib.Path], jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]] = None, dialect: Union[str, sqlglot.dialects.dialect.Dialect, Type[sqlglot.dialects.dialect.Dialect], NoneType] = None, variables: Optional[Dict[str, Any]] = None, default_catalog: Optional[str] = None, blueprint_variables: Optional[Dict[str, Any]] = None) -> Optional[List[sqlglot.expressions.core.Expr]]:
def render_expression(
    expression: exp.Expr,
    module_path: Path,
    path: t.Optional[Path],
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    macros: t.Optional[MacroRegistry] = None,
    dialect: DialectType = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    default_catalog: t.Optional[str] = None,
    blueprint_variables: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Optional[t.List[exp.Expr]]:
    """Render a single expression with an `ExpressionRenderer`.

    A Python environment is built for the expression first, then the expression is
    rendered without quoting or normalizing identifiers.

    Args:
        expression: The expression to render.
        module_path: The python module path used to build the Python environment.
        path: An optional path to the model definition file.
        jinja_macros: The registry of Jinja macros.
        macros: The custom registry of macros. If not provided the default registry will be used.
        dialect: The dialect handed to the renderer.
        variables: The variables available while rendering.
        default_catalog: The default catalog handed to the renderer.
        blueprint_variables: The blueprint's variables available while rendering.

    Returns:
        The result of `ExpressionRenderer.render()`.
    """
    macro_registry = macros or macro.get_registry()
    python_env = make_python_env(
        expressions=expression,
        jinja_macro_references=None,
        module_path=module_path,
        macros=macro_registry,
        variables=variables,
        path=path,
        blueprint_variables=blueprint_variables,
    )

    renderer = ExpressionRenderer(
        expression,
        dialect,
        [],
        path=path,
        jinja_macro_registry=jinja_macros,
        python_env=python_env,
        default_catalog=default_catalog,
        quote_identifiers=False,
        normalize_identifiers=False,
    )
    return renderer.render()
META_FIELD_CONVERTER: Dict[str, Callable] = {'start': <function <lambda>>, 'cron': <function <lambda>>, 'cron_tz': <function <lambda>>, 'partitioned_by_': <function _single_expr_or_tuple>, 'clustered_by': <function _single_expr_or_tuple>, 'depends_on_': <function <lambda>>, 'pre': <function _list_of_calls_to_exp>, 'post': <function _list_of_calls_to_exp>, 'audits': <function _list_of_calls_to_exp>, 'columns_to_types_': <function <lambda>>, 'column_descriptions_': <function <lambda>>, 'tags': <function single_value_or_tuple>, 'grains': <function _refs_to_sql>, 'references': <function _refs_to_sql>, 'physical_properties_': <function <lambda>>, 'virtual_properties_': <function <lambda>>, 'session_properties_': <function <lambda>>, 'allow_partials': <function convert>, 'signals': <function <lambda>>, 'formatting': <class 'str'>, 'optimize_query': <class 'str'>, 'virtual_environment_mode': <function <lambda>>, 'dbt_node_info_': <function <lambda>>, 'grants_': <function <lambda>>, 'grants_target_layer': <function <lambda>>}
def get_model_name(path: pathlib.Path) -> str:
def get_model_name(path: Path) -> str:
    """Infer a model name from the location of its definition file.

    The name is built from the directories that follow the first `models` path component
    plus the file's stem; only the last three of those parts are kept, joined with dots.

    Args:
        path: The path to the model definition file. It must contain a `models` component.

    Returns:
        The dotted model name, e.g. `db.table` for `models/db/table.sql`.

    Raises:
        ValueError: If no path component is named `models`.
    """
    parts = path.parts
    models_index = parts.index("models")
    name_parts = [*parts[models_index + 1 : -1], path.stem]
    return ".".join(name_parts[-3:])
def clickhouse_partition_func( column: sqlglot.expressions.core.Expr, columns_to_types: Optional[Dict[str, sqlglot.expressions.datatypes.DataType]]) -> sqlglot.expressions.core.Expr:
def clickhouse_partition_func(
    column: exp.Expr, columns_to_types: t.Optional[t.Dict[str, exp.DataType]]
) -> exp.Expr:
    """Build the ClickHouse `toMonday` partitioning expression for a column.

    Args:
        column: The column expression to partition by.
        columns_to_types: An optional mapping of column names to their data types,
            looked up by `column.name`. A missing entry is treated as an unknown type.

    Returns:
        `toMonday(column)` when the column is a Date, Date32, DateTime or DateTime64;
        otherwise `toMonday` applied to the column cast to `DateTime64(9, 'UTC')`, with
        the result cast back to the column's type when that type is known.
    """

    def to_monday(argument: exp.Expr) -> exp.Expr:
        return exp.func("toMonday", argument, dialect="clickhouse")

    known_type = columns_to_types and columns_to_types.get(column.name)
    col_type = known_type or exp.DataType.build("UNKNOWN")

    # `toMonday()` accepts a Date or DateTime type column, so those are passed through untouched
    if col_type.is_type(
        exp.DataType.Type.DATE,
        exp.DataType.Type.DATE32,
        exp.DataType.Type.DATETIME,
        exp.DataType.Type.DATETIME64,
    ):
        return to_monday(column)

    # Any other input is cast to DateTime64 before being truncated to its Monday
    datetime64_type = exp.DataType.build("DateTime64(9, 'UTC')", dialect="clickhouse")
    monday_of_cast_column = to_monday(exp.cast(column, datetime64_type))

    # An unknown input type leaves nothing to cast the result back to
    if col_type.is_type(exp.DataType.Type.UNKNOWN):
        return monday_of_cast_column

    # A known but non-conformable type: cast the output back to the original type
    return exp.cast(monday_of_cast_column, col_type)
TIME_COL_PARTITION_FUNC = {'clickhouse': <function clickhouse_partition_func>}