Edit on GitHub

Contains all the standard rules included with SQLMesh

  1"""Contains all the standard rules included with SQLMesh"""
  2
  3from __future__ import annotations
  4
  5import typing as t
  6
  7from sqlglot.expressions import Star
  8from sqlglot.helper import subclasses
  9
 10from sqlmesh.core.constants import EXTERNAL_MODELS_YAML
 11from sqlmesh.core.dialect import normalize_model_name
 12from sqlmesh.core.linter.helpers import (
 13    TokenPositionDetails,
 14    get_range_of_model_block,
 15    read_range_from_string,
 16)
 17from sqlmesh.core.linter.rule import (
 18    Rule,
 19    RuleViolation,
 20    Range,
 21    Fix,
 22    TextEdit,
 23    Position,
 24    CreateFile,
 25)
 26from sqlmesh.core.linter.definition import RuleSet
 27from sqlmesh.core.model import Model, SqlModel, ExternalModel
 28from sqlmesh.utils.lineage import extract_references_from_query, ExternalModelReference
 29
 30
 31class NoSelectStar(Rule):
 32    """Query should not contain SELECT * on its outer most projections, even if it can be expanded."""
 33
 34    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
 35        # Only applies to SQL models, as other model types do not have a query.
 36        if not isinstance(model, SqlModel):
 37            return None
 38        if model.query.is_star:
 39            violation_range = self._get_range(model)
 40            fixes = self._create_fixes(model, violation_range)
 41            return self.violation(violation_range=violation_range, fixes=fixes)
 42        return None
 43
 44    def _get_range(self, model: SqlModel) -> t.Optional[Range]:
 45        """Get the range of the violation if available."""
 46        try:
 47            if len(model.query.expressions) == 1 and isinstance(model.query.expressions[0], Star):
 48                return TokenPositionDetails.from_meta(model.query.expressions[0].meta).to_range(
 49                    None
 50                )
 51        except Exception:
 52            pass
 53
 54        return None
 55
 56    def _create_fixes(
 57        self, model: SqlModel, violation_range: t.Optional[Range]
 58    ) -> t.Optional[t.List[Fix]]:
 59        """Create fixes for the SELECT * violation."""
 60        if not violation_range:
 61            return None
 62        columns = model.columns_to_types
 63        if not columns:
 64            return None
 65        path = model._path
 66        if path is None:
 67            return None
 68        new_text = ", ".join(columns.keys())
 69        return [
 70            Fix(
 71                title="Replace SELECT * with explicit column list",
 72                edits=[
 73                    TextEdit(
 74                        path=path,
 75                        range=violation_range,
 76                        new_text=new_text,
 77                    )
 78                ],
 79            )
 80        ]
 81
 82
 83class InvalidSelectStarExpansion(Rule):
 84    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
 85        deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion)
 86        if not deps:
 87            return None
 88
 89        violation_msg = (
 90            f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
 91            "Run `sqlmesh create_external_models` and / or make sure that the model "
 92            f"'{model.fqn}' can be rendered at parse time."
 93        )
 94
 95        return self.violation(violation_msg)
 96
 97
 98class AmbiguousOrInvalidColumn(Rule):
 99    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
100        sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn)
101        if not sqlglot_err:
102            return None
103
104        violation_msg = (
105            f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous."
106        )
107
108        return self.violation(violation_msg)
109
110
class NoMissingAudits(Rule):
    """Model `audits` must be configured to test data quality."""

    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
        """Return a violation when a non-symbolic model has no audits, else None.

        For models defined in a `.sql` file the violation is anchored to the range
        returned by `get_range_of_model_block` when one can be determined.
        """
        if model.audits or model.kind.is_symbolic:
            return None

        path = model._path
        if path is None or not str(path).endswith(".sql"):
            return self.violation()

        # Locating the model block is best effort: if the file cannot be read or
        # parsed, the violation is still reported, just without a range.
        try:
            with open(path, "r", encoding="utf-8") as file:
                content = file.read()
            model_block_range = get_range_of_model_block(content, model.dialect)
        except Exception:
            return self.violation()

        if model_block_range:
            return self.violation(violation_range=model_block_range)
        return self.violation()
