sqlmesh.dbt.test
1from __future__ import annotations 2 3import re 4import typing as t 5from enum import Enum 6from pathlib import Path 7 8from pydantic import Field 9import sqlmesh.core.dialect as d 10from sqlmesh.core.audit import Audit, ModelAudit, StandaloneAudit 11from sqlmesh.core.node import DbtNodeInfo 12from sqlmesh.dbt.common import ( 13 Dependencies, 14 GeneralConfig, 15 SqlStr, 16 sql_str_validator, 17) 18from sqlmesh.utils import AttributeDict 19from sqlmesh.utils.pydantic import field_validator 20 21if t.TYPE_CHECKING: 22 from sqlmesh.dbt.context import DbtContext 23 24 25class Severity(str, Enum): 26 """DBT test severity""" 27 28 ERROR = "error" 29 WARN = "warn" 30 31 32class TestConfig(GeneralConfig): 33 """ 34 TestConfig contains all the config paramters for a dbt test. 35 36 Args: 37 path: The file path to the test. 38 name: The name of the test. 39 sql: The test sql. 40 test_kwargs: The kwargs passed into the test. 41 model_name: The name of the model this test is attached to. Do not set for singular tests. 42 owner: The name of the model under test. 43 stamp: An optional arbitrary string sequence used to create new audit versions without making 44 changes to any of the functional components of the definition. 45 cron: A cron string specifying how often the audit should be refreshed, leveraging the 46 [croniter](https://github.com/kiorky/croniter) library. 47 interval_unit: The duration of an interval for the audit. By default, it is computed from the cron expression. 48 column_name: The name of the column under test. 49 dependencies: The macros, refs, and sources the test depends upon. 50 dialect: SQL dialect of the test query. 51 package_name: Name of the package that defines the test. 52 alias: The alias for the materialized table where failures are stored (Not supported). 53 schema: The schema for the materialized table where the failures are stored (Not supported). 54 database: The database for the materialized table where the failures are stored (Not supported). 55 severity: The severity of a failure: ERROR blocks execution and WARN continues execution. 56 store_failures: Failures are stored in a materialized table when True (Not supported). 57 where: Additional where clause to add to the test. 58 limit: Additional limit clause to add to the test (Not supported). 59 fail_calc: Custom calculation to use (default "count(*)") for displaying test failure (Not supported). 60 warn_if: Conditional expression (default "!=0") to detect if warn condition met (Not supported). 61 error_if: Conditional expression (default "!=0") to detect if error condition met (Not supported). 62 """ 63 64 __test__ = ( 65 False # prevent pytest trying to collect this as a test class when it's imported in a test 66 ) 67 68 # SQLMesh fields 69 path: Path = Path() 70 name: str 71 sql: SqlStr 72 test_kwargs: t.Dict[str, t.Any] = {} 73 model_name: t.Optional[str] = None 74 owner: t.Optional[str] = None 75 stamp: t.Optional[str] = None 76 cron: t.Optional[str] = None 77 interval_unit: t.Optional[str] = None 78 column_name: t.Optional[str] = None 79 dependencies: Dependencies = Dependencies() 80 dialect_: t.Optional[str] = Field(None, alias="dialect") 81 82 # dbt fields 83 unique_id: str = "" 84 package_name: str = "" 85 alias: t.Optional[str] = None 86 fqn: t.List[str] = [] 87 schema_: t.Optional[str] = Field("", alias="schema") 88 database: t.Optional[str] = None 89 severity: Severity = Severity.ERROR 90 store_failures: t.Optional[bool] = None 91 where: t.Optional[str] = None 92 limit: t.Optional[int] = None 93 fail_calc: str = "count(*)" 94 warn_if: str = "!=0" 95 error_if: str = "!=0" 96 quoting: t.Dict[str, t.Optional[bool]] = {} 97 98 _sql_validator = sql_str_validator 99 100 @field_validator("severity", mode="before") 101 @classmethod 102 def _validate_severity(cls, v: t.Union[Severity, str]) -> Severity: 103 if isinstance(v, Severity): 104 return v 105 return Severity(v.lower()) 106 107 @field_validator("name", mode="before") 108 @classmethod 109 def _lowercase_name(cls, v: str) -> str: 110 return v.lower() 111 112 @property 113 def canonical_name(self) -> str: 114 return f"{self.package_name}.{self.name}".lower() if self.package_name else self.name 115 116 @property 117 def is_standalone(self) -> bool: 118 # A test is standalone if: 119 # 1. It has no model_name (already standalone), OR 120 # 2. It references other models besides its own model 121 if not self.model_name: 122 return True 123 124 # Check if test has references to other models 125 # For versioned models, refs include version (e.g., "model_name_v1") but model_name may not 126 self_refs = {self.model_name} 127 for ref in self.dependencies.refs: 128 # versioned models end in _vX 129 if ref.startswith(f"{self.model_name}_v"): 130 self_refs.add(ref) 131 132 other_refs = {ref for ref in self.dependencies.refs if ref not in self_refs} 133 return bool(other_refs) 134 135 @property 136 def sqlmesh_config_fields(self) -> t.Set[str]: 137 return {"description", "owner", "stamp", "cron", "interval_unit"} 138 139 def dialect(self, context: DbtContext) -> str: 140 return self.dialect_ or context.default_dialect 141 142 def to_sqlmesh(self, context: DbtContext) -> Audit: 143 """Convert dbt Test to SQLMesh Audit 144 145 Args: 146 context: Context for the dbt project 147 Returns: 148 SQLMesh Audit for this test 149 """ 150 test_context = context.context_for_dependencies(self.dependencies) 151 152 jinja_macros = test_context.jinja_macros.trim( 153 self.dependencies.macros, package=self.package_name 154 ) 155 jinja_macros.add_globals( 156 { 157 "config": self.config_attribute_dict, 158 **test_context.jinja_globals, # type: ignore 159 } 160 ) 161 162 query = d.jinja_query(self.sql.replace("**_dbt_generic_test_kwargs", self._kwargs())) 163 164 skip = not self.enabled 165 blocking = self.severity == Severity.ERROR 166 167 audit: Audit 168 if self.is_standalone: 169 jinja_macros.add_globals({"this": self.relation_info}) 170 audit = StandaloneAudit( 171 name=self.name, 172 dbt_node_info=self.node_info, 173 dialect=self.dialect(context), 174 skip=skip, 175 query=query, 176 jinja_macros=jinja_macros, 177 depends_on={ 178 model.canonical_name(context) for model in test_context.refs.values() 179 }.union( 180 {source.canonical_name(context) for source in test_context.sources.values()} 181 ), 182 tags=self.tags, 183 default_catalog=context.target.database, 184 **self.sqlmesh_config_kwargs, 185 ) 186 else: 187 audit = ModelAudit( 188 name=self.name, 189 dbt_node_info=self.node_info, 190 dialect=self.dialect(context), 191 skip=skip, 192 blocking=blocking, 193 query=query, 194 jinja_macros=jinja_macros, 195 ) 196 197 audit._path = self.path 198 return audit 199 200 def _kwargs(self) -> str: 201 kwargs = {} 202 for key, value in self.test_kwargs.items(): 203 if isinstance(value, str): 204 # Multiline values will end with a newline. Remove it here. 205 value = value.rstrip() 206 # Mimic dbt kwargs logic 207 no_braces = _remove_jinja_braces(value) 208 jinja_function_regex = r"^\s*(env_var|ref|var|source|doc)\s*\(.+\)\s*$" 209 if key != "column_name" and ( 210 value != no_braces or re.match(jinja_function_regex, value) 211 ): 212 kwargs[key] = no_braces 213 else: 214 kwargs[key] = f'"{escape_quotes(value)}"' 215 else: 216 kwargs[key] = value 217 218 return ", ".join(f"{key}={value}" for key, value in kwargs.items()) 219 220 @property 221 def relation_info(self) -> AttributeDict: 222 return AttributeDict( 223 { 224 "name": self.name, 225 "database": self.database, 226 "schema": self.schema_, 227 "identifier": self.name, 228 "type": None, 229 "quote_policy": AttributeDict(), 230 } 231 ) 232 233 @property 234 def node_info(self) -> DbtNodeInfo: 235 return DbtNodeInfo( 236 unique_id=self.unique_id, name=self.name, fqn=".".join(self.fqn), alias=self.alias 237 ) 238 239 240def _remove_jinja_braces(jinja_str: str) -> str: 241 no_braces = jinja_str 242 243 cursor = 0 244 quotes: t.List[str] = [] 245 while cursor < len(no_braces): 246 val = no_braces[cursor] 247 if val in ('"', "'"): 248 if quotes and quotes[-1] == val: 249 quotes.pop() 250 else: 251 quotes.append(no_braces[cursor]) 252 if ( 253 cursor + 1 < len(no_braces) 254 and no_braces[cursor : cursor + 2] in ("{{", "}}") 255 and not quotes 256 ): 257 no_braces = no_braces[:cursor] + no_braces[cursor + 2 :] 258 else: 259 cursor += 1 260 261 return no_braces.strip() 262 263 264def escape_quotes(v: str) -> str: 265 return v.replace('"', '\\"')
class
Severity(builtins.str, enum.Enum):
DBT test severity
ERROR =
<Severity.ERROR: 'error'>
WARN =
<Severity.WARN: 'warn'>
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
33class TestConfig(GeneralConfig): 34 """ 35 TestConfig contains all the config paramters for a dbt test. 36 37 Args: 38 path: The file path to the test. 39 name: The name of the test. 40 sql: The test sql. 41 test_kwargs: The kwargs passed into the test. 42 model_name: The name of the model this test is attached to. Do not set for singular tests. 43 owner: The name of the model under test. 44 stamp: An optional arbitrary string sequence used to create new audit versions without making 45 changes to any of the functional components of the definition. 46 cron: A cron string specifying how often the audit should be refreshed, leveraging the 47 [croniter](https://github.com/kiorky/croniter) library. 48 interval_unit: The duration of an interval for the audit. By default, it is computed from the cron expression. 49 column_name: The name of the column under test. 50 dependencies: The macros, refs, and sources the test depends upon. 51 dialect: SQL dialect of the test query. 52 package_name: Name of the package that defines the test. 53 alias: The alias for the materialized table where failures are stored (Not supported). 54 schema: The schema for the materialized table where the failures are stored (Not supported). 55 database: The database for the materialized table where the failures are stored (Not supported). 56 severity: The severity of a failure: ERROR blocks execution and WARN continues execution. 57 store_failures: Failures are stored in a materialized table when True (Not supported). 58 where: Additional where clause to add to the test. 59 limit: Additional limit clause to add to the test (Not supported). 60 fail_calc: Custom calculation to use (default "count(*)") for displaying test failure (Not supported). 61 warn_if: Conditional expression (default "!=0") to detect if warn condition met (Not supported). 62 error_if: Conditional expression (default "!=0") to detect if error condition met (Not supported). 63 """ 64 65 __test__ = ( 66 False # prevent pytest trying to collect this as a test class when it's imported in a test 67 ) 68 69 # SQLMesh fields 70 path: Path = Path() 71 name: str 72 sql: SqlStr 73 test_kwargs: t.Dict[str, t.Any] = {} 74 model_name: t.Optional[str] = None 75 owner: t.Optional[str] = None 76 stamp: t.Optional[str] = None 77 cron: t.Optional[str] = None 78 interval_unit: t.Optional[str] = None 79 column_name: t.Optional[str] = None 80 dependencies: Dependencies = Dependencies() 81 dialect_: t.Optional[str] = Field(None, alias="dialect") 82 83 # dbt fields 84 unique_id: str = "" 85 package_name: str = "" 86 alias: t.Optional[str] = None 87 fqn: t.List[str] = [] 88 schema_: t.Optional[str] = Field("", alias="schema") 89 database: t.Optional[str] = None 90 severity: Severity = Severity.ERROR 91 store_failures: t.Optional[bool] = None 92 where: t.Optional[str] = None 93 limit: t.Optional[int] = None 94 fail_calc: str = "count(*)" 95 warn_if: str = "!=0" 96 error_if: str = "!=0" 97 quoting: t.Dict[str, t.Optional[bool]] = {} 98 99 _sql_validator = sql_str_validator 100 101 @field_validator("severity", mode="before") 102 @classmethod 103 def _validate_severity(cls, v: t.Union[Severity, str]) -> Severity: 104 if isinstance(v, Severity): 105 return v 106 return Severity(v.lower()) 107 108 @field_validator("name", mode="before") 109 @classmethod 110 def _lowercase_name(cls, v: str) -> str: 111 return v.lower() 112 113 @property 114 def canonical_name(self) -> str: 115 return f"{self.package_name}.{self.name}".lower() if self.package_name else self.name 116 117 @property 118 def is_standalone(self) -> bool: 119 # A test is standalone if: 120 # 1. It has no model_name (already standalone), OR 121 # 2. It references other models besides its own model 122 if not self.model_name: 123 return True 124 125 # Check if test has references to other models 126 # For versioned models, refs include version (e.g., "model_name_v1") but model_name may not 127 self_refs = {self.model_name} 128 for ref in self.dependencies.refs: 129 # versioned models end in _vX 130 if ref.startswith(f"{self.model_name}_v"): 131 self_refs.add(ref) 132 133 other_refs = {ref for ref in self.dependencies.refs if ref not in self_refs} 134 return bool(other_refs) 135 136 @property 137 def sqlmesh_config_fields(self) -> t.Set[str]: 138 return {"description", "owner", "stamp", "cron", "interval_unit"} 139 140 def dialect(self, context: DbtContext) -> str: 141 return self.dialect_ or context.default_dialect 142 143 def to_sqlmesh(self, context: DbtContext) -> Audit: 144 """Convert dbt Test to SQLMesh Audit 145 146 Args: 147 context: Context for the dbt project 148 Returns: 149 SQLMesh Audit for this test 150 """ 151 test_context = context.context_for_dependencies(self.dependencies) 152 153 jinja_macros = test_context.jinja_macros.trim( 154 self.dependencies.macros, package=self.package_name 155 ) 156 jinja_macros.add_globals( 157 { 158 "config": self.config_attribute_dict, 159 **test_context.jinja_globals, # type: ignore 160 } 161 ) 162 163 query = d.jinja_query(self.sql.replace("**_dbt_generic_test_kwargs", self._kwargs())) 164 165 skip = not self.enabled 166 blocking = self.severity == Severity.ERROR 167 168 audit: Audit 169 if self.is_standalone: 170 jinja_macros.add_globals({"this": self.relation_info}) 171 audit = StandaloneAudit( 172 name=self.name, 173 dbt_node_info=self.node_info, 174 dialect=self.dialect(context), 175 skip=skip, 176 query=query, 177 jinja_macros=jinja_macros, 178 depends_on={ 179 model.canonical_name(context) for model in test_context.refs.values() 180 }.union( 181 {source.canonical_name(context) for source in test_context.sources.values()} 182 ), 183 tags=self.tags, 184 default_catalog=context.target.database, 185 **self.sqlmesh_config_kwargs, 186 ) 187 else: 188 audit = ModelAudit( 189 name=self.name, 190 dbt_node_info=self.node_info, 191 dialect=self.dialect(context), 192 skip=skip, 193 blocking=blocking, 194 query=query, 195 jinja_macros=jinja_macros, 196 ) 197 198 audit._path = self.path 199 return audit 200 201 def _kwargs(self) -> str: 202 kwargs = {} 203 for key, value in self.test_kwargs.items(): 204 if isinstance(value, str): 205 # Multiline values will end with a newline. Remove it here. 206 value = value.rstrip() 207 # Mimic dbt kwargs logic 208 no_braces = _remove_jinja_braces(value) 209 jinja_function_regex = r"^\s*(env_var|ref|var|source|doc)\s*\(.+\)\s*$" 210 if key != "column_name" and ( 211 value != no_braces or re.match(jinja_function_regex, value) 212 ): 213 kwargs[key] = no_braces 214 else: 215 kwargs[key] = f'"{escape_quotes(value)}"' 216 else: 217 kwargs[key] = value 218 219 return ", ".join(f"{key}={value}" for key, value in kwargs.items()) 220 221 @property 222 def relation_info(self) -> AttributeDict: 223 return AttributeDict( 224 { 225 "name": self.name, 226 "database": self.database, 227 "schema": self.schema_, 228 "identifier": self.name, 229 "type": None, 230 "quote_policy": AttributeDict(), 231 } 232 ) 233 234 @property 235 def node_info(self) -> DbtNodeInfo: 236 return DbtNodeInfo( 237 unique_id=self.unique_id, name=self.name, fqn=".".join(self.fqn), alias=self.alias 238 )
TestConfig contains all the config paramters for a dbt test.
Arguments:
- path: The file path to the test.
- name: The name of the test.
- sql: The test sql.
- test_kwargs: The kwargs passed into the test.
- model_name: The name of the model this test is attached to. Do not set for singular tests.
- owner: The name of the model under test.
- stamp: An optional arbitrary string sequence used to create new audit versions without making changes to any of the functional components of the definition.
- cron: A cron string specifying how often the audit should be refreshed, leveraging the croniter library.
- interval_unit: The duration of an interval for the audit. By default, it is computed from the cron expression.
- column_name: The name of the column under test.
- dependencies: The macros, refs, and sources the test depends upon.
- dialect: SQL dialect of the test query.
- package_name: Name of the package that defines the test.
- alias: The alias for the materialized table where failures are stored (Not supported).
- schema: The schema for the materialized table where the failures are stored (Not supported).
- database: The database for the materialized table where the failures are stored (Not supported).
- severity: The severity of a failure: ERROR blocks execution and WARN continues execution.
- store_failures: Failures are stored in a materialized table when True (Not supported).
- where: Additional where clause to add to the test.
- limit: Additional limit clause to add to the test (Not supported).
- fail_calc: Custom calculation to use (default "count(*)") for displaying test failure (Not supported).
- warn_if: Conditional expression (default "!=0") to detect if warn condition met (Not supported).
- error_if: Conditional expression (default "!=0") to detect if error condition met (Not supported).
dependencies: sqlmesh.dbt.common.Dependencies
severity: Severity
is_standalone: bool
117 @property 118 def is_standalone(self) -> bool: 119 # A test is standalone if: 120 # 1. It has no model_name (already standalone), OR 121 # 2. It references other models besides its own model 122 if not self.model_name: 123 return True 124 125 # Check if test has references to other models 126 # For versioned models, refs include version (e.g., "model_name_v1") but model_name may not 127 self_refs = {self.model_name} 128 for ref in self.dependencies.refs: 129 # versioned models end in _vX 130 if ref.startswith(f"{self.model_name}_v"): 131 self_refs.add(ref) 132 133 other_refs = {ref for ref in self.dependencies.refs if ref not in self_refs} 134 return bool(other_refs)
sqlmesh_config_fields: Set[str]
136 @property 137 def sqlmesh_config_fields(self) -> t.Set[str]: 138 return {"description", "owner", "stamp", "cron", "interval_unit"}
SQLMesh config fields that can be set in dbt projects.
Returns:
A set of SQLMesh config fields that can be set in dbt projects.
def
to_sqlmesh( self, context: sqlmesh.dbt.context.DbtContext) -> Union[sqlmesh.core.audit.definition.ModelAudit, sqlmesh.core.audit.definition.StandaloneAudit]:
143 def to_sqlmesh(self, context: DbtContext) -> Audit: 144 """Convert dbt Test to SQLMesh Audit 145 146 Args: 147 context: Context for the dbt project 148 Returns: 149 SQLMesh Audit for this test 150 """ 151 test_context = context.context_for_dependencies(self.dependencies) 152 153 jinja_macros = test_context.jinja_macros.trim( 154 self.dependencies.macros, package=self.package_name 155 ) 156 jinja_macros.add_globals( 157 { 158 "config": self.config_attribute_dict, 159 **test_context.jinja_globals, # type: ignore 160 } 161 ) 162 163 query = d.jinja_query(self.sql.replace("**_dbt_generic_test_kwargs", self._kwargs())) 164 165 skip = not self.enabled 166 blocking = self.severity == Severity.ERROR 167 168 audit: Audit 169 if self.is_standalone: 170 jinja_macros.add_globals({"this": self.relation_info}) 171 audit = StandaloneAudit( 172 name=self.name, 173 dbt_node_info=self.node_info, 174 dialect=self.dialect(context), 175 skip=skip, 176 query=query, 177 jinja_macros=jinja_macros, 178 depends_on={ 179 model.canonical_name(context) for model in test_context.refs.values() 180 }.union( 181 {source.canonical_name(context) for source in test_context.sources.values()} 182 ), 183 tags=self.tags, 184 default_catalog=context.target.database, 185 **self.sqlmesh_config_kwargs, 186 ) 187 else: 188 audit = ModelAudit( 189 name=self.name, 190 dbt_node_info=self.node_info, 191 dialect=self.dialect(context), 192 skip=skip, 193 blocking=blocking, 194 query=query, 195 jinja_macros=jinja_macros, 196 ) 197 198 audit._path = self.path 199 return audit
Convert dbt Test to SQLMesh Audit
Arguments:
- context: Context for the dbt project
Returns:
SQLMesh Audit for this test
relation_info: sqlmesh.utils.AttributeDict
node_info: sqlmesh.core.node.DbtNodeInfo
model_config =
{'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'allow', 'protected_namespaces': (), 'validate_assignment': True, 'frozen': False}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def
escape_quotes(v: str) -> str: