Edit on GitHub

sqlmesh.core.audit.definition

  1from __future__ import annotations
  2
  3import pathlib
  4import typing as t
  5from functools import cached_property
  6from pathlib import Path
  7
  8from pydantic import Field
  9from sqlglot import exp
 10from sqlglot.optimizer.simplify import gen
 11
 12from sqlmesh.core import dialect as d
 13from sqlmesh.core.macros import MacroRegistry, macro
 14from sqlmesh.core.model.common import (
 15    bool_validator,
 16    default_catalog_validator,
 17    depends_on_validator,
 18    sort_python_env,
 19    sorted_python_env_payloads,
 20)
 21from sqlmesh.core.model.common import make_python_env, single_value_or_tuple, ParsableSql
 22from sqlmesh.core.node import _Node, DbtInfoMixin, DbtNodeInfo
 23from sqlmesh.core.renderer import QueryRenderer
 24from sqlmesh.utils.date import TimeLike
 25from sqlmesh.utils.errors import AuditConfigError, SQLMeshError, raise_config_error
 26from sqlmesh.utils.hashing import hash_data
 27from sqlmesh.utils.jinja import (
 28    JinjaMacroRegistry,
 29    extract_macro_references_and_variables,
 30)
 31from sqlmesh.utils.metaprogramming import Executable
 32from sqlmesh.utils.pydantic import PydanticModel, field_validator, model_validator
 33
 34if t.TYPE_CHECKING:
 35    from sqlmesh.core._typing import Self
 36    from sqlmesh.core.snapshot import DeployabilityIndex, Node, Snapshot
 37
 38
 39class AuditCommonMetaMixin:
 40    """
 41    Metadata for audits which can be defined in SQL.
 42
 43    Args:
 44        name: The unique name of the audit.
 45        dialect: The dialect of the audit query.
 46        skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
 47        blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
 48        standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
 49    """
 50
 51    name: str
 52    dialect: str
 53    skip: bool
 54    blocking: bool
 55    standalone: bool
 56
 57
 58class AuditMixin(AuditCommonMetaMixin):
 59    """
 60    Mixin for common Audit functionality
 61
 62    Args:
 63        query: The audit query.
 64        defaults: Default values for the audit query.
 65        expressions: Additional sql statements to execute alongside the audit.
 66        jinja_macros: A registry of jinja macros to use when rendering the audit query.
 67    """
 68
 69    query_: ParsableSql
 70    defaults: t.Dict[str, exp.Expr]
 71    expressions_: t.Optional[t.List[ParsableSql]]
 72    jinja_macros: JinjaMacroRegistry
 73    formatting: t.Optional[bool]
 74
 75    @property
 76    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
 77        return t.cast(t.Union[exp.Query, d.JinjaQuery], self.query_.parse(self.dialect))
 78
 79    @property
 80    def expressions(self) -> t.List[exp.Expr]:
 81        if not self.expressions_:
 82            return []
 83        result: t.List[exp.Expr] = []
 84        for e in self.expressions_:
 85            parsed = e.parse(self.dialect)
 86            if not isinstance(parsed, exp.Semicolon):
 87                result.append(parsed)
 88        return result
 89
 90    @property
 91    def macro_definitions(self) -> t.List[d.MacroDef]:
 92        """All macro definitions from the list of expressions."""
 93        return [s for s in self.expressions if isinstance(s, d.MacroDef)]
 94
 95
 96@field_validator("name", "dialect", mode="before", check_fields=False)
 97def audit_string_validator(cls: t.Type, v: t.Any) -> t.Optional[str]:
 98    if isinstance(v, exp.Expr):
 99        return v.name.lower()