130
131
class NoMissingUnitTest(Rule):
    """All models must have a unit test found in the tests/ directory yaml files"""

    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
        """Return a violation when a non-external model is absent from `context.models_with_tests`."""
        # External models cannot have unit tests, so they are never flagged.
        can_be_tested = not isinstance(model, ExternalModel)
        if can_be_tested and model.name not in self.context.models_with_tests:
            message = (
                f"Model {model.name} is missing unit test(s). Please add in the tests/ directory."
            )
            return self.violation(violation_msg=message)
        return None
145
146
class NoMissingExternalModels(Rule):
    """All external models must be registered in the external_models.yaml file"""

    def check_model(
        self, model: Model
    ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]:
        """Report dependencies of `model` that the context does not know about.

        Returns:
            None when every dependency resolves to a model in the context.
            A list with one ranged violation (and, when available, a fix) per
            unregistered dependency when each of them can be located in the model's
            `.sql` file.
            A single violation naming all unregistered dependencies otherwise.
        """
        # Ignore external models themselves, because either they are registered,
        # and if they are not, they will be caught as referenced in another model.
        if isinstance(model, ExternalModel):
            return None

        # A dependency the context cannot resolve is treated as an unregistered external model.
        not_registered_external_models: t.Set[str] = {
            dependency
            for dependency in model.depends_on
            if self.context.get_model(dependency) is None
        }
        if not not_registered_external_models:
            return None

        # References can only be located for SQL models that live in a .sql file;
        # everything else gets the generic message without ranges.
        path = model._path
        if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"):
            return self._standard_error_message(
                model_name=model.fqn,
                external_models=not_registered_external_models,
            )

        with open(path, "r", encoding="utf-8") as file:
            read_file = file.read()
        split_read_file = read_file.splitlines()

        references = extract_references_from_query(
            query=model.query,
            context=self.context,
            document_path=path,
            read_file=split_read_file,
            depends_on=model.depends_on,
            dialect=model.dialect,
        )
        # Key each path-less external reference by the normalized name of the text
        # found at its range, so it can be compared against `depends_on` entries.
        external_references = {
            normalize_model_name(
                table=read_range_from_string(read_file, ref.range),
                default_catalog=model.default_catalog,
                dialect=model.dialect,
            ): ref
            for ref in references
            if isinstance(ref, ExternalModelReference) and ref.path is None
        }

        # Ensure that depends_on and external references match.
        if not_registered_external_models != set(external_references):
            return self._standard_error_message(
                model_name=model.fqn,
                external_models=not_registered_external_models,
            )

        # The equality check above guarantees every located reference is an
        # unregistered model and vice versa, so each one yields exactly one violation.
        violations = []
        for ref_name, ref in external_references.items():
            fix = self.create_fix(ref_name)
            violations.append(
                RuleViolation(
                    rule=self,
                    violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. "
                    "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.",
                    violation_range=ref.range,
                    fixes=[fix] if fix else [],
                )
            )
        return violations

    def _standard_error_message(
        self, model_name: str, external_models: t.Set[str]
    ) -> RuleViolation:
        """Build the range-less violation that lists all unregistered external models.

        The names are sorted so the message is the same on every run.
        """
        return RuleViolation(
            rule=self,
            violation_msg=f"Model '{model_name}' depends on unregistered external models: "
            f"{', '.join(sorted(external_models))}. "
            "Please register them in the external models file. This can be done by running 'sqlmesh create_external_models'.",
        )

    def create_fix(self, model_name: str) -> t.Optional[Fix]:
        """
        Add an external model to the external models file.
        - If the context has no path, no fix is produced.
        - If no external models file exists, it will create one with the model.
        - Otherwise the model is appended at the end of the existing file.

        Note: the existing file's entries are not inspected, so this does not
        detect a model that is already listed.
        """
        root = self.context.path
        if not root:
            return None

        entry = f"- name: '{model_name}'\n"

        external_models_path = root / EXTERNAL_MODELS_YAML
        if not external_models_path.exists():
            return Fix(
                title="Add external model file",
                edits=[],
                create_files=[
                    CreateFile(
                        path=external_models_path,
                        text=entry,
                    )
                ],
            )

        with open(external_models_path, "r", encoding="utf-8") as file:
            content = file.read()

        split_lines = content.splitlines()
        if not split_lines or content.endswith("\n"):
            # Empty file, or file ending in a newline: insert at the start of the
            # line after the last one. For an empty file that is line 0, which
            # avoids producing a negative line number.
            new_text = entry
            position = Position(line=len(split_lines), character=0)
        else:
            # The last line is unterminated: insert after its final character and
            # start the new entry on its own line.
            new_text = f"\n{entry}"
            position = Position(line=len(split_lines) - 1, character=len(split_lines[-1]))

        return Fix(
            title="Add external model",
            edits=[
                TextEdit(
                    path=external_models_path,
                    range=Range(start=position, end=position),
                    new_text=new_text,
                )
            ],
        )
290
291
class NoAmbiguousProjections(Rule):
    """All projections in a model must have unique & inferrable names or explicit aliases."""

    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
        """Check the rendered query's outer projections for missing or repeated names.

        Star projections are skipped. A projection without an output name is reported
        as soon as it is encountered; duplicates are reported only after all
        projections have been scanned.
        """
        rendered = model.render_query()
        if rendered is None:
            return None

        occurrences: t.Dict[str, int] = {}
        for projection in rendered.selects:
            output_name = projection.output_name
            if output_name == "*":
                continue

            if not output_name:
                return self.violation(
                    f"Outer projection '{projection.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases."
                )

            occurrences.setdefault(output_name, 0)
            occurrences[output_name] += 1

        # Report the first repeated name, in order of first appearance.
        first_duplicate = next(
            (name for name, seen in occurrences.items() if seen > 1),
            None,
        )
        if first_duplicate is None:
            return None
        return self.violation(f"Found duplicate outer select name '{first_duplicate}'")
319
320
# Rule set built from the Rule subclasses found via this module's name, with the
# Rule base class itself excluded.
BUILTIN_RULES = RuleSet(subclasses(__name__, Rule, exclude={Rule}))
class NoSelectStar(sqlmesh.core.linter.rule.Rule):
32class NoSelectStar(Rule):
33    """Query should not contain SELECT * on its outer most projections, even if it can be expanded."""
34
35    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
36        # Only applies to SQL models, as other model types do not have a query.
37        if not isinstance(model, SqlModel):
38            return None
39        if model.query.is_star:
40            violation_range = self._get_range(model)
41            fixes = self._create_fixes(model, violation_range)
42            return self.violation(violation_range=violation_range, fixes=fixes)
43        return None
44
45    def _get_range(self, model: SqlModel) -> t.Optional[Range]:
46        """Get the range of the violation if available."""
47        try:
48            if len(model.query.expressions) == 1 and isinstance(model.query.expressions[0], Star):
49                return TokenPositionDetails.from_meta(model.query.expressions[0].meta).to_range(
50                    None
51                )
52        except Exception:
53            pass
54
55        return None
56
57    def _create_fixes(
58        self, model: SqlModel, violation_range: t.Optional[Range]
59    ) -> t.Optional[t.List[Fix]]:
60        """Create fixes for the SELECT * violation."""
61        if not violation_range:
62            return None
63        columns = model.columns_to_types
64        if not columns:
65            return None
66        path = model._path
67        if path is None:
68            return None
69        new_text = ", ".join(columns.keys())
70        return [
71            Fix(
72                title="Replace SELECT * with explicit column list",
73                edits=[
74                    TextEdit(
75                        path=path,
76                        range=violation_range,
77                        new_text=new_text,
78                    )
79                ],
80            )
81        ]

