sqlmesh.core.audit.definition
from __future__ import annotations

import pathlib
import typing as t
from functools import cached_property
from pathlib import Path

from pydantic import Field
from sqlglot import exp
from sqlglot.optimizer.simplify import gen

from sqlmesh.core import dialect as d
from sqlmesh.core.macros import MacroRegistry, macro
from sqlmesh.core.model.common import (
    bool_validator,
    default_catalog_validator,
    depends_on_validator,
    sort_python_env,
    sorted_python_env_payloads,
)
from sqlmesh.core.model.common import make_python_env, single_value_or_tuple, ParsableSql
from sqlmesh.core.node import _Node, DbtInfoMixin, DbtNodeInfo
from sqlmesh.core.renderer import QueryRenderer
from sqlmesh.utils.date import TimeLike
from sqlmesh.utils.errors import AuditConfigError, SQLMeshError, raise_config_error
from sqlmesh.utils.hashing import hash_data
from sqlmesh.utils.jinja import (
    JinjaMacroRegistry,
    extract_macro_references_and_variables,
)
from sqlmesh.utils.metaprogramming import Executable
from sqlmesh.utils.pydantic import PydanticModel, field_validator, model_validator

if t.TYPE_CHECKING:
    from sqlmesh.core._typing import Self
    from sqlmesh.core.snapshot import DeployabilityIndex, Node, Snapshot


class AuditCommonMetaMixin:
    """
    Metadata for audits which can be defined in SQL.

    Args:
        name: The unique name of the audit.
        dialect: The dialect of the audit query.
        skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
        blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
        standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
    """

    name: str
    dialect: str
    skip: bool
    blocking: bool
    standalone: bool


class AuditMixin(AuditCommonMetaMixin):
    """
    Mixin for common Audit functionality

    Args:
        query: The audit query.
        defaults: Default values for the audit query.
        expressions: Additional sql statements to execute alongside the audit.
        jinja_macros: A registry of jinja macros to use when rendering the audit query.
    """

    query_: ParsableSql
    defaults: t.Dict[str, exp.Expression]
    expressions_: t.Optional[t.List[ParsableSql]]
    jinja_macros: JinjaMacroRegistry
    formatting: t.Optional[bool]

    @property
    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
        """The audit query parsed with the audit's dialect."""
        return t.cast(t.Union[exp.Query, d.JinjaQuery], self.query_.parse(self.dialect))

    @property
    def expressions(self) -> t.List[exp.Expression]:
        """The extra statements parsed with the audit's dialect, skipping bare semicolons."""
        if not self.expressions_:
            return []
        result = []
        for e in self.expressions_:
            parsed = e.parse(self.dialect)
            if not isinstance(parsed, exp.Semicolon):
                result.append(parsed)
        return result

    @property
    def macro_definitions(self) -> t.List[d.MacroDef]:
        """All macro definitions from the list of expressions."""
        return [s for s in self.expressions if isinstance(s, d.MacroDef)]


@field_validator("name", "dialect", mode="before", check_fields=False)
def audit_string_validator(cls: t.Type, v: t.Any) -> t.Optional[str]:
    """Lower-case ``name``/``dialect``, taking the name of a SQL expression when given one."""
    if isinstance(v, exp.Expression):
        return v.name.lower()
    return str(v).lower() if v is not None else None


@field_validator("defaults", mode="before", check_fields=False)
def audit_map_validator(cls: t.Type, v: t.Any, values: t.Any) -> t.Dict[str, t.Any]:
    """Normalize ``defaults`` into a mapping of name -> SQL expression.

    Accepts a single parenthesized ``key = value`` pair, a tuple/array of such pairs, or a
    dict whose non-expression values are parsed as SQL (dialect taken from
    ``get_dialect(values)``).

    Raises:
        AuditConfigError: For any other input, or a tuple/array element that is not
            ``key = value``.
    """
    from sqlmesh.utils.pydantic import get_dialect

    if isinstance(v, exp.Paren):
        return dict([_maybe_parse_arg_pair(v.unnest())])
    if isinstance(v, (exp.Tuple, exp.Array)):
        return dict(map(_maybe_parse_arg_pair, v.expressions))
    if isinstance(v, dict):
        dialect = get_dialect(values)
        return {
            key: value
            if isinstance(value, exp.Expression)
            else d.parse_one(str(value), dialect=dialect)
            for key, value in v.items()
        }
    raise_config_error("Defaults must be a tuple of exp.EQ or a dict", error_type=AuditConfigError)
    # Not reached when raise_config_error raises; kept so the function has a typed return.
    return {}


class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
    """
    Audit is an assertion made about your tables.

    An audit is a SQL query that returns bad records.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = True
    standalone: t.Literal[False] = False
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expression] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)

    _path: t.Optional[Path] = None

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator

    def __str__(self) -> str:
        path = f": {self._path.name}" if self._path else ""
        return f"{self.__class__.__name__}<{self.name}{path}>"

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        return self.dbt_node_info_


class StandaloneAudit(_Node, AuditMixin):
    """
    An audit that exists as its own node; ``standalone`` is always ``True`` and standalone
    audits can never be blocking.

    Args:
        depends_on: A list of tables this audit depends on.
        python_env: Dictionary containing all global variables needed to render the audit's macros.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = False
    standalone: t.Literal[True] = True
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expression] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    default_catalog: t.Optional[str] = None
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    python_env: t.Dict[str, Executable] = {}
    formatting: t.Optional[bool] = Field(default=None, exclude=True)

    source_type: t.Literal["audit"] = "audit"

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        if self.blocking:
            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
        return self

    def render_audit_query(
        self,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> exp.Query:
        """Renders the audit's query.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All snapshots (by name) to use for mapping of physical locations.
            deployability_index: Determines snapshots that are deployable in the context of this render.
            kwargs: Additional kwargs to pass to the renderer; they override ``defaults``.

        Returns:
            The rendered expression.

        Raises:
            SQLMeshError: If the renderer produces no query.
        """
        query_renderer = QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            path=self._path or Path(),
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            default_catalog=self.default_catalog,
        )

        rendered_query = query_renderer.render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **{**self.defaults, **kwargs},  # type: ignore
        )

        if rendered_query is None:
            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")

        return rendered_query

    @cached_property
    def depends_on(self) -> t.Set[str]:
        """Declared ``depends_on`` tables plus tables referenced by the rendered query,
        excluding the audit's own name. Returns a new set; ``depends_on_`` is not modified.
        """
        # Copy: aliasing ``self.depends_on_`` would let the in-place ``|=`` / ``-=`` below
        # rewrite the model's own field.
        depends_on = set(self.depends_on_ or ())

        query = self.render_audit_query()
        if query is not None:
            depends_on |= d.find_tables(
                query, default_catalog=self.default_catalog, dialect=self.dialect
            )

        depends_on -= {self.name}
        return depends_on

    @property
    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
        """Returns the python env sorted by executable kind and then var name."""
        return sort_python_env(self.python_env)

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        # StandaloneAudits do not have a data hash
        return hash_data("")

    @property
    def metadata_hash(self) -> str:
        """
        Computes (once, then caches on ``_metadata_hash``) the metadata hash for the node.

        Returns:
            The metadata hash for the node.
        """
        if self._metadata_hash is None:
            data = [
                self.owner,
                self.description,
                *sorted(self.tags),
                str(self.sorted_python_env),
                self.stamp,
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
            ]

            data.append(self.query_.sql)
            data.extend([e.sql for e in self.expressions_ or []])
            self._metadata_hash = hash_data(data)
        return self._metadata_hash

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against.
            rendered: Whether the diff should be between raw vs rendered nodes

        Returns:
            A unified text diff showing additions and deletions.

        Raises:
            SQLMeshError: If ``other`` is not a ``StandaloneAudit``.
        """
        if not isinstance(other, StandaloneAudit):
            raise SQLMeshError(
                f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
            )

        return d.text_diff(
            self.render_definition(render_query=rendered),
            other.render_definition(render_query=rendered),
            self.dialect,
            other.dialect,
        ).strip()

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expression]:
        """Returns the original list of sql expressions comprising the audit definition.

        Args:
            include_python: Whether or not to include Python code (the python environment and
                the jinja macro definitions) in the rendered definition.
            include_defaults: Whether to also emit truthy meta fields that equal their default.
            render_query: Whether to emit the rendered query instead of the raw one.

        Returns:
            The AUDIT block, the optional python/jinja expressions, the extra statements and
            finally the query.
        """
        expressions: t.List[exp.Expression] = []
        comment = None
        field_infos = self.all_field_infos()
        for field_name in sorted(self.meta_fields):
            field_value = getattr(self, field_name)
            field_info = field_infos[field_name]
            if (
                field_name == "standalone"
                or (include_defaults and field_value)
                or field_value != field_info.default
            ):
                if field_name == "description":
                    # The description is rendered as a comment on the AUDIT block.
                    comment = field_value
                else:
                    expression = exp.Property(
                        this=field_info.alias or field_name,
                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
                    )
                    if field_name == "name":
                        expressions.insert(0, expression)
                    else:
                        expressions.append(expression)

        audit = d.Audit(expressions=expressions)
        audit.comments = [comment] if comment else None

        jinja_expressions = []
        python_expressions = []
        if include_python:
            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
            if python_env.expressions:
                python_expressions.append(python_env)

            jinja_expressions = self.jinja_macros.to_expressions()

        return [
            audit,
            *python_expressions,
            *jinja_expressions,
            *self.expressions,
            self.render_audit_query() if render_query else self.query,
        ]

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return True

    @property
    def meta_fields(self) -> t.Iterable[str]:
        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())

    @property
    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expression]]]:
        return [(self, {})]


Audit = t.Union[ModelAudit, StandaloneAudit]


def load_audit(
    expressions: t.List[exp.Expression],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> Audit:
    """Load an audit from a parsed SQLMesh audit file.

    Args:
        expressions: Audit, *Statements, Query
        path: An optional path of the file; used in errors and stored on the audit.
        module_path: Used to build the python environment (standalone audits only).
        macros: Macro registry for the python environment; defaults to the global registry
            (standalone audits only).
        jinja_macros: Jinja macros, trimmed to the referenced ones (standalone audits only).
        dialect: The default dialect if no audit dialect is configured.
        default_catalog: Default catalog (standalone audits only).
        variables: Variables for the python environment (standalone audits only).
        project: Project name (standalone audits only).

    Returns:
        A ``StandaloneAudit`` when the AUDIT block sets ``standalone true``, else a ``ModelAudit``.

    Raises:
        AuditConfigError: If the definition is malformed.
    """
    if len(expressions) < 2:
        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)

    meta, *statements, query = expressions

    if not isinstance(meta, d.Audit):
        _raise_config_error(
            "AUDIT statement is required as the first statement in the definition",
            path,
        )
        # Not reached: the helper raises. The bare raise narrows types for mypy.
        raise

    meta_fields = {p.name: p.args.get("value") for p in meta.expressions if p}

    standalone_field = meta_fields.pop("standalone", None)
    if standalone_field and not isinstance(standalone_field, exp.Boolean):
        _raise_config_error(
            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
            path,
        )
        raise
    is_standalone = standalone_field and standalone_field.this

    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
        StandaloneAudit if is_standalone else ModelAudit
    )

    missing_required_fields = audit_class.missing_required_fields(set(meta_fields))
    # The query is supplied separately, not as an AUDIT property.
    missing_required_fields -= {"query"}
    if missing_required_fields:
        _raise_config_error(
            f"Missing required fields {missing_required_fields} in the audit definition",
            path,
        )

    extra_fields = audit_class.extra_fields(set(meta_fields))

    if extra_fields:
        _raise_config_error(f"Invalid extra fields {extra_fields} in the audit definition", path)

    if not isinstance(query, exp.Query) and not isinstance(query, d.JinjaQuery):
        _raise_config_error("Missing SELECT query in the audit definition", path)
        raise

    extra_kwargs: t.Dict[str, t.Any] = {}
    if is_standalone:
        jinja_macro_references, referenced_variables = extract_macro_references_and_variables(
            *(gen(s) for s in statements),
            gen(query),
        )
        jinja_macros = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_references)
        for jinja_macro in jinja_macros.root_macros.values():
            referenced_variables.update(
                extract_macro_references_and_variables(jinja_macro.definition)[1]
            )

        extra_kwargs["jinja_macros"] = jinja_macros
        extra_kwargs["python_env"] = make_python_env(
            [*statements, query],
            jinja_macro_references,
            module_path,
            macros or macro.get_registry(),
            variables=variables,
            referenced_variables=referenced_variables,
        )
        extra_kwargs["default_catalog"] = default_catalog
        if project is not None:
            extra_kwargs["project"] = project

    dialect = meta_fields.pop("dialect", dialect) or ""

    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
    parsable_statements = [
        ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=True) for s in statements
    ]

    try:
        audit = audit_class(
            query=parsable_query,
            expressions=parsable_statements,
            dialect=dialect,
            **extra_kwargs,
            **meta_fields,
        )
    except Exception as ex:
        _raise_config_error(str(ex), path)

    audit._path = path
    return audit


def load_multiple_audits(
    expressions: t.List[exp.Expression],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> t.Generator[Audit, None, None]:
    """Split a file's statements at each AUDIT block and load every audit in turn.

    All keyword arguments are forwarded unchanged to ``load_audit`` for every audit,
    including the last one in the file.
    """

    def _load(block: t.List[exp.Expression]) -> Audit:
        # One call site so no audit can be loaded with a different set of options.
        return load_audit(
            expressions=block,
            path=path,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            dialect=dialect,
            default_catalog=default_catalog,
            variables=variables,
            project=project,
        )

    audit_block: t.List[exp.Expression] = []
    for expression in expressions:
        if isinstance(expression, d.Audit) and audit_block:
            yield _load(audit_block)
            audit_block = []
        audit_block.append(expression)
    yield _load(audit_block)


def _raise_config_error(msg: str, path: pathlib.Path) -> None:
    raise_config_error(msg, location=path, error_type=AuditConfigError)


# mypy doesn't realize raise_config_error raises an exception
@t.no_type_check
def _maybe_parse_arg_pair(e: exp.Expression) -> t.Tuple[str, exp.Expression]:
    """Convert a ``key = value`` expression into a ``(key, value)`` pair.

    Raises:
        AuditConfigError: If ``e`` is not a ``key = value`` expression.
    """
    if isinstance(e, exp.EQ):
        return e.left.name, e.right
    # Anything else is not a valid default: fail explicitly instead of returning None and
    # letting ``dict(...)`` raise an opaque TypeError.
    raise_config_error("Defaults must be a tuple of exp.EQ or a dict", error_type=AuditConfigError)


META_FIELD_CONVERTER: t.Dict[str, t.Callable] = {
    "start": lambda value: exp.Literal.string(value),
    "cron": lambda value: exp.Literal.string(value),
    "skip": exp.convert,
    "blocking": exp.convert,
    "standalone": exp.convert,
    "depends_on_": lambda value: exp.Tuple(expressions=sorted(value)),
    "tags": single_value_or_tuple,
    "default_catalog": exp.to_identifier,
    "dbt_node_info_": lambda value: value.to_expression(),
}
class AuditCommonMetaMixin:
    """
    Metadata for audits which can be defined in SQL.

    Only annotations are declared here; the concrete audit classes re-declare these fields
    with their own defaults, and ``StandaloneAudit.meta_fields`` reads this class's
    ``__annotations__`` to know which fields are audit metadata.

    Args:
        name: The unique name of the audit.
        dialect: The dialect of the audit query.
        skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
        blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
        standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
    """

    # Unique audit name.
    name: str
    # SQL dialect of the audit query.
    dialect: str
    # Whether the audit is skipped.
    skip: bool
    # Whether a failure stops pipeline execution.
    blocking: bool
    # Whether the audit runs as a standalone audit.
    standalone: bool
Metadata for audits which can be defined in SQL.
Arguments:
- name: The unique name of the audit.
- dialect: The dialect of the audit query.
- skip: Setting this to `true` will cause this audit to be skipped. Defaults to `false`.
- blocking: Setting this to `true` will cause the pipeline execution to stop if this audit fails.
- standalone: Setting this to `true` will cause this audit to be executed as a standalone audit.
class AuditMixin(AuditCommonMetaMixin):
    """
    Behaviour shared by model audits and standalone audits.

    Args:
        query: The audit query.
        defaults: Default values for the audit query.
        expressions: Additional sql statements to execute alongside the audit.
        jinja_macros: A registry of jinja macros to use when rendering the audit query.
    """

    query_: ParsableSql
    defaults: t.Dict[str, exp.Expression]
    expressions_: t.Optional[t.List[ParsableSql]]
    jinja_macros: JinjaMacroRegistry
    formatting: t.Optional[bool]

    @property
    def query(self) -> t.Union[exp.Query, d.JinjaQuery]:
        # Parse the stored SQL using the audit's own dialect.
        parsed_query = self.query_.parse(self.dialect)
        return t.cast(t.Union[exp.Query, d.JinjaQuery], parsed_query)

    @property
    def expressions(self) -> t.List[exp.Expression]:
        statements = self.expressions_
        if not statements:
            return []
        parsed_nodes = (stmt.parse(self.dialect) for stmt in statements)
        # Standalone semicolon nodes are dropped from the result.
        return [node for node in parsed_nodes if not isinstance(node, exp.Semicolon)]

    @property
    def macro_definitions(self) -> t.List[d.MacroDef]:
        """All macro definitions from the list of expressions."""
        return list(filter(lambda node: isinstance(node, d.MacroDef), self.expressions))
Mixin for common Audit functionality
Arguments:
- query: The audit query.
- defaults: Default values for the audit query.
- expressions: Additional sql statements to execute alongside the audit.
- jinja_macros: A registry of jinja macros to use when rendering the audit query.
@property
def macro_definitions(self) -> t.List[d.MacroDef]:
    """Collect the ``MacroDef`` statements found among this audit's extra expressions."""
    macro_defs: t.List[d.MacroDef] = []
    for statement in self.expressions:
        if isinstance(statement, d.MacroDef):
            macro_defs.append(statement)
    return macro_defs
All macro definitions from the list of expressions.
Inherited Members
class ModelAudit(PydanticModel, AuditMixin, DbtInfoMixin, frozen=True):
    """
    Audit is an assertion made about your tables.

    An audit is a SQL query that returns bad records. For this class ``standalone`` is
    always ``False`` and ``blocking`` defaults to ``True``.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = True
    standalone: t.Literal[False] = False
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expression] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    formatting: t.Optional[bool] = Field(default=None, exclude=True)
    dbt_node_info_: t.Optional[DbtNodeInfo] = Field(alias="dbt_node_info", default=None)

    _path: t.Optional[Path] = None

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator

    def __str__(self) -> str:
        # Rendered as "<ClassName><name>" plus ": <file name>" when a source path is known.
        location = ""
        if self._path:
            location = f": {self._path.name}"
        return f"{type(self).__name__}<{self.name}{location}>"

    @property
    def dbt_node_info(self) -> t.Optional[DbtNodeInfo]:
        """dbt node information for this audit, if any (set via the ``dbt_node_info`` alias)."""
        node_info = self.dbt_node_info_
        return node_info
Audit is an assertion made about your tables.
An audit is a SQL query that returns bad records.
Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise private attributes on a model instance, behaving like a ``BaseModel`` method.

    ``context`` is accepted only because pydantic-core passes it when calling this hook.

    Args:
        self: The BaseModel instance.
        context: The context (not used by this function).
    """
    if getattr(self, '__pydantic_private__', None) is not None:
        # Private state already exists; leave it untouched.
        return

    private_values = {}
    for attr_name, private_attr in self.__private_attributes__.items():
        value = private_attr.get_default()
        if value is PydanticUndefined:
            # Attributes without a default are simply left out.
            continue
        private_values[attr_name] = value
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class StandaloneAudit(_Node, AuditMixin):
    """
    An audit that exists as its own node; ``standalone`` is always ``True`` and standalone
    audits can never be blocking.

    Args:
        depends_on: A list of tables this audit depends on.
        python_env: Dictionary containing all global variables needed to render the audit's macros.
    """

    name: str
    dialect: str = ""
    skip: bool = False
    blocking: bool = False
    standalone: t.Literal[True] = True
    query_: ParsableSql = Field(alias="query")
    defaults: t.Dict[str, exp.Expression] = {}
    expressions_: t.Optional[t.List[ParsableSql]] = Field(default=None, alias="expressions")
    jinja_macros: JinjaMacroRegistry = JinjaMacroRegistry()
    default_catalog: t.Optional[str] = None
    depends_on_: t.Optional[t.Set[str]] = Field(default=None, alias="depends_on")
    python_env: t.Dict[str, Executable] = {}
    formatting: t.Optional[bool] = Field(default=None, exclude=True)

    source_type: t.Literal["audit"] = "audit"

    # Validators
    _query_validator = ParsableSql.validator()
    _bool_validator = bool_validator
    _string_validator = audit_string_validator
    _map_validator = audit_map_validator
    _default_catalog_validator = default_catalog_validator
    _depends_on_validator = depends_on_validator

    @model_validator(mode="after")
    def _node_root_validator(self) -> Self:
        if self.blocking:
            raise AuditConfigError(f"Standalone audits cannot be blocking: '{self.name}'.")
        return self

    def render_audit_query(
        self,
        *,
        start: t.Optional[TimeLike] = None,
        end: t.Optional[TimeLike] = None,
        execution_time: t.Optional[TimeLike] = None,
        snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
        deployability_index: t.Optional[DeployabilityIndex] = None,
        **kwargs: t.Any,
    ) -> exp.Query:
        """Renders the audit's query.

        Args:
            start: The start datetime to render. Defaults to epoch start.
            end: The end datetime to render. Defaults to epoch start.
            execution_time: The date/time time reference to use for execution time.
            snapshots: All snapshots (by name) to use for mapping of physical locations.
            deployability_index: Determines snapshots that are deployable in the context of this render.
            kwargs: Additional kwargs to pass to the renderer; they override ``defaults``.

        Returns:
            The rendered expression.

        Raises:
            SQLMeshError: If the renderer produces no query.
        """
        query_renderer = QueryRenderer(
            self.query,
            self.dialect,
            self.macro_definitions,
            path=self._path or Path(),
            jinja_macro_registry=self.jinja_macros,
            python_env=self.python_env,
            default_catalog=self.default_catalog,
        )

        rendered_query = query_renderer.render(
            start=start,
            end=end,
            execution_time=execution_time,
            snapshots=snapshots,
            deployability_index=deployability_index,
            **{**self.defaults, **kwargs},  # type: ignore
        )

        if rendered_query is None:
            raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")

        return rendered_query

    @cached_property
    def depends_on(self) -> t.Set[str]:
        """Declared ``depends_on`` tables plus tables referenced by the rendered query,
        excluding the audit's own name. Returns a new set; ``depends_on_`` is not modified.
        """
        # Copy: aliasing ``self.depends_on_`` would let the in-place ``|=`` / ``-=`` below
        # rewrite the model's own field.
        depends_on = set(self.depends_on_ or ())

        query = self.render_audit_query()
        if query is not None:
            depends_on |= d.find_tables(
                query, default_catalog=self.default_catalog, dialect=self.dialect
            )

        depends_on -= {self.name}
        return depends_on

    @property
    def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
        """Returns the python env sorted by executable kind and then var name."""
        return sort_python_env(self.python_env)

    @property
    def data_hash(self) -> str:
        """
        Computes the data hash for the node.

        Returns:
            The data hash for the node.
        """
        # StandaloneAudits do not have a data hash
        return hash_data("")

    @property
    def metadata_hash(self) -> str:
        """
        Computes (once, then caches on ``_metadata_hash``) the metadata hash for the node.

        Returns:
            The metadata hash for the node.
        """
        if self._metadata_hash is None:
            data = [
                self.owner,
                self.description,
                *sorted(self.tags),
                str(self.sorted_python_env),
                self.stamp,
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
            ]

            data.append(self.query_.sql)
            data.extend([e.sql for e in self.expressions_ or []])
            self._metadata_hash = hash_data(data)
        return self._metadata_hash

    def text_diff(self, other: Node, rendered: bool = False) -> str:
        """Produce a text diff against another node.

        Args:
            other: The node to diff against.
            rendered: Whether the diff should be between raw vs rendered nodes

        Returns:
            A unified text diff showing additions and deletions.

        Raises:
            SQLMeshError: If ``other`` is not a ``StandaloneAudit``.
        """
        if not isinstance(other, StandaloneAudit):
            raise SQLMeshError(
                f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
            )

        return d.text_diff(
            self.render_definition(render_query=rendered),
            other.render_definition(render_query=rendered),
            self.dialect,
            other.dialect,
        ).strip()

    def render_definition(
        self,
        include_python: bool = True,
        include_defaults: bool = False,
        render_query: bool = False,
    ) -> t.List[exp.Expression]:
        """Returns the original list of sql expressions comprising the audit definition.

        Args:
            include_python: Whether or not to include Python code (the python environment and
                the jinja macro definitions) in the rendered definition.
            include_defaults: Whether to also emit truthy meta fields that equal their default.
            render_query: Whether to emit the rendered query instead of the raw one.

        Returns:
            The AUDIT block, the optional python/jinja expressions, the extra statements and
            finally the query.
        """
        expressions: t.List[exp.Expression] = []
        comment = None
        field_infos = self.all_field_infos()
        for field_name in sorted(self.meta_fields):
            field_value = getattr(self, field_name)
            field_info = field_infos[field_name]
            if (
                field_name == "standalone"
                or (include_defaults and field_value)
                or field_value != field_info.default
            ):
                if field_name == "description":
                    # The description is rendered as a comment on the AUDIT block.
                    comment = field_value
                else:
                    expression = exp.Property(
                        this=field_info.alias or field_name,
                        value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
                    )
                    if field_name == "name":
                        expressions.insert(0, expression)
                    else:
                        expressions.append(expression)

        audit = d.Audit(expressions=expressions)
        audit.comments = [comment] if comment else None

        jinja_expressions = []
        python_expressions = []
        if include_python:
            python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
            if python_env.expressions:
                python_expressions.append(python_env)

            jinja_expressions = self.jinja_macros.to_expressions()

        return [
            audit,
            *python_expressions,
            *jinja_expressions,
            *self.expressions,
            self.render_audit_query() if render_query else self.query,
        ]

    @property
    def is_audit(self) -> bool:
        """Return True if this is an audit node"""
        return True

    @property
    def meta_fields(self) -> t.Iterable[str]:
        return set(AuditCommonMetaMixin.__annotations__) | set(_Node.all_field_infos())

    @property
    def audits_with_args(self) -> t.List[t.Tuple[Audit, t.Dict[str, exp.Expression]]]:
        return [(self, {})]
Arguments:
- depends_on: A list of tables this audit depends on.
- python_env: Dictionary containing all global variables needed to render the audit's macros.
def render_audit_query(
    self,
    *,
    start: t.Optional[TimeLike] = None,
    end: t.Optional[TimeLike] = None,
    execution_time: t.Optional[TimeLike] = None,
    snapshots: t.Optional[t.Dict[str, Snapshot]] = None,
    deployability_index: t.Optional[DeployabilityIndex] = None,
    **kwargs: t.Any,
) -> exp.Query:
    """Render this audit's query into a SQL expression.

    Args:
        start: The start datetime to render. Defaults to epoch start.
        end: The end datetime to render. Defaults to epoch start.
        execution_time: The date/time time reference to use for execution time.
        snapshots: All snapshots (by name) to use for mapping of physical locations.
        deployability_index: Determines snapshots that are deployable in the context of this render.
        kwargs: Additional kwargs to pass to the renderer; these win over the audit's ``defaults``.

    Returns:
        The rendered expression.

    Raises:
        SQLMeshError: If the renderer returns no query.
    """
    renderer = QueryRenderer(
        self.query,
        self.dialect,
        self.macro_definitions,
        path=self._path or Path(),
        jinja_macro_registry=self.jinja_macros,
        python_env=self.python_env,
        default_catalog=self.default_catalog,
    )

    # Caller-supplied kwargs override the audit's declared defaults.
    render_kwargs: t.Dict[str, t.Any] = {**self.defaults, **kwargs}

    rendered = renderer.render(
        start=start,
        end=end,
        execution_time=execution_time,
        snapshots=snapshots,
        deployability_index=deployability_index,
        **render_kwargs,
    )
    if rendered is not None:
        return rendered
    raise SQLMeshError(f"Failed to render query for audit '{self.name}'.")
Renders the audit's query.
Arguments:
- start: The start datetime to render. Defaults to epoch start.
- end: The end datetime to render. Defaults to epoch start.
- execution_time: The date/time reference to use for execution time.
- snapshots: All snapshots (by name) to use for mapping of physical locations.
- deployability_index: Determines snapshots that are deployable in the context of this render.
- kwargs: Additional kwargs to pass to the renderer.
Returns:
The rendered expression.
@cached_property
def depends_on(self) -> t.Set[str]:
    """The names of the tables this audit depends on.

    Combines the explicitly declared ``depends_on_`` tables with every table found in
    the rendered audit query, then removes the audit's own name.

    Returns:
        A new set of table names. ``self.depends_on_`` is left unchanged.
    """
    # Copy first: the in-place ``|=`` below would otherwise mutate the declared
    # ``depends_on_`` set whenever it is non-empty.
    depends_on = set(self.depends_on_ or ())

    query = self.render_audit_query()
    if query is not None:
        depends_on |= d.find_tables(
            query, default_catalog=self.default_catalog, dialect=self.dialect
        )

    # An audit never depends on itself.
    depends_on.discard(self.name)
    return depends_on
@property
def sorted_python_env(self) -> t.List[t.Tuple[str, Executable]]:
    """The python env as ``(name, executable)`` pairs, ordered by executable kind and then by name."""
    env = self.python_env
    return sort_python_env(env)
Returns the python env sorted by executable kind and then var name.
@property
def data_hash(self) -> str:
    """
    The data hash of this audit node.

    Standalone audits have no data hash of their own, so this is always the hash of
    the empty string.

    Returns:
        The data hash for the node.
    """
    empty_payload = ""
    return hash_data(empty_payload)
Computes the data hash for the node.
Returns:
The data hash for the node.
@property
def metadata_hash(self) -> str:
    """
    Computes the metadata hash for the node.

    The hash covers the owner, description, sorted tags, the sorted python env, stamp,
    cron and cron timezone, the query's SQL text (``query_.sql``) and the SQL text of
    every additional statement. It is computed once and cached in ``self._metadata_hash``.

    Returns:
        The metadata hash for the node.
    """
    if self._metadata_hash is None:
        self._metadata_hash = hash_data(
            [
                self.owner,
                self.description,
                *sorted(self.tags),
                str(self.sorted_python_env),
                self.stamp,
                self.cron,
                self.cron_tz.key if self.cron_tz else None,
                self.query_.sql,
                *(e.sql for e in self.expressions_ or []),
            ]
        )
    return self._metadata_hash
Computes the metadata hash for the node.
Arguments:
- audits: Available audits by name.
Returns:
The metadata hash for the node.
def text_diff(self, other: Node, rendered: bool = False) -> str:
    """Produce a text diff against another node.

    Args:
        other: The node to diff against. Must be a ``StandaloneAudit``.
        rendered: Whether the diff should be between rendered (``True``) or raw
            (``False``) definitions.

    Returns:
        A unified text diff showing additions and deletions, with leading and trailing
        whitespace stripped.

    Raises:
        SQLMeshError: If ``other`` is not a ``StandaloneAudit``.
    """
    if not isinstance(other, StandaloneAudit):
        # Quote both names consistently; the audit name's closing quote was missing.
        raise SQLMeshError(
            f"Cannot diff audit '{self.name}' against a non-audit node '{other.name}'"
        )

    return d.text_diff(
        self.render_definition(render_query=rendered),
        other.render_definition(render_query=rendered),
        self.dialect,
        other.dialect,
    ).strip()
Produce a text diff against another node.
Arguments:
- other: The node to diff against.
- rendered: Whether the diff should be between raw vs rendered nodes
Returns:
A unified text diff showing additions and deletions.
def render_definition(
    self,
    include_python: bool = True,
    include_defaults: bool = False,
    render_query: bool = False,
) -> t.List[exp.Expression]:
    """Returns the original list of SQL expressions comprising the audit definition.

    Args:
        include_python: Whether or not to include Python code (the python env payloads and
            the Jinja macro expressions) in the rendered definition.
        include_defaults: Whether to also emit truthy meta fields whose value equals the
            field's default.
        render_query: Whether to emit the rendered audit query instead of the raw query.

    Returns:
        The AUDIT block, followed by any Python and Jinja expressions, the audit's
        additional statements, and finally the query.
    """
    # Loop-invariant: look the field infos up once instead of once per meta field.
    field_infos = self.all_field_infos()
    expressions: t.List[exp.Expression] = []
    comment = None
    for field_name in sorted(self.meta_fields):
        field_value = getattr(self, field_name)
        field_info = field_infos[field_name]
        # ``standalone`` is always emitted; presumably so that reloading the definition
        # with load_audit yields a StandaloneAudit again.
        if (
            field_name == "standalone"
            or (include_defaults and field_value)
            or field_value != field_info.default
        ):
            if field_name == "description":
                # The description becomes a comment on the AUDIT block, not a property.
                comment = field_value
            else:
                expression = exp.Property(
                    this=field_info.alias or field_name,
                    value=META_FIELD_CONVERTER.get(field_name, exp.to_identifier)(field_value),
                )
                if field_name == "name":
                    # Keep the name as the first property of the AUDIT block.
                    expressions.insert(0, expression)
                else:
                    expressions.append(expression)

    audit = d.Audit(expressions=expressions)
    audit.comments = [comment] if comment else None

    jinja_expressions: t.List[exp.Expression] = []
    python_expressions: t.List[exp.Expression] = []
    if include_python:
        python_env = d.PythonCode(expressions=sorted_python_env_payloads(self.python_env))
        if python_env.expressions:
            python_expressions.append(python_env)

        jinja_expressions = self.jinja_macros.to_expressions()

    return [
        audit,
        *python_expressions,
        *jinja_expressions,
        *self.expressions,
        self.render_audit_query() if render_query else self.query,
    ]
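Returns the original list of SQL expressions comprising the audit definition.
Arguments:
- include_python: Whether or not to include Python code in the rendered definition.
- include_defaults: Whether to also emit truthy meta fields whose values equal their defaults.
- render_query: Whether to emit the rendered audit query instead of the raw query.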
@property
def is_audit(self) -> bool:
    """Whether this node is an audit; always ``True`` for audits."""
    audit_node = True
    return audit_node
Return True if this is an audit node
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Fill ``__pydantic_private__`` with the defaults of the model's private attributes.

    Written to behave like a BaseModel method. ``context`` is accepted, but not used,
    because pydantic-core passes it when calling this function.

    Args:
        self: The BaseModel instance.
        context: The context (unused).
    """
    # Private state is already present: leave it untouched.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    private_defaults = {}
    for attr_name, attr in self.__private_attributes__.items():
        candidate = attr.get_default()
        if candidate is PydanticUndefined:
            # Skip attributes whose default is the PydanticUndefined sentinel.
            continue
        private_defaults[attr_name] = candidate
    object_setattr(self, '__pydantic_private__', private_defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Arguments:
- self: The BaseModel instance.
- context: The context.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
- sqlmesh.core.node._Node
- project
- description
- owner
- start
- end
- cron
- cron_tz
- interval_unit_
- stamp
- dbt_node_info_
- copy
- batch_size
- batch_concurrency
- interval_unit
- fqn
- is_metadata_only_change
- is_data_change
- croniter
- cron_next
- cron_prev
- cron_floor
- is_model
- dbt_node_info
def load_audit(
    expressions: t.List[exp.Expression],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> Audit:
    """Load an audit from a parsed SQLMesh audit file.

    Args:
        expressions: Audit, *Statements, Query -- an AUDIT block, zero or more
            statements, and the audit query, in that order.
        path: An optional path of the file; used in error messages and stored on the audit.
        module_path: Passed to ``make_python_env`` for standalone audits.
        macros: Macro registry for standalone audits; defaults to ``macro.get_registry()``.
        jinja_macros: Jinja macro registry; for standalone audits it is trimmed to the
            macros the audit references.
        dialect: The default dialect if no audit dialect is configured.
        default_catalog: The default catalog; only passed to standalone audits.
        variables: Variables passed to ``make_python_env`` for standalone audits.
        project: The project name; only passed to standalone audits, and only when not None.

    Returns:
        A ``StandaloneAudit`` if the AUDIT block sets ``standalone`` to a true boolean,
        otherwise a ``ModelAudit``.

    Raises:
        Whatever ``_raise_config_error`` raises, when the definition is incomplete, has
        missing or extra meta fields, has a non-boolean ``standalone`` or lacks a query,
        or when constructing the audit fails.
    """
    if len(expressions) < 2:
        _raise_config_error("Incomplete audit definition, missing AUDIT or QUERY", path)

    meta, *statements, query = expressions

    if not isinstance(meta, d.Audit):
        _raise_config_error(
            "AUDIT statement is required as the first statement in the definition",
            path,
        )
        # Presumably unreachable (_raise_config_error is expected to raise); tells type
        # checkers that control does not continue past this point.
        raise

    meta_fields = {p.name: p.args.get("value") for p in meta.expressions if p}

    standalone_field = meta_fields.pop("standalone", None)
    if standalone_field and not isinstance(standalone_field, exp.Boolean):
        _raise_config_error(
            f"""Standalone must be a boolean for '{meta_fields.get("name")}'""",
            path,
        )
        raise
    is_standalone = standalone_field and standalone_field.this

    audit_class: t.Union[t.Type[StandaloneAudit], t.Type[ModelAudit]] = (
        StandaloneAudit if is_standalone else ModelAudit
    )

    missing_required_fields = audit_class.missing_required_fields(set(meta_fields))
    # The query is the last expression of the definition, not an AUDIT meta field.
    missing_required_fields -= {"query"}
    if missing_required_fields:
        _raise_config_error(
            f"Missing required fields {missing_required_fields} in the audit definition",
            path,
        )

    extra_fields = audit_class.extra_fields(set(meta_fields))
    if extra_fields:
        _raise_config_error(f"Invalid extra fields {extra_fields} in the audit definition", path)

    if not isinstance(query, (exp.Query, d.JinjaQuery)):
        _raise_config_error("Missing SELECT query in the audit definition", path)
        raise

    extra_kwargs: t.Dict[str, t.Any] = {}
    if is_standalone:
        jinja_macro_references, referenced_variables = extract_macro_references_and_variables(
            *(gen(s) for s in statements),
            gen(query),
        )
        jinja_macros = (jinja_macros or JinjaMacroRegistry()).trim(jinja_macro_references)
        # Variables used only inside the retained Jinja macros must be captured too.
        for jinja_macro in jinja_macros.root_macros.values():
            referenced_variables.update(
                extract_macro_references_and_variables(jinja_macro.definition)[1]
            )

        extra_kwargs["jinja_macros"] = jinja_macros
        extra_kwargs["python_env"] = make_python_env(
            [*statements, query],
            jinja_macro_references,
            module_path,
            macros or macro.get_registry(),
            variables=variables,
            referenced_variables=referenced_variables,
        )
        extra_kwargs["default_catalog"] = default_catalog
        if project is not None:
            extra_kwargs["project"] = project

    # NOTE(review): if the AUDIT block sets ``dialect``, the popped value is a parsed
    # expression rather than a str; confirm ParsableSql.from_parsed_expression accepts it.
    dialect = meta_fields.pop("dialect", dialect) or ""

    parsable_query = ParsableSql.from_parsed_expression(query, dialect, use_meta_sql=True)
    parsable_statements = [
        ParsableSql.from_parsed_expression(s, dialect, use_meta_sql=True) for s in statements
    ]

    try:
        audit = audit_class(
            query=parsable_query,
            expressions=parsable_statements,
            dialect=dialect,
            **extra_kwargs,
            **meta_fields,
        )
    except Exception as ex:
        _raise_config_error(str(ex), path)

    audit._path = path
    return audit
Load an audit from a parsed SQLMesh audit file.
Arguments:
- expressions: Audit, *Statements, Query
- path: An optional path of the file.
- dialect: The default dialect if no audit dialect is configured.
def load_multiple_audits(
    expressions: t.List[exp.Expression],
    *,
    path: Path = Path(),
    module_path: Path = Path(),
    macros: t.Optional[MacroRegistry] = None,
    jinja_macros: t.Optional[JinjaMacroRegistry] = None,
    dialect: t.Optional[str] = None,
    default_catalog: t.Optional[str] = None,
    variables: t.Optional[t.Dict[str, t.Any]] = None,
    project: t.Optional[str] = None,
) -> t.Generator[Audit, None, None]:
    """Load every audit from a parsed file that may contain several AUDIT blocks.

    The expressions are split into consecutive groups, each starting at a ``d.Audit``
    expression, and each group is loaded with ``load_audit``.

    Args:
        expressions: The parsed contents of the audit file.
        path, module_path, macros, jinja_macros, dialect, default_catalog, variables, project:
            Forwarded unchanged to ``load_audit`` for every audit in the file.

    Yields:
        One audit per AUDIT block, in file order. If ``expressions`` is empty, the single
        ``load_audit`` call receives an empty list and reports an incomplete definition.
    """

    # Load one AUDIT group with the same options for every group, including the last one.
    def _load(block: t.List[exp.Expression]) -> Audit:
        return load_audit(
            expressions=block,
            path=path,
            module_path=module_path,
            macros=macros,
            jinja_macros=jinja_macros,
            dialect=dialect,
            default_catalog=default_catalog,
            variables=variables,
            project=project,
        )

    audit_block: t.List[exp.Expression] = []
    for expression in expressions:
        # A new AUDIT statement closes the group collected so far.
        if isinstance(expression, d.Audit) and audit_block:
            yield _load(audit_block)
            audit_block = []
        audit_block.append(expression)
    yield _load(audit_block)