100    return str(v).lower() if v is not None else None
101
102
103@field_validator("defaults", mode="before", check_fields=False)
104def audit_map_validator(cls: t.Type, v: t.Any, values: t.Any) -> t.Dict[str, t.Any]:
105    from sqlmesh.utils.pydantic import get_dialect
106
107    if isinstance(v, exp.Paren):
108        return dict([_maybe_parse_arg_pair(v.unnest())])
109    if isinstance(v, (exp.Tuple, exp.Array)):
110        return dict(map(_maybe_parse_arg_pair, v.expressions))
111    if isinstance(v, dict):
112        dialect = get_dialect(values)
113        return {
114            key: value if isinstance(value, exp.Expr) else d.parse_one(str(value), dialect=dialect)
115            for key, value in v.items()
116        }
117    raise_config_error("Defaults must be a tuple of exp.EQ or a dict", error_type=AuditConfigError)
118    return {}
119
120
121class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
122    """
123    Audit is an assertion made about your tables.
124
125    An audit is a SQL query that returns bad records.
126    """
127
128    name: str
129    dialect: str = ""
130    skip: bool = False
131    blocking: bool = True
132    standalone: t.Literal[False] = False
133    query_: ParsableSql = Field(alias="query")
134    defaults: t.Dict[str, exp.Expr] = {}
135    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
136    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
137    formatting: t.Optional[bool] = Field(default=None, exclude=True)
138    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)
139
140    _path: t.Optional[Path] = None
141
142    # Validators
143    _query_validator = ParsableSql.validator()
144    _bool_validator = bool_validator
145    _string_validator = audit_string_validator
146    _map_validator = audit_map_validator
147
148    def __str__(self) -> str:
149        path = f": {self._path.name}" if self._path else ""
150        return f"{self.__class__.__name__}<{self.name}{path}>"
151
152    @property
153    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
154        return self.dbt_node_info_
155
156
157class StandaloneAudit(_Node, AuditMixin):
158    """
159    Args:
160        depends_on: A list of tables this audit depends on.
161        python_env: Dictionary containing all global variables needed to render the audit's macros.
162    """
163
164    name: str
165    dialect: str = ""
166    skip: bool = False
167    blocking: bool = False
168    standalone: t.Literal[True] = True
169    query_: ParsableSql = Field(alias="query")
170    defaults: t.Dict[str, exp.Expr] = {}
171    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
172    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
173    default_catalog: t.Optional[str] = None
174    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
175    python_env: t.Dict[str, Executable] = {}
176    formatting: t.Optional[bool] = Field(default=None, exclude=True)
177
178    source_type: t.Literal["audit"] = "audit"
179
180    # Validators
181    _query_validator = ParsableSql.validator()
182    _bool_validator = bool_validator
183    _string_validator = audit_string_validator
184    _map_validator = audit_map_validator
185    _default_catalog_validator = default_catalog_validator
186    _depends_on_validator = depends_on_validator
187
188    @model_validator(mode="after")
189    def _node_root_validator(self) -> Self:
190        if self.blocking:
191            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
192        return self
193
194    def render_audit_query(
195        self,
196        *,
197        start: t.Optional[TimeLike] = None,
198        end: t.Optional[TimeLike] = None,
199        execution_time: t.Optional[TimeLike] = None,
200        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
201        deployability_index: t.Optional[DeployabilityIndex] = None,
202        **kwargs: t.Any,
203    ) -> exp.Query:
204        """Renders the audit's query.
205
206        Args:
207            start: The start datetime to render. Defaults to epoch start.
208            end: The end datetime to render. Defaults to epoch start.
209            execution_time: The date/time time reference to use for execution time.
210            snapshots: All snapshots (by name) to use for mapping of physical locations.
211            deployability_index: Determines snapshots that are deployable in the context of this render.
212            kwargs: Additional kwargs to pass to the renderer.
213
214        Returns:
215            The rendered expression.
216        """
217        query_renderer = QueryRenderer(
218            self.query,
219            self.dialect,
220            self.macro_definitions,
221            path=self._path or Path(),
222            jinja_macro_registry=self.jinja_macros,
223            python_env=self.python_env,
224            default_catalog=self.default_catalog,
225        )
226
227        rendered_query = query_renderer.render(
228            start=start,
229            end=end,
230            execution_time=execution_time,
231            snapshots=snapshots,
232            deployability_index=deployability_index,
233            **{**self.defaults, **kwargs},  # type: ignore
234        )
235
236        if rendered_query is None:
237            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")
238
239        return rendered_query
240
241    @cached_property
242    def depends_on(self) -> t.Set[str]:
243        depends_on = self.depends_on_ or set()
244
245        query = self.render_audit_query()
246        if query is not None:
247            depends_on |= d.find_tables(
248                query, default_catalog=self.default_catalog, dialect=self.dialect
249            )
250
251        depends_on -= {self.name}
252        return depends_on
253
254    @property
255    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
256        """Returns the python env sorted by executable kind and then var name."""
257        return sort_python_env(self.python_env)
258
259    @property
260    def data_hash(self) -> str:
261        """
262        Computes the data hash for the node.
263
264        Returns:
265            The data hash for the node.
266        """
267        # StandaloneAudits do not have a data hash
268        return hash_data("")
269
270    @property
271    def metadata_hash(self) -> str:
272        """
273        Computes the metadata hash for the node.
274
275        Args:
276            audits: Available audits by name.
277
278        Returns:
279            The metadata hash for the node.
280        """
281        if self._metadata_hash is None:
282            data = [
283                self.owner,
284                self.description,
285                *sorted(self.tags),
286                str(self.sorted_python_env),
287                self.stamp,
288                self.cron,
289                self.cron_tz.key if self.cron_tz else None,
290            ]
291
292            data.append(self.query_.sql)
293            data.extend([e.sql for e in self.expressions_ or []])
294            self._metadata_hash = hash_data(data)
295        return self._metadata_hash
296
297    def text_diff(self, other: Node, rendered: bool = False) -> str:
298        """Produce a text diff against another node.
299
300        Args:
301            other: The node to diff against.
302            rendered: Whether the diff should be between raw vs rendered nodes
303
304        Returns:
305            A unified text diff showing additions and deletions.
306        """
307        if not isinstance(other, StandaloneAudit):
308            raise SQLMeshError(
309                f"Cannot diff audit '{self.name} against a non-audit node '{other.name}'"
310            )
311
312        return d.text_diff(
313            self.render_definition(render_query=rendered),
314            other.render_definition(render_query=rendered),
315            self.dialect,
316            other.dialect,
317        ).strip()
318
319    def render_definition(
320        self,
321        include_python: bool = True,
322        include_defaults: bool = False,
323        render_query: bool = False,
324    ) -> t.List[exp.Expr]:
325        """Returns the original list of sql expressions comprising the model definition.
326
327        Args:
328            include_python: Whether or not to include Python code in the rendered definition.
329        """
330        expressions: t.List[exp.Expr] = []
331        comment = None
332        for field_name in sorted(self.meta_fields):
333            field_value = getattr(self, field_name)
334            field_info = self.all_field_infos()[field_name]
335            if (
336                field_name == "standalone"
337                or (include_defaults and field_value)
338                or field_value != field_info.default
339            ):
340                if field_name == "description":
341                    comment = field_value
342                else:
343                    expression = exp.Property(
344                        this=field_info.alias or field_name,
345                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
346                    )
347                    if field_name == "name":
348                        expressions.insert(0, expression)
349                    else:
350                        expressions.append(expression)
351
352        audit = d.Audit(expressions=expressions)
353        audit.comments = [comment] if comment else None
354
355        jinja_expressions = []
356        python_expressions = []
357        if include_python:
358            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
359            if python_env.expressions:
360                python_expressions.append(python_env)
361
362            jinja_expressions = self.jinja_macros.to_expressions()
363
364        return [
365            audit,
366            *python_expressions,
367            *jinja_expressions,
368            *self.expressions,
369            self.render_audit_query() if render_query else self.query,
370        ]
371
372    @property
373    def is_audit(self) -> bool:
374        """Return True if this is an audit node"""
375        return True
376
377    @property
378    def meta_fields(self) -> t.Iterable[str]:
379        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())
380
381    @property
382    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
383        return [(self, {})]
384
385
386Audit = t.Union[ModelAudit, StandaloneAudit]
387
388
389def load_audit(
390    expressions: t.List[exp.Expr],
391    *,
392    path: Path = Path(),
393    module_path: Path = Path(),
394    macros: t.Optional[MacroRegistry] = None,
395    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
396    dialect: t.Optional[str] = None,
397    default_catalog: t.Optional[str] = None,
398    variables: t.Optional[t.Dict[str, t.Any]] = None,
399    project: t.Optional[str] = None,
400) -> Audit:
401    """Load an audit from a parsed SQLMesh audit file.
402
403    Args:
404        expressions: Audit, *Statements, Query
405        path: An optional path of the file.
406        dialect: The default dialect if no audit dialect is configured.
407    """
408    if len(expressions) < 2:
409        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)
410
411    meta, *statements, query = expressions
412
413    if not isinstance(meta, d.Audit):
414        _raise_config_error(
415            "AUDIT statement is required as the first statement in the definition",
416            path,
417        )
418        raise
419
420    meta_fields = {p.name: p.args.get("value") for p in meta.expressions if p}
421
422    standalone_field = meta_fields.pop("standalone", None)
423    if standalone_field and not isinstance(standalone_field, exp.Boolean):
424        _raise_config_error(
425            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
426            path,
427        )
428        raise
429    is_standalone = standalone_field and standalone_field.this
430
431    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
432        StandaloneAudit if is_standalone else ModelAudit
433    )
434
435    missing_required_fields = audit_class.missing_required_fields(set(meta_fields))
436    missing_required_fields -= {"query"}
437    if missing_required_fields:
438        _raise_config_error(
439            f"Missing required fields {missing_required_fields} in the audit definition",
440            path,
441        )
442
443    extra_fields = audit_class.extra_fields(set(meta_fields))
444
445    if extra_fields:
446        _raise_config_error(f"Invalid extra fields {extra_fields} in the audit definition", path)
447
448    if not isinstance(query, exp.Query) and not isinstance(query, d.JinjaQuery):
449        _raise_config_error("Missing SELECT query in the audit definition", path)
450        raise
451
452    extra_kwargs: t.Dict[str, t.Any] = {}
453    if is_standalone:
454        jinja_macro_refrences, referenced_variables = extract_macro_references_and_variables(
455            *(gen(s) for s in statements),
456            gen(query),
457        )
458        jinja_macros = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_refrences)
459        for jinja_macro in jinja_macros.root_macros.values():
460            referenced_variables.update(
461                extract_macro_references_and_variables(jinja_macro.definition)[1]
462            )
463
464        extra_kwargs["jinja_macros"] = jinja_macros
465        extra_kwargs["python_env"] = make_python_env(
466            [*statements, query],
467            jinja_macro_refrences,
468            module_path,
469            macros or macro.get_registry(),
470            variables=variables,
471            referenced_variables=referenced_variables,
472        )
473        extra_kwargs["default_catalog"] = default_catalog
474        if project is not None:
475            extra_kwargs["project"] = project
476
477    dialect = meta_fields.pop("dialect", dialect) or ""
478
479    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
480    parsable_statements = [
481        ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=True) for s in statements
482    ]
483
484    try:
485        audit = audit_class(
486            query=parsable_query,
487            expressions=parsable_statements,
488            dialect=dialect,
489            **extra_kwargs,
490            **meta_fields,
491        )
492    except Exception as ex:
493        _raise_config_error(str(ex), path)
494
495    audit._path = path
496    return audit
497
498
499def load_multiple_audits(
500    expressions: t.List[exp.Expr],
501    *,
502    path: Path = Path(),
503    module_path: Path = Path(),
504    macros: t.Optional[MacroRegistry] = None,
505    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
506    dialect: t.Optional[str] = None,
507    default_catalog: t.Optional[str] = None,
508    variables: t.Optional[t.Dict[str, t.Any]] = None,
509    project: t.Optional[str] = None,
510) -> t.Generator[Audit, None, None]:
511    audit_block: t.List[exp.Expr] = []
512    for expression in expressions:
513        if isinstance(expression, d.Audit):
514            if audit_block:
515                yield load_audit(
516                    expressions=audit_block,
517                    path=path,
518                    module_path=module_path,
519                    macros=macros,
520                    jinja_macros=jinja_macros,
521                    dialect=dialect,
522                    default_catalog=default_catalog,
523                    variables=variables,
524                    project=project,
525                )
526                audit_block.clear()
527        audit_block.append(expression)
528    yield load_audit(
529        expressions=audit_block,
530        path=path,
531        dialect=dialect,
532        default_catalog=default_catalog,
533        variables=variables,
534        project=project,
535    )
536
537
538def _raise_config_error(msg: str, path: pathlib.Path) -> None:
539    raise_config_error(msg, location=path, error_type=AuditConfigError)
540
541
542# mypy doesn't realize raise_config_error raises an exception
543@t.no_type_check
544def _maybe_parse_arg_pair(e: exp.Expr) -> t.Tuple[str, exp.Expr]:
545    if isinstance(e, exp.EQ):
546        return e.left.name, e.right
547
548
549META_FIELD_CONVERTER: t.Dict[str, t.Callable] = {
550    "start": lambda value: exp.Literal.string(value),
551    "cron": lambda value: exp.Literal.string(value),
552    "skip": exp.convert,
553    "blocking": exp.convert,
554    "standalone": exp.convert,
555    "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)),
556    "tags": single_value_or_tuple,
557    "default_catalog": exp.to_identifier,
558    "dbt_node_info_": lambda value: value.to_expression(),
559}
class AuditCommonMetaMixin:
40class AuditCommonMetaMixin:
41    """
42    Metadata for audits which can be defined in SQL.
43
44    Args:
45        name: The unique name of the audit.
46        dialect: The dialect of the audit query.
47        skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
48        blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
49        standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
50    """
51
52    name: str
53    dialect: str
54    skip: bool
55    blocking: bool
56    standalone: bool