Query should not contain SELECT * on its outer most projections, even if it can be expanded.

35    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
36        # Only applies to SQL models, as other model types do not have a query.
37        if not isinstance(model, SqlModel):
38            return None
39        if model.query.is_star:
40            violation_range = self._get_range(model)
41            fixes = self._create_fixes(model, violation_range)
42            return self.violation(violation_range=violation_range, fixes=fixes)
43        return None

The evaluation function that'll check for a violation of this rule.

name = 'noselectstar'
class InvalidSelectStarExpansion(sqlmesh.core.linter.rule.Rule):
84class InvalidSelectStarExpansion(Rule):
85    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
86        deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion)
87        if not deps:
88            return None
89
90        violation_msg = (
91            f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
92            "Run `sqlmesh create_external_models` and / or make sure that the model "
93            f"'{model.fqn}' can be rendered at parse time."
94        )
95
96        return self.violation(violation_msg)

Flags a model whose SELECT * cannot be expanded because the schema of one or more referenced models is missing.

85    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
86        deps = model.violated_rules_for_query.get(InvalidSelectStarExpansion)
87        if not deps:
88            return None
89
90        violation_msg = (
91            f"SELECT * cannot be expanded due to missing schema(s) for model(s): {deps}. "
92            "Run `sqlmesh create_external_models` and / or make sure that the model "
93            f"'{model.fqn}' can be rendered at parse time."
94        )
95
96        return self.violation(violation_msg)

The evaluation function that'll check for a violation of this rule.

name = 'invalidselectstarexpansion'
class AmbiguousOrInvalidColumn(sqlmesh.core.linter.rule.Rule):
 99class AmbiguousOrInvalidColumn(Rule):
100    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
101        sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn)
102        if not sqlglot_err:
103            return None
104
105        violation_msg = (
106            f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous."
107        )
108
109        return self.violation(violation_msg)

Flags a model whose query references a column that may not exist or is ambiguous.

100    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
101        sqlglot_err = model.violated_rules_for_query.get(AmbiguousOrInvalidColumn)
102        if not sqlglot_err:
103            return None
104
105        violation_msg = (
106            f"{sqlglot_err} for model '{model.fqn}', the column may not exist or is ambiguous."
107        )
108
109        return self.violation(violation_msg)

The evaluation function that'll check for a violation of this rule.

name = 'ambiguousorinvalidcolumn'
class NoMissingAudits(sqlmesh.core.linter.rule.Rule):
112class NoMissingAudits(Rule):
113    """Model `audits` must be configured to test data quality."""
114
115    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
116        if model.audits or model.kind.is_symbolic:
117            return None
118        if model._path is None or not str(model._path).endswith(".sql"):
119            return self.violation()
120
121        try:
122            with open(model._path, "r", encoding="utf-8") as file:
123                content = file.read()
124
125            range = get_range_of_model_block(content, model.dialect)
126            if range:
127                return self.violation(violation_range=range)
128            return self.violation()
129        except Exception:
130            return self.violation()

Model audits must be configured to test data quality.

115    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
116        if model.audits or model.kind.is_symbolic:
117            return None
118        if model._path is None or not str(model._path).endswith(".sql"):
119            return self.violation()
120
121        try:
122            with open(model._path, "r", encoding="utf-8") as file:
123                content = file.read()
124
125            range = get_range_of_model_block(content, model.dialect)
126            if range:
127                return self.violation(violation_range=range)
128            return self.violation()
129        except Exception:
130            return self.violation()

The evaluation function that'll check for a violation of this rule.

