sqlmesh.core.audit.definition
from __future__ import annotations

import pathlib
import typing as t
from functools import cached_property
from pathlib import Path

from pydantic import Field
from sqlglot import exp
from sqlglot.optimizer.simplify import gen

from sqlmesh.core import dialect as d
from sqlmesh.core.macros import MacroRegistry, macro
from sqlmesh.core.model.common import (
    bool_validator,
    default_catalog_validator,
    depends_on_validator,
    sort_python_env,
    sorted_python_env_payloads,
)
from sqlmesh.core.model.common import make_python_env, single_value_or_tuple, ParsableSql
from sqlmesh.core.node import _Node, DbtInfoMixin, DbtNodeInfo
from sqlmesh.core.renderer import QueryRenderer
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.errors import AuditConfigError, SQLMeshError, raise_config_error
from sqlmesh.utils.hashing import hash_data
from sqlmesh.utils.jinja import (
    JinjaMacroRegistry,
    extract_macro_references_and_variables,
)
from sqlmesh.utils.metaprogramming import Executable
from sqlmesh.utils.pydantic import PydanticModel, field_validator, model_validator

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import Self
    from sqlmesh.core.snapshot import DeployabilityIndex, Node, Snapshot


class AuditCommonMetaMixin:
    """
    Metadata for audits which can be defined in SQL.

    Args:
        name: The unique name of the audit.
        dialect: The dialect of the audit query.
        skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
        blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
        standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
    """

    # NOTE: `StandaloneAudit.meta_fields` reads these annotation names to decide which
    # fields are rendered into the AUDIT header by `render_definition`.
    name: str
    dialect: str
    skip: bool
    blocking: bool
    standalone: bool


class AuditMixin(AuditCommonMetaMixin):
    """
    Mixin for common Audit functionality

    Args:
        query: The audit query.
        defaults: Default values for the audit query.
        expressions: Additional sql statements to execute alongside the audit.
        jinja_macros: A registry of jinja macros to use when rendering the audit query.
    """

    query_: ParsableSql
    defaults: t.Dict[str, exp.Expr]
    expressions_: t.Optional[t.List[ParsableSql]]
    jinja_macros: JinjaMacroRegistry
    formatting: t.Optional[bool]

    @property
    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
        """The audit query, parsed with the audit's dialect."""
        return t.cast(t.Union[exp.Query, d.JinjaQuery], self.query_.parse(self.dialect))

    @property
    def expressions(self) -> t.List[exp.Expr]:
        """The additional statements, parsed with the audit's dialect, without bare semicolons."""
        if not self.expressions_:
            return []
        result: t.List[exp.Expr] = []
        for e in self.expressions_:
            parsed = e.parse(self.dialect)
            if not isinstance(parsed, exp.Semicolon):
                result.append(parsed)
        return result

    @property
    def macro_definitions(self) -> t.List[d.MacroDef]:
        """All macro definitions from the list of expressions."""
        return [s for s in self.expressions if isinstance(s, d.MacroDef)]


@field_validator("name", "dialect", mode="before", check_fields=False)
def audit_string_validator(cls: t.Type, v: t.Any) -> t.Optional[str]:
    """Lower-case `name`/`dialect`; expressions contribute their `.name`, `None` is kept."""
    if isinstance(v, exp.Expr):
        return v.name.lower()
    return str(v).lower() if v is not None else None


@field_validator("defaults", mode="before", check_fields=False)
def audit_map_validator(cls: t.Type, v: t.Any, values: t.Any) -> t.Dict[str, t.Any]:
    """Normalize `defaults` into a mapping of argument name to expression.

    Accepts a single parenthesized `name = value` pair, a tuple/array of such pairs, or a
    dict whose non-expression values are parsed as SQL in the audit's dialect.

    Raises:
        AuditConfigError: For any other input shape, or a pair that is not `name = value`.
    """
    from sqlmesh.utils.pydantic import get_dialect

    if isinstance(v, exp.Paren):
        return dict([_maybe_parse_arg_pair(v.unnest())])
    if isinstance(v, (exp.Tuple, exp.Array)):
        return dict(map(_maybe_parse_arg_pair, v.expressions))
    if isinstance(v, dict):
        dialect = get_dialect(values)
        return {
            key: value if isinstance(value, exp.Expr) else d.parse_one(str(value), dialect=dialect)
            for key, value in v.items()
        }
    raise_config_error("Defaults must be a tuple of exp.EQ or a dict", error_type=AuditConfigError)
    return {}


class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
    """
    Audit is an assertion made about your tables.

    An audit is a SQL query that returns bad records.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = True
    standalone: t.Literal[False] = False
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expr] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)

    # Path of the file the audit was loaded from; set by `load_audit`.
    _path: t.Optional[Path] = None

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator

    def __str__(self) -> str:
        path = f": {self._path.name}" if self._path else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        return self.dbt_node_info_


class StandaloneAudit(_Node, AuditMixin):
    """
    An audit that is a `_Node` in its own right, with its own dependencies, python env and
    default catalog. Standalone audits can never be blocking.

    Args:
        depends_on: A list of tables this audit depends on.
        python_env: Dictionary containing all global variables needed to render the audit's macros.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = False
    standalone: t.Literal[True] = True
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expr] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    default_catalog: t.Optional[str] = None
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    python_env: t.Dict[str, Executable] = {}
    formatting: t.Optional[bool] = Field(default=None, exclude=True)

    source_type: t.Literal["audit"] = "audit"

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        if self.blocking:
            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
        return self

    def render_audit_query(
        self,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> exp.Query:
        """Renders the audit's query.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All snapshots (by name) to use for mapping of physical locations.
            deployability_index: Determines snapshots that are deployable in the context of this render.
            kwargs: Additional kwargs to pass to the renderer. They override the audit's `defaults`.

        Returns:
            The rendered expression.

        Raises:
            SQLMeshError: If the renderer returns no query.
        """
        query_renderer = QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            path=self._path or Path(),
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            default_catalog=self.default_catalog,
        )

        rendered_query = query_renderer.render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **{**self.defaults, **kwargs},  # type: ignore
        )

        if rendered_query is None:
            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")

        return rendered_query

    @cached_property
    def depends_on(self) -> t.Set[str]:
        """Tables this audit depends on.

        Combines the configured `depends_on_` with the tables referenced by the rendered
        query, excluding the audit's own name. A copy of `depends_on_` is used so the
        validated field is never mutated by this computation.
        """
        depends_on = set(self.depends_on_ or ())

        query = self.render_audit_query()
        if query is not None:
            depends_on |= d.find_tables(
                query, default_catalog=self.default_catalog, dialect=self.dialect
            )

        depends_on.discard(self.name)
        return depends_on

    @property
    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
        """Returns the python env sorted by executable kind and then var name."""
        return sort_python_env(self.python_env)

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        # StandaloneAudits do not have a data hash
        return hash_data("")

    @property
    def metadata_hash(self) -> str:
        """
        Computes the metadata hash for the node.

        The hash covers the owner, description, sorted tags, python env, stamp, cron and
        cron timezone, followed by the raw SQL of the query and of any additional
        statements. The result is memoized in `_metadata_hash`.

        Returns:
            The metadata hash for the node.
        """
        if self._metadata_hash is None:
            data = [
                self.owner,
                self.description,
                *sorted(self.tags),
                str(self.sorted_python_env),
                self.stamp,
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
            ]

            data.append(self.query_.sql)
            data.extend([e.sql for e in self.expressions_ or []])
            self._metadata_hash = hash_data(data)
        return self._metadata_hash

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against.
            rendered: Whether the diff should be between raw vs rendered nodes

        Returns:
            A unified text diff showing additions and deletions.

        Raises:
            SQLMeshError: If `other` is not a standalone audit.
        """
        if not isinstance(other, StandaloneAudit):
            raise SQLMeshError(
                f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
            )

        return d.text_diff(
            self.render_definition(render_query=rendered),
            other.render_definition(render_query=rendered),
            self.dialect,
            other.dialect,
        ).strip()

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expr]:
        """Returns the original list of sql expressions comprising the audit definition.

        Args:
            include_python: Whether or not to include Python code (the python env and the
                jinja macro definitions) in the rendered definition.
            include_defaults: Whether to also include meta fields with truthy values even
                when they equal the field default.
            render_query: Whether to emit the rendered query instead of the raw parsed query.

        Returns:
            The AUDIT header, followed by the optional python env and jinja macro
            expressions, the additional statements and, last, the query.
        """
        expressions: t.List[exp.Expr] = []
        comment = None
        field_infos = self.all_field_infos()
        for field_name in sorted(self.meta_fields):
            field_value = getattr(self, field_name)
            field_info = field_infos[field_name]
            # `standalone` is always emitted: `load_audit` only builds a StandaloneAudit
            # when the AUDIT header sets it.
            if (
                field_name == "standalone"
                or (include_defaults and field_value)
                or field_value != field_info.default
            ):
                if field_name == "description":
                    # The description becomes a comment on the AUDIT header.
                    comment = field_value
                else:
                    expression = exp.Property(
                        this=field_info.alias or field_name,
                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
                    )
                    if field_name == "name":
                        # Keep `name` as the first property of the header.
                        expressions.insert(0, expression)
                    else:
                        expressions.append(expression)

        audit = d.Audit(expressions=expressions)
        audit.comments = [comment] if comment else None

        jinja_expressions = []
        python_expressions = []
        if include_python:
            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
            if python_env.expressions:
                python_expressions.append(python_env)

            jinja_expressions = self.jinja_macros.to_expressions()

        return [
            audit,
            *python_expressions,
            *jinja_expressions,
            *self.expressions,
            self.render_audit_query() if render_query else self.query,
        ]

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return True

    @property
    def meta_fields(self) -> t.Iterable[str]:
        """Names of the fields rendered into the AUDIT header by `render_definition`."""
        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())

    @property
    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
        return [(self, {})]


Audit = t.Union[ModelAudit, StandaloneAudit]


def load_audit(
    expressions: t.List[exp.Expr],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> Audit:
    """Load an audit from a parsed SQLMesh audit file.

    Args:
        expressions: Audit, *Statements, Query
        path: An optional path of the file; used in error messages and stored on the audit.
        module_path: Passed to `make_python_env` when building a standalone audit's python env.
        macros: Macro registry for standalone audits. Defaults to the global macro registry.
        jinja_macros: Jinja macros for standalone audits, trimmed to the referenced ones.
        dialect: The default dialect if no audit dialect is configured.
        default_catalog: The default catalog assigned to standalone audits.
        variables: Variables made available to a standalone audit's python env.
        project: Project name assigned to standalone audits when provided.

    Returns:
        A `StandaloneAudit` when the AUDIT header sets `standalone` to true, otherwise a
        `ModelAudit`.

    Raises:
        AuditConfigError: If the definition is malformed or fails validation.
    """
    if len(expressions) < 2:
        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)

    meta, *statements, query = expressions

    if not isinstance(meta, d.Audit):
        _raise_config_error(
            "AUDIT statement is required as the first statement in the definition",
            path,
        )
        # Unreachable: `_raise_config_error` always raises; this narrows types for mypy.
        raise

    meta_fields = {p.name: p.args.get("value") for p in meta.expressions if p}

    standalone_field = meta_fields.pop("standalone", None)
    if standalone_field and not isinstance(standalone_field, exp.Boolean):
        _raise_config_error(
            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
            path,
        )
        raise
    is_standalone = standalone_field and standalone_field.this

    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
        StandaloneAudit if is_standalone else ModelAudit
    )

    missing_required_fields = audit_class.missing_required_fields(set(meta_fields))
    # The query comes from the body of the definition, not from the AUDIT header.
    missing_required_fields -= {"query"}
    if missing_required_fields:
        _raise_config_error(
            f"Missing required fields {missing_required_fields} in the audit definition",
            path,
        )

    extra_fields = audit_class.extra_fields(set(meta_fields))

    if extra_fields:
        _raise_config_error(f"Invalid extra fields {extra_fields} in the audit definition", path)

    if not isinstance(query, exp.Query) and not isinstance(query, d.JinjaQuery):
        _raise_config_error("Missing SELECT query in the audit definition", path)
        raise

    extra_kwargs: t.Dict[str, t.Any] = {}
    if is_standalone:
        jinja_macro_references, referenced_variables = extract_macro_references_and_variables(
            *(gen(s) for s in statements),
            gen(query),
        )
        jinja_macros = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_references)
        # Variables referenced inside the retained jinja macros are needed as well.
        for jinja_macro in jinja_macros.root_macros.values():
            referenced_variables.update(
                extract_macro_references_and_variables(jinja_macro.definition)[1]
            )

        extra_kwargs["jinja_macros"] = jinja_macros
        extra_kwargs["python_env"] = make_python_env(
            [*statements, query],
            jinja_macro_references,
            module_path,
            macros or macro.get_registry(),
            variables=variables,
            referenced_variables=referenced_variables,
        )
        extra_kwargs["default_catalog"] = default_catalog
        if project is not None:
            extra_kwargs["project"] = project

    dialect = meta_fields.pop("dialect", dialect) or ""

    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
    parsable_statements = [
        ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=True) for s in statements
    ]

    try:
        audit = audit_class(
            query=parsable_query,
            expressions=parsable_statements,
            dialect=dialect,
            **extra_kwargs,
            **meta_fields,
        )
    except Exception as ex:
        # Surface validation failures as config errors that carry the file location.
        _raise_config_error(str(ex), path)

    audit._path = path
    return audit


def load_multiple_audits(
    expressions: t.List[exp.Expr],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> t.Generator[Audit, None, None]:
    """Lazily load every audit defined in a single parsed file.

    A block starts at each AUDIT statement and runs up to, but not including, the next
    one. Each block is passed to `load_audit` with the same loading options. Expressions
    before the first AUDIT statement form their own block, which `load_audit` rejects.
    """

    def _load(block: t.List[exp.Expr]) -> Audit:
        # Every block, including the last one, must receive the same loading options.
        return load_audit(
            expressions=block,
            path=path,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            dialect=dialect,
            default_catalog=default_catalog,
            variables=variables,
            project=project,
        )

    audit_block: t.List[exp.Expr] = []
    for expression in expressions:
        if isinstance(expression, d.Audit) and audit_block:
            yield _load(audit_block)
            audit_block = []
        audit_block.append(expression)
    yield _load(audit_block)


def _raise_config_error(msg: str, path: pathlib.Path) -> None:
    """Raise an `AuditConfigError` for `msg`, located at `path`."""
    raise_config_error(msg, location=path, error_type=AuditConfigError)


# mypy doesn't realize raise_config_error raises an exception
@t.no_type_check
def _maybe_parse_arg_pair(e: exp.Expr) -> t.Tuple[str, exp.Expr]:
    """Convert a `name = value` expression into a `(name, value)` pair.

    Raises:
        AuditConfigError: If `e` is not an equality expression.
    """
    if isinstance(e, exp.EQ):
        return e.left.name, e.right
    raise_config_error(
        f"Audit defaults must be 'name = value' pairs, got '{e.sql()}'",
        error_type=AuditConfigError,
    )


# Converters used by `StandaloneAudit.render_definition` to turn meta field values into
# expressions; fields not listed here fall back to `exp.to_identifier`.
META_FIELD_CONVERTER: t.Dict[str, t.Callable] = {
    "start": lambda value: exp.Literal.string(value),
    "cron": lambda value: exp.Literal.string(value),
    "skip": exp.convert,
    "blocking": exp.convert,
    "standalone": exp.convert,
    "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)),
    "tags": single_value_or_tuple,
    "default_catalog": exp.to_identifier,
    "dbt_node_info_": lambda value: value.to_expression(),
}
class AuditCommonMetaMixin:
    """
    Metadata shared by every audit and definable directly in SQL.

    Args:
        name: Unique name identifying the audit.
        dialect: SQL dialect the audit query is written in.
        skip: When `true`, the audit is skipped. Defaults to `false`.
        blocking: When `true`, a failing audit stops pipeline execution.
        standalone: When `true`, the audit is executed as a standalone audit.
    """

    # Identity of the audit.
    name: str
    dialect: str
    # Execution flags.
    skip: bool
    blocking: bool
    standalone: bool
Metadata for audits which can be defined in SQL.
Arguments:
- name: The unique name of the audit.
- dialect: The dialect of the audit query.
- skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
- blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
- standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
class AuditMixin(AuditCommonMetaMixin):
    """
    Behaviour shared by model-attached and standalone audits.

    Args:
        query: The audit query.
        defaults: Default values for the audit query.
        expressions: Additional sql statements to execute alongside the audit.
        jinja_macros: A registry of jinja macros to use when rendering the audit query.
    """

    query_: ParsableSql
    defaults: t.Dict[str, exp.Expr]
    expressions_: t.Optional[t.List[ParsableSql]]
    jinja_macros: JinjaMacroRegistry
    formatting: t.Optional[bool]

    @property
    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
        """The audit's query, parsed in the audit's dialect."""
        parsed = self.query_.parse(self.dialect)
        return t.cast(t.Union[exp.Query, d.JinjaQuery], parsed)

    @property
    def expressions(self) -> t.List[exp.Expr]:
        """The extra statements, parsed in the audit's dialect, with bare semicolons dropped."""
        raw_statements = self.expressions_ or []
        parsed_statements = (raw.parse(self.dialect) for raw in raw_statements)
        return [stmt for stmt in parsed_statements if not isinstance(stmt, exp.Semicolon)]

    @property
    def macro_definitions(self) -> t.List[d.MacroDef]:
        """Every macro definition found among `expressions`."""
        return [stmt for stmt in self.expressions if isinstance(stmt, d.MacroDef)]
Mixin for common Audit functionality
Arguments:
- query: The audit query.
- defaults: Default values for the audit query.
- expressions: Additional sql statements to execute alongside the audit.
- jinja_macros: A registry of jinja macros to use when rendering the audit query.
@property
def macro_definitions(self) -> t.List[d.MacroDef]:
    """Collect the macro definitions that appear among the audit's statements."""
    definitions: t.List[d.MacroDef] = []
    for statement in self.expressions:
        if isinstance(statement, d.MacroDef):
            definitions.append(statement)
    return definitions
All macro definitions from the list of expressions.
Inherited Members
class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
    """
    Audit is an assertion made about your tables.

    An audit is a SQL query that returns bad records. A `ModelAudit` is blocking by
    default and can never be standalone.
    """

    # Header metadata.
    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = True
    standalone: t.Literal[False] = False
    # Audit body.
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expr] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    # `exclude=True` keeps this out of serialized output.
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)

    # Path of the file the audit was loaded from, when known.
    _path: t.Optional[Path] = None

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator

    def __str__(self) -> str:
        source = self._path
        suffix = f": {source.name}" if source else ""
        return f"{self.__class__.__name__}<{self.name}{suffix}>"

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        """The dbt node info attached to this audit, if any."""
        info = self.dbt_node_info_
        return info
Audit is an assertion made about your tables.
An audit is a SQL query that returns bad records.
Configuration for the model; should be a dictionary conforming to `pydantic.config.ConfigDict`.
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate `__pydantic_private__` with private-attribute defaults, BaseModel-method style.

    pydantic-core passes `context` when calling this, which is why it is accepted.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Already initialised; leave the existing private state alone.
        return
    defaults = (
        (attr_name, attr.get_default()) for attr_name, attr in self.__private_attributes__.items()
    )
    object_setattr(
        self,
        '__pydantic_private__',
        {attr_name: value for attr_name, value in defaults if value is not PydanticUndefined},
    )
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class StandaloneAudit(_Node, AuditMixin):
    """
    An audit that is a `_Node` in its own right, with its own dependencies, python env and
    default catalog. Standalone audits can never be blocking.

    Args:
        depends_on: A list of tables this audit depends on.
        python_env: Dictionary containing all global variables needed to render the audit's macros.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = False
    standalone: t.Literal[True] = True
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expr] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    default_catalog: t.Optional[str] = None
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    python_env: t.Dict[str, Executable] = {}
    formatting: t.Optional[bool] = Field(default=None, exclude=True)

    source_type: t.Literal["audit"] = "audit"

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        if self.blocking:
            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
        return self

    def render_audit_query(
        self,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> exp.Query:
        """Renders the audit's query.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All snapshots (by name) to use for mapping of physical locations.
            deployability_index: Determines snapshots that are deployable in the context of this render.
            kwargs: Additional kwargs to pass to the renderer. They override the audit's `defaults`.

        Returns:
            The rendered expression.

        Raises:
            SQLMeshError: If the renderer returns no query.
        """
        query_renderer = QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            path=self._path or Path(),
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            default_catalog=self.default_catalog,
        )

        rendered_query = query_renderer.render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **{**self.defaults, **kwargs},  # type: ignore
        )

        if rendered_query is None:
            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")

        return rendered_query

    @cached_property
    def depends_on(self) -> t.Set[str]:
        """Tables this audit depends on.

        Combines the configured `depends_on_` with the tables referenced by the rendered
        query, excluding the audit's own name. A copy of `depends_on_` is used so the
        validated field is never mutated by this computation.
        """
        depends_on = set(self.depends_on_ or ())

        query = self.render_audit_query()
        if query is not None:
            depends_on |= d.find_tables(
                query, default_catalog=self.default_catalog, dialect=self.dialect
            )

        depends_on.discard(self.name)
        return depends_on

    @property
    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
        """Returns the python env sorted by executable kind and then var name."""
        return sort_python_env(self.python_env)

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        # StandaloneAudits do not have a data hash
        return hash_data("")

    @property
    def metadata_hash(self) -> str:
        """
        Computes the metadata hash for the node.

        The hash covers the owner, description, sorted tags, python env, stamp, cron and
        cron timezone, followed by the raw SQL of the query and of any additional
        statements. The result is memoized in `_metadata_hash`.

        Returns:
            The metadata hash for the node.
        """
        if self._metadata_hash is None:
            data = [
                self.owner,
                self.description,
                *sorted(self.tags),
                str(self.sorted_python_env),
                self.stamp,
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
            ]

            data.append(self.query_.sql)
            data.extend([e.sql for e in self.expressions_ or []])
            self._metadata_hash = hash_data(data)
        return self._metadata_hash

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against.
            rendered: Whether the diff should be between raw vs rendered nodes

        Returns:
            A unified text diff showing additions and deletions.

        Raises:
            SQLMeshError: If `other` is not a standalone audit.
        """
        if not isinstance(other, StandaloneAudit):
            raise SQLMeshError(
                f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
            )

        return d.text_diff(
            self.render_definition(render_query=rendered),
            other.render_definition(render_query=rendered),
            self.dialect,
            other.dialect,
        ).strip()

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expr]:
        """Returns the original list of sql expressions comprising the audit definition.

        Args:
            include_python: Whether or not to include Python code (the python env and the
                jinja macro definitions) in the rendered definition.
            include_defaults: Whether to also include meta fields with truthy values even
                when they equal the field default.
            render_query: Whether to emit the rendered query instead of the raw parsed query.

        Returns:
            The AUDIT header, followed by the optional python env and jinja macro
            expressions, the additional statements and, last, the query.
        """
        expressions: t.List[exp.Expr] = []
        comment = None
        field_infos = self.all_field_infos()
        for field_name in sorted(self.meta_fields):
            field_value = getattr(self, field_name)
            field_info = field_infos[field_name]
            # `standalone` is always emitted: `load_audit` only builds a StandaloneAudit
            # when the AUDIT header sets it.
            if (
                field_name == "standalone"
                or (include_defaults and field_value)
                or field_value != field_info.default
            ):
                if field_name == "description":
                    # The description becomes a comment on the AUDIT header.
                    comment = field_value
                else:
                    expression = exp.Property(
                        this=field_info.alias or field_name,
                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
                    )
                    if field_name == "name":
                        # Keep `name` as the first property of the header.
                        expressions.insert(0, expression)
                    else:
                        expressions.append(expression)

        audit = d.Audit(expressions=expressions)
        audit.comments = [comment] if comment else None

        jinja_expressions = []
        python_expressions = []
        if include_python:
            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
            if python_env.expressions:
                python_expressions.append(python_env)

            jinja_expressions = self.jinja_macros.to_expressions()

        return [
            audit,
            *python_expressions,
            *jinja_expressions,
            *self.expressions,
            self.render_audit_query() if render_query else self.query,
        ]

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return True

    @property
    def meta_fields(self) -> t.Iterable[str]:
        """Names of the fields rendered into the AUDIT header by `render_definition`."""
        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())

    @property
    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expr]]]:
        return [(self, {})]
Arguments:
- depends_on: A list of tables this audit depends on.
- python_env: Dictionary containing all global variables needed to render the audit's macros.
def render_audit_query(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    **kwargs: t.Any,
) -> exp.Query:
    """Render this audit's query into a concrete SQL expression.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time time reference to use for execution time.
        snapshots: All snapshots (by name) to use for mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this render.
        kwargs: Additional kwargs to pass to the renderer.

    Returns:
        The rendered expression.

    Raises:
        SQLMeshError: If the renderer produces no query.
    """
    renderer = QueryRenderer(
        self.query,
        self.dialect,
        self.macro_definitions,
        path=self._path or Path(),
        jinja_macro_registry=self.jinja_macros,
        python_env=self.python_env,
        default_catalog=self.default_catalog,
    )

    # Explicit keyword arguments take precedence over the audit's configured defaults.
    render_args = {**self.defaults, **kwargs}
    rendered = renderer.render(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        deployability_index=deployability_index,
        **render_args,  # type: ignore
    )

    if rendered is not None:
        return rendered
    raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")
Renders the audit's query.
Arguments:
- start: The start datetime to render. Defaults to epoch start.
- end: The end datetime to render. Defaults to epoch start.
- execution_time: The date/time reference to use for execution time.
- snapshots: All snapshots (by name) to use for mapping of physical locations.
- deployability_index: Determines snapshots that are deployable in the context of this render.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
The rendered expression.
@cached_property
def depends_on(self) -> t.Set[str]:
    """Names of the tables this audit depends on.

    Combines the explicitly declared dependencies (``depends_on_``) with the
    tables found in the rendered audit query, and excludes the audit's own name.

    The result is built in a fresh set, so reading this property never mutates
    the declared ``depends_on_`` field.

    Returns:
        A new set of table names.
    """
    # Copy so the in-place `|=` / `discard` below cannot alter the declared field.
    dependencies: t.Set[str] = set(self.depends_on_ or ())

    query = self.render_audit_query()
    if query is not None:
        dependencies |= d.find_tables(
            query, default_catalog=self.default_catalog, dialect=self.dialect
        )

    dependencies.discard(self.name)
    return dependencies
@property
def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
    """The python env ordered first by executable kind, then by variable name.

    The ordering is delegated entirely to ``sort_python_env``.
    """
    env = self.python_env
    return sort_python_env(env)
Returns the python env sorted by executable kind and then var name.
@property
def data_hash(self) -> str:
    """Data hash of this node.

    Standalone audits do not have a data hash of their own, so this is always
    the hash of the empty string.

    Returns:
        ``hash_data("")``.
    """
    no_data = ""
    return hash_data(no_data)
Computes the data hash for the node.
Returns:
The data hash for the node.
@property
def metadata_hash(self) -> str:
    """Computes the metadata hash for the node.

    The hash covers the owner, description, sorted tags, the sorted python env,
    stamp, cron, the cron time zone key, the raw query SQL, and the raw SQL of
    every additional statement. It is computed once and memoized in
    ``self._metadata_hash``.

    Returns:
        The metadata hash for the node.
    """
    if self._metadata_hash is None:
        data = [
            self.owner,
            self.description,
            *sorted(self.tags),
            str(self.sorted_python_env),
            self.stamp,
            self.cron,
            self.cron_tz.key if self.cron_tz else None,
            self.query_.sql,
            *(e.sql for e in self.expressions_ or []),
        ]
        self._metadata_hash = hash_data(data)
    return self._metadata_hash
Computes the metadata hash for the node.
This property takes no arguments.
Returns:
The metadata hash for the node.
def text_diff(self, other: Node, rendered: bool = False) -> str:
    """Produce a text diff against another node.

    Args:
        other: The node to diff against. Must be a ``StandaloneAudit``.
        rendered: Whether to diff the definitions with the query rendered
            instead of the raw definitions.

    Returns:
        A unified text diff showing additions and deletions, with surrounding
        whitespace stripped.

    Raises:
        SQLMeshError: If ``other`` is not a ``StandaloneAudit``.
    """
    if not isinstance(other, StandaloneAudit):
        # Both names are quoted symmetrically (the closing quote after self.name was missing).
        raise SQLMeshError(
            f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
        )

    return d.text_diff(
        self.render_definition(render_query=rendered),
        other.render_definition(render_query=rendered),
        self.dialect,
        other.dialect,
    ).strip()
Produce a text diff against another node.
Arguments:
- other: The node to diff against.
- rendered: Whether the diff should be between raw vs rendered nodes
Returns:
A unified text diff showing additions and deletions.
def render_definition(
    self,
    include_python: bool = True,
    include_defaults: bool = False,
    render_query: bool = False,
) -> t.List[exp.Expr]:
    """Returns the list of SQL expressions that make up this audit's definition.

    The result is, in order: the ``AUDIT (...)`` block, an optional python code
    block, any jinja macro expressions, the extra statements, and the query.

    Args:
        include_python: Whether to include the python env and jinja macros.
        include_defaults: Whether to also emit meta fields whose value is truthy
            even when it equals the field's default. ``standalone`` is always emitted.
        render_query: Whether to emit the rendered query instead of the raw one.

    Returns:
        The definition as a list of expressions.
    """
    leading: t.List[exp.Expr] = []
    properties: t.List[exp.Expr] = []
    description = None

    for field_name in sorted(self.meta_fields):
        value = getattr(self, field_name)
        info = self.all_field_infos()[field_name]

        if not (
            field_name == "standalone"
            or (include_defaults and value)
            or value != info.default
        ):
            continue

        if field_name == "description":
            # The description becomes a comment on the AUDIT block, not a property.
            description = value
            continue

        converter = META_FIELD_CONVERTER.get(field_name, exp.to_identifier)
        prop = exp.Property(this=info.alias or field_name, value=converter(value))
        # `name` always leads the property list.
        (leading if field_name == "name" else properties).append(prop)

    audit = d.Audit(expressions=leading + properties)
    audit.comments = [description] if description else None

    python_expressions: t.List[exp.Expr] = []
    jinja_expressions = []
    if include_python:
        python_code = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
        if python_code.expressions:
            python_expressions.append(python_code)
        jinja_expressions = self.jinja_macros.to_expressions()

    query = self.render_audit_query() if render_query else self.query
    return [audit, *python_expressions, *jinja_expressions, *self.expressions, query]
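Returns the original list of sql expressions comprising the audit definition.
Arguments:
- include_python: Whether or not to include Python code (and Jinja macros) in the rendered definition.
- include_defaults: Whether to also include meta fields whose truthy values equal their defaults.
- render_query: Whether to include the rendered query instead of the query as written.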
@property
def is_audit(self) -> bool:
    """Whether this node is an audit; constant ``True`` for audit nodes."""
    return True
Return True if this is an audit node
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise ``__pydantic_private__`` with the defaults of the private attributes.

    Meant to behave like a ``BaseModel`` method. ``context`` is accepted only because
    pydantic-core passes it when calling this hook; it is not used here. If
    ``__pydantic_private__`` is already set to something other than ``None``, this
    does nothing.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        value = attr.get_default()
        if value is PydanticUndefined:
            # Attributes without a default stay unset.
            continue
        defaults[attr_name] = value
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.core.node._Node
- project
- description
- owner
- start
- end
- cron
- cron_tz
- interval_unit_
- stamp
- dbt_node_info_
- copy
- batch_size
- batch_concurrency
- interval_unit
- fqn
- is_metadata_only_change
- is_data_change
- croniter
- cron_next
- cron_prev
- cron_floor
- is_model
- dbt_node_info
def _standalone_audit_kwargs(
    statements: t.List[exp.Expr],
    query: exp.Expr,
    *,
    module_path: Path,
    macros: t.Optional[MacroRegistry],
    jinja_macros: t.Optional[JinjaMacroRegistry],
    default_catalog: t.Optional[str],
    variables: t.Optional[t.Dict[str, t.Any]],
    project: t.Optional[str],
) -> t.Dict[str, t.Any]:
    """Build the extra constructor kwargs (jinja macros, python env, catalog, project) of a standalone audit."""
    jinja_macro_references, referenced_variables = extract_macro_references_and_variables(
        *(gen(s) for s in statements),
        gen(query),
    )
    trimmed_jinja = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_references)
    # Variables referenced inside the retained root macros are captured as well.
    for root_macro in trimmed_jinja.root_macros.values():
        referenced_variables.update(
            extract_macro_references_and_variables(root_macro.definition)[1]
        )

    kwargs: t.Dict[str, t.Any] = {
        "jinja_macros": trimmed_jinja,
        "python_env": make_python_env(
            [*statements, query],
            jinja_macro_references,
            module_path,
            macros or macro.get_registry(),
            variables=variables,
            referenced_variables=referenced_variables,
        ),
        "default_catalog": default_catalog,
    }
    if project is not None:
        kwargs["project"] = project
    return kwargs


def load_audit(
    expressions: t.List[exp.Expr],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> Audit:
    """Load an audit from a parsed SQLMesh audit file.

    The expressions are ``AUDIT(...)``, zero or more statements, then the query.
    A true ``standalone`` property selects ``StandaloneAudit``; otherwise a
    ``ModelAudit`` is built.

    Args:
        expressions: Audit, *Statements, Query
        path: An optional path of the file; used in errors and stored on the audit.
        module_path: Module path used to build a standalone audit's python env.
        macros: Macro registry for standalone audits; the global ``macro`` registry if omitted.
        jinja_macros: Jinja macros; trimmed to the referenced ones for standalone audits.
        dialect: The default dialect if no audit dialect is configured.
        default_catalog: Default catalog; only passed to standalone audits.
        variables: Variables for a standalone audit's python env.
        project: Project name; only passed to standalone audits, and only when given.

    Returns:
        The loaded audit.

    Raises:
        Errors are reported through ``_raise_config_error`` when the definition is
        malformed or the audit class rejects the fields.
    """
    if len(expressions) < 2:
        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)

    meta, *statements, query = expressions

    if not isinstance(meta, d.Audit):
        _raise_config_error(
            "AUDIT statement is required as the first statement in the definition",
            path,
        )
        # `_raise_config_error` is expected to raise; the bare `raise` presumably aids type narrowing.
        raise

    meta_fields = {prop.name: prop.args.get("value") for prop in meta.expressions if prop}

    standalone_field = meta_fields.pop("standalone", None)
    if standalone_field and not isinstance(standalone_field, exp.Boolean):
        _raise_config_error(
            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
            path,
        )
        raise
    is_standalone = standalone_field and standalone_field.this

    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
        StandaloneAudit if is_standalone else ModelAudit
    )

    missing = audit_class.missing_required_fields(set(meta_fields))
    # The query comes from the definition body, not from an AUDIT property.
    missing -= {"query"}
    if missing:
        _raise_config_error(
            f"Missing required fields {missing} in the audit definition",
            path,
        )

    unexpected = audit_class.extra_fields(set(meta_fields))
    if unexpected:
        _raise_config_error(f"Invalid extra fields {unexpected} in the audit definition", path)

    if not isinstance(query, (exp.Query, d.JinjaQuery)):
        _raise_config_error("Missing SELECT query in the audit definition", path)
        raise

    extra_kwargs: t.Dict[str, t.Any] = (
        _standalone_audit_kwargs(
            statements,
            query,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            default_catalog=default_catalog,
            variables=variables,
            project=project,
        )
        if is_standalone
        else {}
    )

    dialect = meta_fields.pop("dialect", dialect) or ""

    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
    parsable_statements = [
        ParsableSql.from_parsed_expression(stmt, dialect, use_meta_sql=True)
        for stmt in statements
    ]

    try:
        audit = audit_class(
            query=parsable_query,
            expressions=parsable_statements,
            dialect=dialect,
            **extra_kwargs,
            **meta_fields,
        )
    except Exception as ex:
        _raise_config_error(str(ex), path)

    audit._path = path
    return audit
Load an audit from a parsed SQLMesh audit file.
Arguments:
- expressions: Audit, *Statements, Query
- path: An optional path of the file.
- dialect: The default dialect if no audit dialect is configured.
def load_multiple_audits(
    expressions: t.List[exp.Expr],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> t.Generator[Audit, None, None]:
    """Load every audit defined in one parsed audit file.

    The expressions are split into blocks that each start at an ``AUDIT(...)``
    expression. Each block is handed to ``load_audit`` with the same loading
    options.

    Args:
        expressions: All parsed expressions of the file.
        path: An optional path of the file.
        module_path: Module path forwarded to ``load_audit``.
        macros: Macro registry forwarded to ``load_audit``.
        jinja_macros: Jinja macro registry forwarded to ``load_audit``.
        dialect: The default dialect if no audit dialect is configured.
        default_catalog: Default catalog forwarded to ``load_audit``.
        variables: Variables forwarded to ``load_audit``.
        project: Project name forwarded to ``load_audit``.

    Yields:
        One audit per block, in file order.
    """
    # Shared by every block, including the last one, so the final audit gets the
    # same module path, macros and jinja macros as the others.
    load_kwargs: t.Dict[str, t.Any] = dict(
        path=path,
        module_path=module_path,
        macros=macros,
        jinja_macros=jinja_macros,
        dialect=dialect,
        default_catalog=default_catalog,
        variables=variables,
        project=project,
    )

    audit_block: t.List[exp.Expr] = []
    for expression in expressions:
        if isinstance(expression, d.Audit) and audit_block:
            yield load_audit(expressions=audit_block, **load_kwargs)
            # Start a new list instead of clearing the one just handed to load_audit.
            audit_block = []
        audit_block.append(expression)
    yield load_audit(expressions=audit_block, **load_kwargs)