Metadata for audits which can be defined in SQL.

Arguments:
  • name: The unique name of the audit.
  • dialect: The dialect of the audit query.
  • skip: Setting this to true will cause this audit to be skipped. Defaults to false.
  • blocking: Setting this to true will cause the pipeline execution to stop if this audit fails.
  • standalone: Setting this to true will cause this audit to be executed as a standalone audit.
name: str
dialect: str
skip: bool
blocking: bool
standalone: bool
class AuditMixin(AuditCommonMetaMixin):
59class AuditMixin(AuditCommonMetaMixin):
60    """
61    Mixin for common Audit functionality
62
63    Args:
64        query: The audit query.
65        defaults: Default values for the audit query.
66        expressions: Additional sql statements to execute alongside the audit.
67        jinja_macros: A registry of jinja macros to use when rendering the audit query.
68    """
69
70    query_: ParsableSql
71    defaults: t.Dict[str, exp.Expr]
72    expressions_: t.Optional[t.List[ParsableSql]]
73    jinja_macros: JinjaMacroRegistry
74    formatting: t.Optional[bool]
75
76    @property
77    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
78        return t.cast(t.Union[exp.Query, d.JinjaQuery], self.query_.parse(self.dialect))
79
80    @property
81    def expressions(self) -> t.List[exp.Expr]:
82        if not self.expressions_:
83            return []
84        result: t.List[exp.Expr] = []
85        for e in self.expressions_:
86            parsed = e.parse(self.dialect)
87            if not isinstance(parsed, exp.Semicolon):
88                result.append(parsed)
89        return result
90
91    @property
92    def macro_definitions(self) -> t.List[d.MacroDef]:
93        """All macro definitions from the list of expressions."""
94        return [s for s in self.expressions if isinstance(s, d.MacroDef)]

Mixin for common Audit functionality

Arguments:
  • query: The audit query.
  • defaults: Default values for the audit query.
  • expressions: Additional sql statements to execute alongside the audit.
  • jinja_macros: A registry of jinja macros to use when rendering the audit query.
defaults: Dict[str, sqlglot.expressions.core.Expr]
expressions_: Optional[List[sqlmesh.core.model.common.ParsableSql]]
formatting: Optional[bool]
query: Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]
76    @property
77    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
78        return t.cast(t.Union[exp.Query, d.JinjaQuery], self.query_.parse(self.dialect))
expressions: List[sqlglot.expressions.core.Expr]
80    @property
81    def expressions(self) -> t.List[exp.Expr]:
82        if not self.expressions_:
83            return []
84        result: t.List[exp.Expr] = []
85        for e in self.expressions_:
86            parsed = e.parse(self.dialect)
87            if not isinstance(parsed, exp.Semicolon):
88                result.append(parsed)
89        return result
macro_definitions: List[sqlmesh.core.dialect.MacroDef]
91    @property
92    def macro_definitions(self) -> t.List[d.MacroDef]:
93        """All macro definitions from the list of expressions."""
94        return [s for s in self.expressions if isinstance(s, d.MacroDef)]

All macro definitions from the list of expressions.

122class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
123    """
124    Audit is an assertion made about your tables.
125
126    An audit is a SQL query that returns bad records.
127    """
128
129    name: str
130    dialect: str = ""
131    skip: bool = False
132    blocking: bool = True
133    standalone: t.Literal[False] = False
134    query_: ParsableSql = Field(alias="query")
135    defaults: t.Dict[str, exp.Expr] = {}
136    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
137    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
138    formatting: t.Optional[bool] = Field(default=None, exclude=True)
139    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)
140
141    _path: t.Optional[Path] = None
142
143    # Validators
144    _query_validator = ParsableSql.validator()
145    _bool_validator = bool_validator
146    _string_validator = audit_string_validator
147    _map_validator = audit_map_validator
148
149    def __str__(self) -> str:
150        path = f": {self._path.name}" if self._path else ""
151        return f"{self.__class__.__name__}<{self.name}{path}>"
152
153    @property
154    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
155        return self.dbt_node_info_

Audit is an assertion made about your tables.

An audit is a SQL query that returns bad records.