name = 'nomissingaudits'
class NoMissingUnitTest(sqlmesh.core.linter.rule.Rule):
133class NoMissingUnitTest(Rule):
134    """All models must have a unit test found in the tests/ directory yaml files"""
135
136    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
137        #  External models cannot have unit tests
138        if isinstance(model, ExternalModel):
139            return None
140
141        if model.name not in self.context.models_with_tests:
142            return self.violation(
143                violation_msg=f"Model {model.name} is missing unit test(s). Please add in the tests/ directory."
144            )
145        return None

All models must have a unit test found in the tests/ directory yaml files

136    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
137        #  External models cannot have unit tests
138        if isinstance(model, ExternalModel):
139            return None
140
141        if model.name not in self.context.models_with_tests:
142            return self.violation(
143                violation_msg=f"Model {model.name} is missing unit test(s). Please add in the tests/ directory."
144            )
145        return None

The evaluation function that'll check for a violation of this rule.

name = 'nomissingunittest'
class NoMissingExternalModels(sqlmesh.core.linter.rule.Rule):
148class NoMissingExternalModels(Rule):
149    """All external models must be registered in the external_models.yaml file"""
150
151    def check_model(
152        self, model: Model
153    ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]:
154        # Ignore external models themselves, because either they are registered,
155        # and if they are not, they will be caught as referenced in another model.
156        if isinstance(model, ExternalModel):
157            return None
158
159        # Handle other models that may refer to the external models.
160        not_registered_external_models: t.Set[str] = set()
161        for depends_on_model in model.depends_on:
162            existing_model = self.context.get_model(depends_on_model)
163            if existing_model is None:
164                not_registered_external_models.add(depends_on_model)
165
166        if not not_registered_external_models:
167            return None
168
169        # If the model is anything other than a sql model that and has a path
170        # that ends with .sql, we cannot extract the references from the query.
171        path = model._path
172        if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"):
173            return self._standard_error_message(
174                model_name=model.fqn,
175                external_models=not_registered_external_models,
176            )
177
178        with open(path, "r", encoding="utf-8") as file:
179            read_file = file.read()
180        split_read_file = read_file.splitlines()
181
182        # If there are any unregistered external models, return a violation find
183        # the ranges for them.
184        references = extract_references_from_query(
185            query=model.query,
186            context=self.context,
187            document_path=path,
188            read_file=split_read_file,
189            depends_on=model.depends_on,
190            dialect=model.dialect,
191        )
192        external_references = {
193            normalize_model_name(
194                table=read_range_from_string(read_file, ref.range),
195                default_catalog=model.default_catalog,
196                dialect=model.dialect,
197            ): ref
198            for ref in references
199            if isinstance(ref, ExternalModelReference) and ref.path is None
200        }
201
202        # Ensure that depends_on and external references match.
203        if not_registered_external_models != set(external_references.keys()):
204            return self._standard_error_message(
205                model_name=model.fqn,
206                external_models=not_registered_external_models,
207            )
208
209        # Return a violation for each unregistered external model with its range.
210        violations = []
211        for ref_name, ref in external_references.items():
212            if ref_name in not_registered_external_models:
213                fix = self.create_fix(ref_name)
214                violations.append(
215                    RuleViolation(
216                        rule=self,
217                        violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. "
218                        "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.",
219                        violation_range=ref.range,
220                        fixes=[fix] if fix else [],
221                    )
222                )
223
224        if len(violations) < len(not_registered_external_models):
225            return self._standard_error_message(
226                model_name=model.fqn,
227                external_models=not_registered_external_models,
228            )
229
230        return violations
231
232    def _standard_error_message(
233        self, model_name: str, external_models: t.Set[str]
234    ) -> RuleViolation:
235        return RuleViolation(
236            rule=self,
237            violation_msg=f"Model '{model_name}' depends on unregistered external models: "
238            f"{', '.join(m for m in external_models)}. "
239            "Please register them in the external models file. This can be done by running 'sqlmesh create_external_models'.",
240        )
241
242    def create_fix(self, model_name: str) -> t.Optional[Fix]:
243        """
244        Add an external model to the external models file.
245        - If no external models file exists, it will create one with the model.
246        - If the model already exists, it will not add it again.
247        """
248        root = self.context.path
249        if not root:
250            return None
251
252        external_models_path = root / EXTERNAL_MODELS_YAML
253        if not external_models_path.exists():
254            return Fix(
255                title="Add external model file",
256                edits=[],
257                create_files=[
258                    CreateFile(
259                        path=external_models_path,
260                        text=f"- name: '{model_name}'\n",
261                    )
262                ],
263            )
264
265        # Figure out the position to insert the new external model at the end of the file, whether
266        # needs new line or not.
267        with open(external_models_path, "r", encoding="utf-8") as file:
268            lines = file.read()
269
270        # If a file ends in newline, we can add the new model directly.
271        split_lines = lines.splitlines()
272        if lines.endswith("\n"):
273            new_text = f"- name: '{model_name}'\n"
274            position = Position(line=len(split_lines), character=0)
275        else:
276            new_text = f"\n- name: '{model_name}'\n"
277            position = Position(
278                line=len(split_lines) - 1, character=len(split_lines[-1]) if split_lines else 0
279            )
280
281        return Fix(
282            title="Add external model",
283            edits=[
284                TextEdit(
285                    path=external_models_path,
286                    range=Range(start=position, end=position),
287                    new_text=new_text,
288                )
289            ],
290        )

