Contains all the standard rules included with SQLMesh
1"""Contains all the standard rules included with SQLMesh""" 2 3from __future__ import annotations 4 5import typing as t 6 7from sqlglot.expressions import Star 8from sqlglot.helper import subclasses 9 10from sqlmesh.core.constants import EXTERNAL_MODELS_YAML 11from sqlmesh.core.dialect import normalize_model_name 12from sqlmesh.core.linter.helpers import ( 13 TokenPositionDetails, 14 get_range_of_model_block, 15 read_range_from_string, 16) 17from sqlmesh.core.linter.rule import ( 18 Rule, 19 RuleViolation, 20 Range, 21 Fix, 22 TextEdit, 23 Position, 24 CreateFile, 25) 26from sqlmesh.core.linter.definition import RuleSet 27from sqlmesh.core.model import Model, SqlModel, ExternalModel 28from sqlmesh.utils.lineage import extract_references_from_query, ExternalModelReference 29 30 31class NoSelectStar(Rule): 32 """Query should not contain SELECT * on its outer most projections, even if it can be expanded.""" 33 34 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 35 # Only applies to SQL models, as other model types do not have a query. 36 if not isinstance(model, SqlModel): 37 return None 38 if model.query.is_star: 39 violation_range = self._get_range(model) 40 fixes = self._create_fixes(model, violation_range) 41 return self.violation(violation_range=violation_range, fixes=fixes) 42 return None 43 44 def _get_range(self, model: SqlModel) -> t.Optional[Range]: 45 """Get the range of the violation if available.""" 46 try: 47 if len(model.query.expressions) == 1 and isinstance(model.query.expressions[0], Star): 48 return TokenPositionDetails.from_meta(model.query.expressions[0].meta).to_range( 49 None 50 ) 51 except Exception: 52 pass 53 54 return None 55 56 def _create_fixes( 57 self, model: SqlModel, violation_range: t.Optional[Range] 58 ) -> t.Optional[t.List[Fix]]: 59 """Create fixes for the SELECT * violation.""" 60 if not violation_range: 61 return None 62 columns = model.columns_to_types 63 if not columns: 64 return None 65 path = model._path 66 if path is None: 67 return None 68 new_text = ", ".join(columns.keys()) 69 return [ 70 Fix( 71 title="Replace SELECT * with explicit column list", 72 edits=[ 73 TextEdit( 74 path=path, 75 range=violation_range, 76 new_text=new_text, 77 ) 78 ], 79 ) 80 ] 81 82 83class InvalidSelectStarExpansion(Rule): 84 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 85 deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion) 86 if not deps: 87 return None 88 89 violation_msg = ( 90 f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. " 91 "Run `sqlmesh create_external_models` and / or make sure that the model " 92 f"'{model.fqn}' can be rendered at parse time." 93 ) 94 95 return self.violation(violation_msg) 96 97 98class AmbiguousOrInvalidColumn(Rule): 99 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 100 sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn) 101 if not sqlglot_err: 102 return None 103 104 violation_msg = ( 105 f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous." 106 ) 107 108 return self.violation(violation_msg) 109 110 111class NoMissingAudits(Rule): 112 """Model `audits` must be configured to test data quality.""" 113 114 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 115 if model.audits or model.kind.is_symbolic: 116 return None 117 if model._path is None or not str(model._path).endswith(".sql"): 118 return self.violation() 119 120 try: 121 with open(model._path, "r", encoding="utf-8") as file: 122 content = file.read() 123 124 range = get_range_of_model_block(content, model.dialect) 125 if range: 126 return self.violation(violation_range=range) 127 return self.violation() 128 except Exception: 129 return self.violation() 130 131 132class NoMissingUnitTest(Rule): 133 """All models must have a unit test found in the tests/ directory yaml files""" 134 135 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 136 # External models cannot have unit tests 137 if isinstance(model, ExternalModel): 138 return None 139 140 if model.name not in self.context.models_with_tests: 141 return self.violation( 142 violation_msg=f"Model {model.name} is missing unit test(s). Please add in the tests/ directory." 143 ) 144 return None 145 146 147class NoMissingExternalModels(Rule): 148 """All external models must be registered in the external_models.yaml file""" 149 150 def check_model( 151 self, model: Model 152 ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]: 153 # Ignore external models themselves, because either they are registered, 154 # and if they are not, they will be caught as referenced in another model. 155 if isinstance(model, ExternalModel): 156 return None 157 158 # Handle other models that may refer to the external models. 159 not_registered_external_models: t.Set[str] = set() 160 for depends_on_model in model.depends_on: 161 existing_model = self.context.get_model(depends_on_model) 162 if existing_model is None: 163 not_registered_external_models.add(depends_on_model) 164 165 if not not_registered_external_models: 166 return None 167 168 # If the model is anything other than a sql model that and has a path 169 # that ends with .sql, we cannot extract the references from the query. 170 path = model._path 171 if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"): 172 return self._standard_error_message( 173 model_name=model.fqn, 174 external_models=not_registered_external_models, 175 ) 176 177 with open(path, "r", encoding="utf-8") as file: 178 read_file = file.read() 179 split_read_file = read_file.splitlines() 180 181 # If there are any unregistered external models, return a violation find 182 # the ranges for them. 183 references = extract_references_from_query( 184 query=model.query, 185 context=self.context, 186 document_path=path, 187 read_file=split_read_file, 188 depends_on=model.depends_on, 189 dialect=model.dialect, 190 ) 191 external_references = { 192 normalize_model_name( 193 table=read_range_from_string(read_file, ref.range), 194 default_catalog=model.default_catalog, 195 dialect=model.dialect, 196 ): ref 197 for ref in references 198 if isinstance(ref, ExternalModelReference) and ref.path is None 199 } 200 201 # Ensure that depends_on and external references match. 202 if not_registered_external_models != set(external_references.keys()): 203 return self._standard_error_message( 204 model_name=model.fqn, 205 external_models=not_registered_external_models, 206 ) 207 208 # Return a violation for each unregistered external model with its range. 209 violations = [] 210 for ref_name, ref in external_references.items(): 211 if ref_name in not_registered_external_models: 212 fix = self.create_fix(ref_name) 213 violations.append( 214 RuleViolation( 215 rule=self, 216 violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. " 217 "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.", 218 violation_range=ref.range, 219 fixes=[fix] if fix else [], 220 ) 221 ) 222 223 if len(violations) < len(not_registered_external_models): 224 return self._standard_error_message( 225 model_name=model.fqn, 226 external_models=not_registered_external_models, 227 ) 228 229 return violations 230 231 def _standard_error_message( 232 self, model_name: str, external_models: t.Set[str] 233 ) -> RuleViolation: 234 return RuleViolation( 235 rule=self, 236 violation_msg=f"Model '{model_name}' depends on unregistered external models: " 237 f"{', '.join(m for m in external_models)}. " 238 "Please register them in the external models file. This can be done by running 'sqlmesh create_external_models'.", 239 ) 240 241 def create_fix(self, model_name: str) -> t.Optional[Fix]: 242 """ 243 Add an external model to the external models file. 244 - If no external models file exists, it will create one with the model. 245 - If the model already exists, it will not add it again. 246 """ 247 root = self.context.path 248 if not root: 249 return None 250 251 external_models_path = root / EXTERNAL_MODELS_YAML 252 if not external_models_path.exists(): 253 return Fix( 254 title="Add external model file", 255 edits=[], 256 create_files=[ 257 CreateFile( 258 path=external_models_path, 259 text=f"- name: '{model_name}'\n", 260 ) 261 ], 262 ) 263 264 # Figure out the position to insert the new external model at the end of the file, whether 265 # needs new line or not. 266 with open(external_models_path, "r", encoding="utf-8") as file: 267 lines = file.read() 268 269 # If a file ends in newline, we can add the new model directly. 270 split_lines = lines.splitlines() 271 if lines.endswith("\n"): 272 new_text = f"- name: '{model_name}'\n" 273 position = Position(line=len(split_lines), character=0) 274 else: 275 new_text = f"\n- name: '{model_name}'\n" 276 position = Position( 277 line=len(split_lines) - 1, character=len(split_lines[-1]) if split_lines else 0 278 ) 279 280 return Fix( 281 title="Add external model", 282 edits=[ 283 TextEdit( 284 path=external_models_path, 285 range=Range(start=position, end=position), 286 new_text=new_text, 287 ) 288 ], 289 ) 290 291 292class NoAmbiguousProjections(Rule): 293 """All projections in a model must have unique & inferrable names or explicit aliases.""" 294 295 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 296 query = model.render_query() 297 if query is None: 298 return None 299 300 name_counts: t.Dict[str, int] = {} 301 projection_list = query.selects 302 for expression in projection_list: 303 alias = expression.output_name 304 if alias == "*": 305 continue 306 307 if not alias: 308 return self.violation( 309 f"Outer projection '{expression.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases." 310 ) 311 312 name_counts[alias] = name_counts.get(alias, 0) + 1 313 314 for name, count in name_counts.items(): 315 if count > 1: 316 return self.violation(f"Found duplicate outer select name '{name}'") 317 318 return None 319 320 321BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, exclude={Rule}))
32class NoSelectStar(Rule): 33 """Query should not contain SELECT * on its outer most projections, even if it can be expanded.""" 34 35 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 36 # Only applies to SQL models, as other model types do not have a query. 37 if not isinstance(model, SqlModel): 38 return None 39 if model.query.is_star: 40 violation_range = self._get_range(model) 41 fixes = self._create_fixes(model, violation_range) 42 return self.violation(violation_range=violation_range, fixes=fixes) 43 return None 44 45 def _get_range(self, model: SqlModel) -> t.Optional[Range]: 46 """Get the range of the violation if available.""" 47 try: 48 if len(model.query.expressions) == 1 and isinstance(model.query.expressions[0], Star): 49 return TokenPositionDetails.from_meta(model.query.expressions[0].meta).to_range( 50 None 51 ) 52 except Exception: 53 pass 54 55 return None 56 57 def _create_fixes( 58 self, model: SqlModel, violation_range: t.Optional[Range] 59 ) -> t.Optional[t.List[Fix]]: 60 """Create fixes for the SELECT * violation.""" 61 if not violation_range: 62 return None 63 columns = model.columns_to_types 64 if not columns: 65 return None 66 path = model._path 67 if path is None: 68 return None 69 new_text = ", ".join(columns.keys()) 70 return [ 71 Fix( 72 title="Replace SELECT * with explicit column list", 73 edits=[ 74 TextEdit( 75 path=path, 76 range=violation_range, 77 new_text=new_text, 78 ) 79 ], 80 ) 81 ]
Query should not contain SELECT * on its outer most projections, even if it can be expanded.
35 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 36 # Only applies to SQL models, as other model types do not have a query. 37 if not isinstance(model, SqlModel): 38 return None 39 if model.query.is_star: 40 violation_range = self._get_range(model) 41 fixes = self._create_fixes(model, violation_range) 42 return self.violation(violation_range=violation_range, fixes=fixes) 43 return None
The evaluation function that'll check for a violation of this rule.
Inherited Members
84class InvalidSelectStarExpansion(Rule): 85 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 86 deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion) 87 if not deps: 88 return None 89 90 violation_msg = ( 91 f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. " 92 "Run `sqlmesh create_external_models` and / or make sure that the model " 93 f"'{model.fqn}' can be rendered at parse time." 94 ) 95 96 return self.violation(violation_msg)
The base class for a rule.
85 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 86 deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion) 87 if not deps: 88 return None 89 90 violation_msg = ( 91 f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. " 92 "Run `sqlmesh create_external_models` and / or make sure that the model " 93 f"'{model.fqn}' can be rendered at parse time." 94 ) 95 96 return self.violation(violation_msg)
The evaluation function that'll check for a violation of this rule.
Inherited Members
99class AmbiguousOrInvalidColumn(Rule): 100 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 101 sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn) 102 if not sqlglot_err: 103 return None 104 105 violation_msg = ( 106 f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous." 107 ) 108 109 return self.violation(violation_msg)
The base class for a rule.
100 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 101 sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn) 102 if not sqlglot_err: 103 return None 104 105 violation_msg = ( 106 f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous." 107 ) 108 109 return self.violation(violation_msg)
The evaluation function that'll check for a violation of this rule.
Inherited Members
112class NoMissingAudits(Rule): 113 """Model `audits` must be configured to test data quality.""" 114 115 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 116 if model.audits or model.kind.is_symbolic: 117 return None 118 if model._path is None or not str(model._path).endswith(".sql"): 119 return self.violation() 120 121 try: 122 with open(model._path, "r", encoding="utf-8") as file: 123 content = file.read() 124 125 range = get_range_of_model_block(content, model.dialect) 126 if range: 127 return self.violation(violation_range=range) 128 return self.violation() 129 except Exception: 130 return self.violation()
Model audits must be configured to test data quality.
115 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 116 if model.audits or model.kind.is_symbolic: 117 return None 118 if model._path is None or not str(model._path).endswith(".sql"): 119 return self.violation() 120 121 try: 122 with open(model._path, "r", encoding="utf-8") as file: 123 content = file.read() 124 125 range = get_range_of_model_block(content, model.dialect) 126 if range: 127 return self.violation(violation_range=range) 128 return self.violation() 129 except Exception: 130 return self.violation()
The evaluation function that'll check for a violation of this rule.
Inherited Members
133class NoMissingUnitTest(Rule): 134 """All models must have a unit test found in the tests/ directory yaml files""" 135 136 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 137 # External models cannot have unit tests 138 if isinstance(model, ExternalModel): 139 return None 140 141 if model.name not in self.context.models_with_tests: 142 return self.violation( 143 violation_msg=f"Model {model.name} is missing unit test(s). Please add in the tests/ directory." 144 ) 145 return None
All models must have a unit test found in the tests/ directory yaml files
136 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 137 # External models cannot have unit tests 138 if isinstance(model, ExternalModel): 139 return None 140 141 if model.name not in self.context.models_with_tests: 142 return self.violation( 143 violation_msg=f"Model {model.name} is missing unit test(s). Please add in the tests/ directory." 144 ) 145 return None
The evaluation function that'll check for a violation of this rule.
Inherited Members
148class NoMissingExternalModels(Rule): 149 """All external models must be registered in the external_models.yaml file""" 150 151 def check_model( 152 self, model: Model 153 ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]: 154 # Ignore external models themselves, because either they are registered, 155 # and if they are not, they will be caught as referenced in another model. 156 if isinstance(model, ExternalModel): 157 return None 158 159 # Handle other models that may refer to the external models. 160 not_registered_external_models: t.Set[str] = set() 161 for depends_on_model in model.depends_on: 162 existing_model = self.context.get_model(depends_on_model) 163 if existing_model is None: 164 not_registered_external_models.add(depends_on_model) 165 166 if not not_registered_external_models: 167 return None 168 169 # If the model is anything other than a sql model that and has a path 170 # that ends with .sql, we cannot extract the references from the query. 171 path = model._path 172 if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"): 173 return self._standard_error_message( 174 model_name=model.fqn, 175 external_models=not_registered_external_models, 176 ) 177 178 with open(path, "r", encoding="utf-8") as file: 179 read_file = file.read() 180 split_read_file = read_file.splitlines() 181 182 # If there are any unregistered external models, return a violation find 183 # the ranges for them. 184 references = extract_references_from_query( 185 query=model.query, 186 context=self.context, 187 document_path=path, 188 read_file=split_read_file, 189 depends_on=model.depends_on, 190 dialect=model.dialect, 191 ) 192 external_references = { 193 normalize_model_name( 194 table=read_range_from_string(read_file, ref.range), 195 default_catalog=model.default_catalog, 196 dialect=model.dialect, 197 ): ref 198 for ref in references 199 if isinstance(ref, ExternalModelReference) and ref.path is None 200 } 201 202 # Ensure that depends_on and external references match. 203 if not_registered_external_models != set(external_references.keys()): 204 return self._standard_error_message( 205 model_name=model.fqn, 206 external_models=not_registered_external_models, 207 ) 208 209 # Return a violation for each unregistered external model with its range. 210 violations = [] 211 for ref_name, ref in external_references.items(): 212 if ref_name in not_registered_external_models: 213 fix = self.create_fix(ref_name) 214 violations.append( 215 RuleViolation( 216 rule=self, 217 violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. " 218 "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.", 219 violation_range=ref.range, 220 fixes=[fix] if fix else [], 221 ) 222 ) 223 224 if len(violations) < len(not_registered_external_models): 225 return self._standard_error_message( 226 model_name=model.fqn, 227 external_models=not_registered_external_models, 228 ) 229 230 return violations 231 232 def _standard_error_message( 233 self, model_name: str, external_models: t.Set[str] 234 ) -> RuleViolation: 235 return RuleViolation( 236 rule=self, 237 violation_msg=f"Model '{model_name}' depends on unregistered external models: " 238 f"{', '.join(m for m in external_models)}. " 239 "Please register them in the external models file. This can be done by running 'sqlmesh create_external_models'.", 240 ) 241 242 def create_fix(self, model_name: str) -> t.Optional[Fix]: 243 """ 244 Add an external model to the external models file. 245 - If no external models file exists, it will create one with the model. 246 - If the model already exists, it will not add it again. 247 """ 248 root = self.context.path 249 if not root: 250 return None 251 252 external_models_path = root / EXTERNAL_MODELS_YAML 253 if not external_models_path.exists(): 254 return Fix( 255 title="Add external model file", 256 edits=[], 257 create_files=[ 258 CreateFile( 259 path=external_models_path, 260 text=f"- name: '{model_name}'\n", 261 ) 262 ], 263 ) 264 265 # Figure out the position to insert the new external model at the end of the file, whether 266 # needs new line or not. 267 with open(external_models_path, "r", encoding="utf-8") as file: 268 lines = file.read() 269 270 # If a file ends in newline, we can add the new model directly. 271 split_lines = lines.splitlines() 272 if lines.endswith("\n"): 273 new_text = f"- name: '{model_name}'\n" 274 position = Position(line=len(split_lines), character=0) 275 else: 276 new_text = f"\n- name: '{model_name}'\n" 277 position = Position( 278 line=len(split_lines) - 1, character=len(split_lines[-1]) if split_lines else 0 279 ) 280 281 return Fix( 282 title="Add external model", 283 edits=[ 284 TextEdit( 285 path=external_models_path, 286 range=Range(start=position, end=position), 287 new_text=new_text, 288 ) 289 ], 290 )
All external models must be registered in the external_models.yaml file
151 def check_model( 152 self, model: Model 153 ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]: 154 # Ignore external models themselves, because either they are registered, 155 # and if they are not, they will be caught as referenced in another model. 156 if isinstance(model, ExternalModel): 157 return None 158 159 # Handle other models that may refer to the external models. 160 not_registered_external_models: t.Set[str] = set() 161 for depends_on_model in model.depends_on: 162 existing_model = self.context.get_model(depends_on_model) 163 if existing_model is None: 164 not_registered_external_models.add(depends_on_model) 165 166 if not not_registered_external_models: 167 return None 168 169 # If the model is anything other than a sql model that and has a path 170 # that ends with .sql, we cannot extract the references from the query. 171 path = model._path 172 if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"): 173 return self._standard_error_message( 174 model_name=model.fqn, 175 external_models=not_registered_external_models, 176 ) 177 178 with open(path, "r", encoding="utf-8") as file: 179 read_file = file.read() 180 split_read_file = read_file.splitlines() 181 182 # If there are any unregistered external models, return a violation find 183 # the ranges for them. 184 references = extract_references_from_query( 185 query=model.query, 186 context=self.context, 187 document_path=path, 188 read_file=split_read_file, 189 depends_on=model.depends_on, 190 dialect=model.dialect, 191 ) 192 external_references = { 193 normalize_model_name( 194 table=read_range_from_string(read_file, ref.range), 195 default_catalog=model.default_catalog, 196 dialect=model.dialect, 197 ): ref 198 for ref in references 199 if isinstance(ref, ExternalModelReference) and ref.path is None 200 } 201 202 # Ensure that depends_on and external references match. 203 if not_registered_external_models != set(external_references.keys()): 204 return self._standard_error_message( 205 model_name=model.fqn, 206 external_models=not_registered_external_models, 207 ) 208 209 # Return a violation for each unregistered external model with its range. 210 violations = [] 211 for ref_name, ref in external_references.items(): 212 if ref_name in not_registered_external_models: 213 fix = self.create_fix(ref_name) 214 violations.append( 215 RuleViolation( 216 rule=self, 217 violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. " 218 "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.", 219 violation_range=ref.range, 220 fixes=[fix] if fix else [], 221 ) 222 ) 223 224 if len(violations) < len(not_registered_external_models): 225 return self._standard_error_message( 226 model_name=model.fqn, 227 external_models=not_registered_external_models, 228 ) 229 230 return violations
The evaluation function that'll check for a violation of this rule.
242 def create_fix(self, model_name: str) -> t.Optional[Fix]: 243 """ 244 Add an external model to the external models file. 245 - If no external models file exists, it will create one with the model. 246 - If the model already exists, it will not add it again. 247 """ 248 root = self.context.path 249 if not root: 250 return None 251 252 external_models_path = root / EXTERNAL_MODELS_YAML 253 if not external_models_path.exists(): 254 return Fix( 255 title="Add external model file", 256 edits=[], 257 create_files=[ 258 CreateFile( 259 path=external_models_path, 260 text=f"- name: '{model_name}'\n", 261 ) 262 ], 263 ) 264 265 # Figure out the position to insert the new external model at the end of the file, whether 266 # needs new line or not. 267 with open(external_models_path, "r", encoding="utf-8") as file: 268 lines = file.read() 269 270 # If a file ends in newline, we can add the new model directly. 271 split_lines = lines.splitlines() 272 if lines.endswith("\n"): 273 new_text = f"- name: '{model_name}'\n" 274 position = Position(line=len(split_lines), character=0) 275 else: 276 new_text = f"\n- name: '{model_name}'\n" 277 position = Position( 278 line=len(split_lines) - 1, character=len(split_lines[-1]) if split_lines else 0 279 ) 280 281 return Fix( 282 title="Add external model", 283 edits=[ 284 TextEdit( 285 path=external_models_path, 286 range=Range(start=position, end=position), 287 new_text=new_text, 288 ) 289 ], 290 )
Add an external model to the external models file.
- If no external models file exists, it will create one with the model.
- If the model already exists, it will not add it again.
Inherited Members
293class NoAmbiguousProjections(Rule): 294 """All projections in a model must have unique & inferrable names or explicit aliases.""" 295 296 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 297 query = model.render_query() 298 if query is None: 299 return None 300 301 name_counts: t.Dict[str, int] = {} 302 projection_list = query.selects 303 for expression in projection_list: 304 alias = expression.output_name 305 if alias == "*": 306 continue 307 308 if not alias: 309 return self.violation( 310 f"Outer projection '{expression.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases." 311 ) 312 313 name_counts[alias] = name_counts.get(alias, 0) + 1 314 315 for name, count in name_counts.items(): 316 if count > 1: 317 return self.violation(f"Found duplicate outer select name '{name}'") 318 319 return None
All projections in a model must have unique & inferrable names or explicit aliases.
296 def check_model(self, model: Model) -> t.Optional[RuleViolation]: 297 query = model.render_query() 298 if query is None: 299 return None 300 301 name_counts: t.Dict[str, int] = {} 302 projection_list = query.selects 303 for expression in projection_list: 304 alias = expression.output_name 305 if alias == "*": 306 continue 307 308 if not alias: 309 return self.violation( 310 f"Outer projection '{expression.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases." 311 ) 312 313 name_counts[alias] = name_counts.get(alias, 0) + 1 314 315 for name, count in name_counts.items(): 316 if count > 1: 317 return self.violation(f"Found duplicate outer select name '{name}'") 318 319 return None
The evaluation function that'll check for a violation of this rule.