name: str
dialect: str
skip: bool
blocking: bool
standalone: Literal[False]
defaults: Dict[str, sqlglot.expressions.core.Expr]
expressions_: Optional[List[sqlmesh.core.model.common.ParsableSql]]
formatting: Optional[bool]
dbt_node_info_: Optional[sqlmesh.core.node.DbtNodeInfo]
dbt_node_info: Optional[sqlmesh.core.node.DbtNodeInfo]
153    @property
154    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
155        return self.dbt_node_info_
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': (), 'frozen': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
AuditMixin
query
expressions
macro_definitions
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
class StandaloneAudit(sqlmesh.core.node._Node, AuditMixin):
158class StandaloneAudit(_Node, AuditMixin):
159    """
160    Args:
161        depends_on: A list of tables this audit depends on.
162        python_env: Dictionary containing all global variables needed to render the audit's macros.
163    """
164
165    name: str
166    dialect: str = ""
167    skip: bool = False
168    blocking: bool = False
169    standalone: t.Literal[True] = True
170    query_: ParsableSql = Field(alias="query")
171    defaults: t.Dict[str, exp.Expr] = {}
172    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
173    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
174    default_catalog: t.Optional[str] = None
175    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
176    python_env: t.Dict[str, Executable] = {}
177    formatting: t.Optional[bool] = Field(default=None, exclude=True)
178
179    source_type: t.Literal["audit"] = "audit"
180
181    # Validators
182    _query_validator = ParsableSql.validator()
183    _bool_validator = bool_validator
184    _string_validator = audit_string_validator
185    _map_validator = audit_map_validator
186    _default_catalog_validator = default_catalog_validator
187    _depends_on_validator = depends_on_validator
188
189    @model_validator(mode="after")
190    def _node_root_validator(self) -> Self:
191        if self.blocking:
192            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
193        return self
194
195    def render_audit_query(
196        self,
197        *,
198        start: t.Optional[TimeLike] = None,
199        end: t.Optional[TimeLike] = None,
200        execution_time: t.Optional[TimeLike] = None,
201        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
202        deployability_index: t.Optional[DeployabilityIndex] = None,
203        **kwargs: t.Any,
204    ) -> exp.Query:
205        """Renders the audit's query.
206
207        Args:
208            start: The start datetime to render. Defaults to epoch start.
209            end: The end datetime to render. Defaults to epoch start.
210            execution_time: The date/time time reference to use for execution time.
211            snapshots: All snapshots (by name) to use for mapping of physical locations.
212            deployability_index: Determines snapshots that are deployable in the context of this render.
213            kwargs: Additional kwargs to pass to the renderer.
214
215        Returns:
216            The rendered expression.
217        """
218        query_renderer = QueryRenderer(
219            self.query,
220            self.dialect,
221            self.macro_definitions,
222            path=self._path or Path(),
223            jinja_macro_registry=self.jinja_macros,
224            python_env=self.python_env,
225            default_catalog=self.default_catalog,
226        )
227
228        rendered_query = query_renderer.render(
229            start=start,
230            end=end,
231            execution_time=execution_time,
232            snapshots=snapshots,
233            deployability_index=deployability_index,
234            **{**self.defaults, **kwargs},  # type: ignore
235        )
236
237        if rendered_query is None:
238            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")
239
240        return rendered_query
241
242    @cached_property
243    def depends_on(self) -> t.Set[str]:
244        depends_on = self.depends_on_ or set()
245
246        query = self.render_audit_query()
247        if query is not None:
248            depends_on |= d.find_tables(
249                query, default_catalog=self.default_catalog, dialect=self.dialect
250            )
251
252        depends_on -= {self.name}
253        return depends_on
254
255    @property
256    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
257        """Returns the python env sorted by executable kind and then var name."""
258        return sort_python_env(self.python_env)
259
260    @property
261    def data_hash(self) -> str:
262        """
263        Computes the data hash for the node.
264
265        Returns:
266            The data hash for the node.
267        """
268        # StandaloneAudits do not have a data hash
269        return hash_data("")
270
271    @property
272    def metadata_hash(self) -> str:
273        """
274        Computes the metadata hash for the node.
275
276        Args:
277            audits: Available audits by name.
278
279        Returns:
280            The metadata hash for the node.
281        """
282        if self._metadata_hash is None:
283            data = [
284                self.owner,
285                self.description,
286                *sorted(self.tags),
287                str(self.sorted_python_env),
288                self.stamp,
289                self.cron,
290                self.cron_tz.key if self.cron_tz else None,
291            ]
292
293            data.append(self.query_.sql)
294            data.extend([e.sql for e in self.expressions_ or []])
295            self._metadata_hash = hash_data(data)
296        return self._metadata_hash
297
298    def text_diff(self, other: Node, rendered: bool = False) -> str:
299        """Produce a text diff against another node.
300
301        Args:
302            other: The node to diff against.
303            rendered: Whether the diff should be between raw vs rendered nodes
304
305        Returns:
306            A unified text diff showing additions and deletions.
307        """
308        if not isinstance(other, StandaloneAudit):
309            raise SQLMeshError(
310                f"Cannot diff audit '{self.name} against a non-audit node '{other.name}'"
311            )
312
313        return d.text_diff(
314            self.render_definition(render_query=rendered),
315            other.render_definition(render_query=rendered),
316            self.dialect,
317            other.dialect,
318        ).strip()
319
320    def render_definition(
321        self,
322        include_python: bool = True,
323        include_defaults: bool = False,
324        render_query: bool = False,
325    ) -> t.List[exp.Expr]:
326        """Returns the original list of sql expressions comprising the model definition.
327
328        Args:
329            include_python: Whether or not to include Python code in the rendered definition.
330        """
331        expressions: t.List[exp.Expr] = []
332        comment = None
333        for field_name in sorted(self.meta_fields):
334            field_value = getattr(self, field_name)
335            field_info = self.all_field_infos()[field_name]
336            if (
337                field_name == "standalone"
338                or (include_defaults and field_value)
339                or field_value != field_info.default
340            ):
341                if field_name == "description":
342                    comment = field_value
343                else:
344                    expression = exp.Property(
345                        this=field_info.alias or field_name,
346                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
347                    )
348                    if field_name == "name":
349                        expressions.insert(0, expression)
350                    else:
351                        expressions.append(expression)
352
353        audit = d.Audit(expressions=expressions)
354        audit.comments = [comment] if comment else None
355
356        jinja_expressions = []
357        python_expressions = []
358        if include_python:
359            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
360            if python_env.expressions:
361                python_expressions.append(python_env)
362
363            jinja_expressions = self.jinja_macros.to_expressions()
364
365        return [
366            audit,
367            *python_expressions,
368            *jinja_expressions,
369            *self.expressions,
370            self.render_audit_query() if render_query else self.query,
371        ]
372
373    @property
374    def is_audit(self) -> bool:
375        """Return True if this is an audit node"""
376        return True
377
378    @property
379    def meta_fields(self) -> t.Iterable[str]:
380        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())
381
382    @property
383    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
384        return [(self, {})]
Arguments:
  • depends_on: A list of tables this audit depends on.
  • python_env: Dictionary containing all global variables needed to render the audit's macros.