All external models must be registered in the external_models.yaml file

151    def check_model(
152        self, model: Model
153    ) -> t.Optional[t.Union[RuleViolation, t.List[RuleViolation]]]:
154        # Ignore external models themselves, because either they are registered,
155        # and if they are not, they will be caught as referenced in another model.
156        if isinstance(model, ExternalModel):
157            return None
158
159        # Handle other models that may refer to the external models.
160        not_registered_external_models: t.Set[str] = set()
161        for depends_on_model in model.depends_on:
162            existing_model = self.context.get_model(depends_on_model)
163            if existing_model is None:
164                not_registered_external_models.add(depends_on_model)
165
166        if not not_registered_external_models:
167            return None
168
169        # If the model is anything other than a sql model that and has a path
170        # that ends with .sql, we cannot extract the references from the query.
171        path = model._path
172        if not isinstance(model, SqlModel) or not path or not str(path).endswith(".sql"):
173            return self._standard_error_message(
174                model_name=model.fqn,
175                external_models=not_registered_external_models,
176            )
177
178        with open(path, "r", encoding="utf-8") as file:
179            read_file = file.read()
180        split_read_file = read_file.splitlines()
181
182        # If there are any unregistered external models, return a violation find
183        # the ranges for them.
184        references = extract_references_from_query(
185            query=model.query,
186            context=self.context,
187            document_path=path,
188            read_file=split_read_file,
189            depends_on=model.depends_on,
190            dialect=model.dialect,
191        )
192        external_references = {
193            normalize_model_name(
194                table=read_range_from_string(read_file, ref.range),
195                default_catalog=model.default_catalog,
196                dialect=model.dialect,
197            ): ref
198            for ref in references
199            if isinstance(ref, ExternalModelReference) and ref.path is None
200        }
201
202        # Ensure that depends_on and external references match.
203        if not_registered_external_models != set(external_references.keys()):
204            return self._standard_error_message(
205                model_name=model.fqn,
206                external_models=not_registered_external_models,
207            )
208
209        # Return a violation for each unregistered external model with its range.
210        violations = []
211        for ref_name, ref in external_references.items():
212            if ref_name in not_registered_external_models:
213                fix = self.create_fix(ref_name)
214                violations.append(
215                    RuleViolation(
216                        rule=self,
217                        violation_msg=f"Model '{model.fqn}' depends on unregistered external model '{ref_name}'. "
218                        "Please register it in the external models file. This can be done by running 'sqlmesh create_external_models'.",
219                        violation_range=ref.range,
220                        fixes=[fix] if fix else [],
221                    )
222                )
223
224        if len(violations) < len(not_registered_external_models):
225            return self._standard_error_message(
226                model_name=model.fqn,
227                external_models=not_registered_external_models,
228            )
229
230        return violations

The evaluation function that'll check for a violation of this rule.

def create_fix(self, model_name: str) -> Optional[sqlmesh.core.linter.rule.Fix]:
242    def create_fix(self, model_name: str) -> t.Optional[Fix]:
243        """
244        Add an external model to the external models file.
245        - If no external models file exists, it will create one with the model.
246        - If the model already exists, it will not add it again.
247        """
248        root = self.context.path
249        if not root:
250            return None
251
252        external_models_path = root / EXTERNAL_MODELS_YAML
253        if not external_models_path.exists():
254            return Fix(
255                title="Add external model file",
256                edits=[],
257                create_files=[
258                    CreateFile(
259                        path=external_models_path,
260                        text=f"- name: '{model_name}'\n",
261                    )
262                ],
263            )
264
265        # Figure out the position to insert the new external model at the end of the file, whether
266        # needs new line or not.
267        with open(external_models_path, "r", encoding="utf-8") as file:
268            lines = file.read()
269
270        # If a file ends in newline, we can add the new model directly.
271        split_lines = lines.splitlines()
272        if lines.endswith("\n"):
273            new_text = f"- name: '{model_name}'\n"
274            position = Position(line=len(split_lines), character=0)
275        else:
276            new_text = f"\n- name: '{model_name}'\n"
277            position = Position(
278                line=len(split_lines) - 1, character=len(split_lines[-1]) if split_lines else 0
279            )
280
281        return Fix(
282            title="Add external model",
283            edits=[
284                TextEdit(
285                    path=external_models_path,
286                    range=Range(start=position, end=position),
287                    new_text=new_text,
288                )
289            ],
290        )

Add an external model to the external models file.

  • If no external models file exists, it will create one with the model.
  • Otherwise the model is appended to the end of the existing file. Note that the implementation does not check whether the model is already listed.
name = 'nomissingexternalmodels'
class NoAmbiguousProjections(sqlmesh.core.linter.rule.Rule):
293class NoAmbiguousProjections(Rule):
294    """All projections in a model must have unique & inferrable names or explicit aliases."""
295
296    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
297        query = model.render_query()
298        if query is None:
299            return None
300
301        name_counts: t.Dict[str, int] = {}
302        projection_list = query.selects
303        for expression in projection_list:
304            alias = expression.output_name
305            if alias == "*":
306                continue
307
308            if not alias:
309                return self.violation(
310                    f"Outer projection '{expression.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases."
311                )
312
313            name_counts[alias] = name_counts.get(alias, 0) + 1
314
315        for name, count in name_counts.items():
316            if count > 1:
317                return self.violation(f"Found duplicate outer select name '{name}'")
318
319        return None

All projections in a model must have unique & inferrable names or explicit aliases.

296    def check_model(self, model: Model) -> t.Optional[RuleViolation]:
297        query = model.render_query()
298        if query is None:
299            return None
300
301        name_counts: t.Dict[str, int] = {}
302        projection_list = query.selects
303        for expression in projection_list:
304            alias = expression.output_name
305            if alias == "*":
306                continue
307
308            if not alias:
309                return self.violation(
310                    f"Outer projection '{expression.sql(dialect=model.dialect)}' must have inferrable names or explicit aliases."
311                )
312
313            name_counts[alias] = name_counts.get(alias, 0) + 1
314
315        for name, count in name_counts.items():
316            if count > 1:
317                return self.violation(f"Found duplicate outer select name '{name}'")
318
319        return None

The evaluation function that'll check for a violation of this rule.

name = 'noambiguousprojections'
BUILTIN_RULES = <sqlmesh.core.linter.definition.RuleSet object>