name: str
dialect: str
skip: bool
blocking: bool
standalone: Literal[True]
defaults: Dict[str, sqlglot.expressions.core.Expr]
expressions_: Optional[List[sqlmesh.core.model.common.ParsableSql]]
default_catalog: Optional[str]
depends_on_: Optional[Set[str]]
formatting: Optional[bool]
source_type: Literal['audit']
def render_audit_query( self, *, start: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, end: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, execution_time: Union[datetime.date, datetime.datetime, str, int, float, NoneType] = None, snapshots: Optional[Dict[str, sqlmesh.core.snapshot.definition.Snapshot]] = None, deployability_index: Optional[sqlmesh.core.snapshot.definition.DeployabilityIndex] = None, **kwargs: Any) -> sqlglot.expressions.query.Query:
195    def render_audit_query(
196        self,
197        *,
198        start: t.Optional[TimeLike] = None,
199        end: t.Optional[TimeLike] = None,
200        execution_time: t.Optional[TimeLike] = None,
201        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
202        deployability_index: t.Optional[DeployabilityIndex] = None,
203        **kwargs: t.Any,
204    ) -> exp.Query:
205        """Renders the audit's query.
206
207        Args:
208            start: The start datetime to render. Defaults to epoch start.
209            end: The end datetime to render. Defaults to epoch start.
210            execution_time: The date/time time reference to use for execution time.
211            snapshots: All snapshots (by name) to use for mapping of physical locations.
212            deployability_index: Determines snapshots that are deployable in the context of this render.
213            kwargs: Additional kwargs to pass to the renderer.
214
215        Returns:
216            The rendered expression.
217        """
218        query_renderer = QueryRenderer(
219            self.query,
220            self.dialect,
221            self.macro_definitions,
222            path=self._path or Path(),
223            jinja_macro_registry=self.jinja_macros,
224            python_env=self.python_env,
225            default_catalog=self.default_catalog,
226        )
227
228        rendered_query = query_renderer.render(
229            start=start,
230            end=end,
231            execution_time=execution_time,
232            snapshots=snapshots,
233            deployability_index=deployability_index,
234            **{**self.defaults, **kwargs},  # type: ignore
235        )
236
237        if rendered_query is None:
238            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")
239
240        return rendered_query

Renders the audit's query.

Arguments:
  • start: The start datetime to render. Defaults to epoch start.
  • end: The end datetime to render. Defaults to epoch start.
  • execution_time: The date/time time reference to use for execution time.
  • snapshots: All snapshots (by name) to use for mapping of physical locations.
  • deployability_index: Determines snapshots that are deployable in the context of this render.
  • kwargs: Additional kwargs to pass to the renderer.
Returns:

The rendered expression.

depends_on: Set[str]
242    @cached_property
243    def depends_on(self) -> t.Set[str]:
244        depends_on = self.depends_on_ or set()
245
246        query = self.render_audit_query()
247        if query is not None:
248            depends_on |= d.find_tables(
249                query, default_catalog=self.default_catalog, dialect=self.dialect
250            )
251
252        depends_on -= {self.name}
253        return depends_on
sorted_python_env: List[Tuple[str, sqlmesh.utils.metaprogramming.Executable]]
255    @property
256    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
257        """Returns the python env sorted by executable kind and then var name."""
258        return sort_python_env(self.python_env)

Returns the python env sorted by executable kind and then var name.

data_hash: str
260    @property
261    def data_hash(self) -> str:
262        """
263        Computes the data hash for the node.
264
265        Returns:
266            The data hash for the node.
267        """
268        # StandaloneAudits do not have a data hash
269        return hash_data("")

Computes the data hash for the node.

Returns:

The data hash for the node.

metadata_hash: str
271    @property
272    def metadata_hash(self) -> str:
273        """
274        Computes the metadata hash for the node.
275
276        Args:
277            audits: Available audits by name.
278
279        Returns:
280            The metadata hash for the node.
281        """
282        if self._metadata_hash is None:
283            data = [
284                self.owner,
285                self.description,
286                *sorted(self.tags),
287                str(self.sorted_python_env),
288                self.stamp,
289                self.cron,
290                self.cron_tz.key if self.cron_tz else None,
291            ]
292
293            data.append(self.query_.sql)
294            data.extend([e.sql for e in self.expressions_ or []])
295            self._metadata_hash = hash_data(data)
296        return self._metadata_hash

Computes the metadata hash for the node.

Arguments:
  • audits: Available audits by name.
Returns:

The metadata hash for the node.

def text_diff( self, other: Annotated[Union[sqlmesh.core.model.definition.SqlModel, sqlmesh.core.model.definition.SeedModel, sqlmesh.core.model.definition.PythonModel, sqlmesh.core.model.definition.ExternalModel, StandaloneAudit], FieldInfo(annotation=NoneType, required=True, discriminator='source_type')], rendered: bool = False) -> str:
298    def text_diff(self, other: Node, rendered: bool = False) -> str:
299        """Produce a text diff against another node.
300
301        Args:
302            other: The node to diff against.
303            rendered: Whether the diff should be between raw vs rendered nodes
304
305        Returns:
306            A unified text diff showing additions and deletions.
307        """
308        if not isinstance(other, StandaloneAudit):
309            raise SQLMeshError(
310                f"Cannot diff audit '{self.name} against a non-audit node '{other.name}'"
311            )
312
313        return d.text_diff(
314            self.render_definition(render_query=rendered),
315            other.render_definition(render_query=rendered),
316            self.dialect,
317            other.dialect,
318        ).strip()

Produce a text diff against another node.

Arguments:
  • other: The node to diff against.
  • rendered: Whether the diff should be between raw vs rendered nodes
Returns:

A unified text diff showing additions and deletions.

def render_definition( self, include_python: bool = True, include_defaults: bool = False, render_query: bool = False) -> List[sqlglot.expressions.core.Expr]:
320    def render_definition(
321        self,
322        include_python: bool = True,
323        include_defaults: bool = False,
324        render_query: bool = False,
325    ) -> t.List[exp.Expr]:
326        """Returns the original list of sql expressions comprising the model definition.
327
328        Args:
329            include_python: Whether or not to include Python code in the rendered definition.
330        """
331        expressions: t.List[exp.Expr] = []
332        comment = None
333        for field_name in sorted(self.meta_fields):
334            field_value = getattr(self, field_name)
335            field_info = self.all_field_infos()[field_name]
336            if (
337                field_name == "standalone"
338                or (include_defaults and field_value)
339                or field_value != field_info.default
340            ):
341                if field_name == "description":
342                    comment = field_value
343                else:
344                    expression = exp.Property(
345                        this=field_info.alias or field_name,
346                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
347                    )
348                    if field_name == "name":
349                        expressions.insert(0, expression)
350                    else:
351                        expressions.append(expression)
352
353        audit = d.Audit(expressions=expressions)
354        audit.comments = [comment] if comment else None
355
356        jinja_expressions = []
357        python_expressions = []
358        if include_python:
359            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
360            if python_env.expressions:
361                python_expressions.append(python_env)
362
363            jinja_expressions = self.jinja_macros.to_expressions()
364
365        return [
366            audit,
367            *python_expressions,
368            *jinja_expressions,
369            *self.expressions,
370            self.render_audit_query() if render_query else self.query,
371        ]

Returns the original list of sql expressions comprising the model definition.

Arguments:
  • include_python: Whether or not to include Python code in the rendered definition.
is_audit: bool
373    @property
374    def is_audit(self) -> bool:
375        """Return True if this is an audit node"""
376        return True

Return True if this is an audit node

meta_fields: Iterable[str]
378    @property
379    def meta_fields(self) -> t.Iterable[str]:
380        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())
audits_with_args: List[Tuple[Union[ModelAudit, StandaloneAudit], Dict[str, sqlglot.expressions.core.Expr]]]
382    @property
383    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
384        return [(self, {})]
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
358def init_private_attributes(self: BaseModel, context: Any, /) -> None:
359    """This function is meant to behave like a BaseModel method to initialise private attributes.
360
361    It takes context as an argument since that's what pydantic-core passes when calling it.
362
363    Args:
364        self: The BaseModel instance.
365        context: The context.
366    """
367    if getattr(self, '__pydantic_private__', None) is None:
368        pydantic_private = {}
369        for name, private_attr in self.__private_attributes__.items():
370            default = private_attr.get_default()
371            if default is not PydanticUndefined:
372                pydantic_private[name] = default
373        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Arguments:
  • self: The BaseModel instance.
  • context: The context.
Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.core.node._Node
project
description
owner
start
end
cron
cron_tz
interval_unit_
tags
stamp
dbt_node_info_
copy
batch_size
batch_concurrency
interval_unit
fqn
is_metadata_only_change
is_data_change
croniter
cron_next
cron_prev
cron_floor
is_model
dbt_node_info
sqlmesh.core.node.DbtInfoMixin
dbt_unique_id
dbt_fqn
sqlmesh.utils.pydantic.PydanticModel
dict
json
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
AuditMixin
query
expressions
macro_definitions
Audit = typing.Union[ModelAudit, StandaloneAudit]
def load_audit( expressions: List[sqlglot.expressions.core.Expr], *, path: pathlib.Path = PosixPath('.'), module_path: pathlib.Path = PosixPath('.'), macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, dialect: Optional[str] = None, default_catalog: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, project: Optional[str] = None) -> Union[ModelAudit, StandaloneAudit]:
390def load_audit(
391    expressions: t.List[exp.Expr],
392    *,
393    path: Path = Path(),
394    module_path: Path = Path(),
395    macros: t.Optional[MacroRegistry] = None,
396    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
397    dialect: t.Optional[str] = None,
398    default_catalog: t.Optional[str] = None,
399    variables: t.Optional[t.Dict[str, t.Any]] = None,
400    project: t.Optional[str] = None,
401) -> Audit:
402    """Load an audit from a parsed SQLMesh audit file.
403
404    Args:
405        expressions: Audit, *Statements, Query
406        path: An optional path of the file.
407        dialect: The default dialect if no audit dialect is configured.
408    """
409    if len(expressions) < 2:
410        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)
411
412    meta, *statements, query = expressions
413
414    if not isinstance(meta, d.Audit):
415        _raise_config_error(
416            "AUDIT statement is required as the first statement in the definition",
417            path,
418        )
419        raise
420
421    meta_fields = {p.name: p.args.get("value") for p in meta.expressions if p}
422
423    standalone_field = meta_fields.pop("standalone", None)
424    if standalone_field and not isinstance(standalone_field, exp.Boolean):
425        _raise_config_error(
426            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
427            path,
428        )
429        raise
430    is_standalone = standalone_field and standalone_field.this
431
432    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
433        StandaloneAudit if is_standalone else ModelAudit
434    )
435
436    missing_required_fields = audit_class.missing_required_fields(set(meta_fields))
437    missing_required_fields -= {"query"}
438    if missing_required_fields:
439        _raise_config_error(
440            f"Missing required fields {missing_required_fields} in the audit definition",
441            path,
442        )
443
444    extra_fields = audit_class.extra_fields(set(meta_fields))
445
446    if extra_fields:
447        _raise_config_error(f"Invalid extra fields {extra_fields} in the audit definition", path)
448
449    if not isinstance(query, exp.Query) and not isinstance(query, d.JinjaQuery):
450        _raise_config_error("Missing SELECT query in the audit definition", path)
451        raise
452
453    extra_kwargs: t.Dict[str, t.Any] = {}
454    if is_standalone:
455        jinja_macro_refrences, referenced_variables = extract_macro_references_and_variables(
456            *(gen(s) for s in statements),
457            gen(query),
458        )
459        jinja_macros = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_refrences)
460        for jinja_macro in jinja_macros.root_macros.values():
461            referenced_variables.update(
462                extract_macro_references_and_variables(jinja_macro.definition)[1]
463            )
464
465        extra_kwargs["jinja_macros"] = jinja_macros
466        extra_kwargs["python_env"] = make_python_env(
467            [*statements, query],
468            jinja_macro_refrences,
469            module_path,
470            macros or macro.get_registry(),
471            variables=variables,
472            referenced_variables=referenced_variables,
473        )
474        extra_kwargs["default_catalog"] = default_catalog
475        if project is not None:
476            extra_kwargs["project"] = project
477
478    dialect = meta_fields.pop("dialect", dialect) or ""
479
480    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
481    parsable_statements = [
482        ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=True) for s in statements
483    ]
484
485    try:
486        audit = audit_class(
487            query=parsable_query,
488            expressions=parsable_statements,
489            dialect=dialect,
490            **extra_kwargs,
491            **meta_fields,
492        )
493    except Exception as ex:
494        _raise_config_error(str(ex), path)
495
496    audit._path = path
497    return audit

Load an audit from a parsed SQLMesh audit file.

Arguments:
  • expressions: Audit, *Statements, Query
  • path: An optional path of the file.
  • dialect: The default dialect if no audit dialect is configured.
def load_multiple_audits( expressions: List[sqlglot.expressions.core.Expr], *, path: pathlib.Path = PosixPath('.'), module_path: pathlib.Path = PosixPath('.'), macros: Optional[sqlmesh.utils.UniqueKeyDict[str, Union[sqlmesh.utils.metaprogramming.Executable, sqlmesh.core.macros.macro]]] = None, jinja_macros: Optional[sqlmesh.utils.jinja.JinjaMacroRegistry] = None, dialect: Optional[str] = None, default_catalog: Optional[str] = None, variables: Optional[Dict[str, Any]] = None, project: Optional[str] = None) -> Generator[Union[ModelAudit, StandaloneAudit], NoneType, NoneType]:
500def load_multiple_audits(
501    expressions: t.List[exp.Expr],
502    *,
503    path: Path = Path(),
504    module_path: Path = Path(),
505    macros: t.Optional[MacroRegistry] = None,
506    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
507    dialect: t.Optional[str] = None,
508    default_catalog: t.Optional[str] = None,
509    variables: t.Optional[t.Dict[str, t.Any]] = None,
510    project: t.Optional[str] = None,
511) -> t.Generator[Audit, None, None]:
512    audit_block: t.List[exp.Expr] = []
513    for expression in expressions:
514        if isinstance(expression, d.Audit):
515            if audit_block:
516                yield load_audit(
517                    expressions=audit_block,
518                    path=path,
519                    module_path=module_path,
520                    macros=macros,
521                    jinja_macros=jinja_macros,
522                    dialect=dialect,
523                    default_catalog=default_catalog,
524                    variables=variables,
525                    project=project,
526                )
527                audit_block.clear()
528        audit_block.append(expression)
529    yield load_audit(
530        expressions=audit_block,
531        path=path,
532        dialect=dialect,
533        default_catalog=default_catalog,
534        variables=variables,
535        project=project,
536    )
META_FIELD_CONVERTER: Dict[str, Callable] = {'start': <function <lambda>>, 'cron': <function <lambda>>, 'skip': <function convert>, 'blocking': <function convert>, 'standalone': <function convert>, 'depends_on_': <function <lambda>>, 'tags': <function single_value_or_tuple>, 'default_catalog': <function to_identifier>, 'dbt_node_info_': <function <lambda>>}