Edit on GitHub

sqlmesh.utils.lineage

  1import typing as t
  2from pathlib import Path
  3
  4from pydantic import Field
  5
  6from sqlmesh.core.dialect import normalize_model_name
  7from sqlmesh.core.linter.helpers import (
  8    TokenPositionDetails,
  9)
 10from sqlmesh.core.linter.rule import Range, Position
 11from sqlmesh.core.model.definition import SqlModel, ExternalModel, PythonModel, SeedModel
 12from sqlglot import exp
 13from sqlglot.optimizer.scope import build_scope
 14
 15from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 16from ruamel.yaml import YAML
 17
 18from sqlmesh.utils.pydantic import PydanticModel
 19
 20if t.TYPE_CHECKING:
 21    from sqlmesh.core.context import Context
 22    from sqlmesh.core.context import GenericContext
 23
 24
 25class ModelReference(PydanticModel):
 26    """A reference to a model, excluding external models."""
 27
 28    type: t.Literal["model"] = "model"
 29    path: Path
 30    range: Range
 31    markdown_description: t.Optional[str] = None
 32
 33
 34class ExternalModelReference(PydanticModel):
 35    """A reference to an external model."""
 36
 37    type: t.Literal["external_model"] = "external_model"
 38    range: Range
 39    target_range: t.Optional[Range] = None
 40    path: t.Optional[Path] = None
 41    """The path of the external model, typically a YAML file, it is optional because
 42    external models can be unregistered and so the path is not available."""
 43
 44    markdown_description: t.Optional[str] = None
 45
 46
 47class CTEReference(PydanticModel):
 48    """A reference to a CTE."""
 49
 50    type: t.Literal["cte"] = "cte"
 51    path: Path
 52    range: Range
 53    target_range: Range
 54
 55
 56class MacroReference(PydanticModel):
 57    """A reference to a macro."""
 58
 59    type: t.Literal["macro"] = "macro"
 60    path: Path
 61    range: Range
 62    target_range: Range
 63    markdown_description: t.Optional[str] = None
 64
 65
 66Reference = t.Annotated[
 67    t.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference],
 68    Field(discriminator="type"),
 69]
 70
 71
 72def extract_references_from_query(
 73    query: exp.Expr,
 74    context: t.Union["Context", "GenericContext[t.Any]"],
 75    document_path: Path,
 76    read_file: t.List[str],
 77    depends_on: t.Set[str],
 78    dialect: t.Optional[str] = None,
 79) -> t.List[Reference]:
 80    # Build a scope tree to properly handle nested CTEs
 81    try:
 82        query = normalize_identifiers(query.copy(), dialect=dialect)
 83        root_scope = build_scope(query)
 84    except Exception:
 85        root_scope = None
 86
 87    references: t.List[Reference] = []
 88    if not root_scope:
 89        return references
 90
 91    # Traverse all scopes to find CTE definitions and table references
 92    for scope in root_scope.traverse():
 93        for table in scope.tables:
 94            table_name = table.name
 95
 96            # Check if this table reference is a CTE in the current scope
 97            if cte_scope := scope.cte_sources.get(table_name):
 98                if cte_scope.expression is None:
 99                    continue
100                cte = cte_scope.expression.parent
101                if cte is None:
102                    continue
103                alias = cte.args["alias"]
104                if isinstance(alias, exp.TableAlias):
105                    identifier = alias.this
106                    if isinstance(identifier, exp.Identifier):
107                        target_range_sqlmesh = TokenPositionDetails.from_meta(
108                            identifier.meta
109                        ).to_range(read_file)
110                        table_range_sqlmesh = TokenPositionDetails.from_meta(
111                            table.this.meta
112                        ).to_range(read_file)
113
114                        references.append(
115                            CTEReference(
116                                path=document_path,  # Same file
117                                range=table_range_sqlmesh,
118                                target_range=target_range_sqlmesh,
119                            )
120                        )
121
122                        column_references = _process_column_references(
123                            scope=scope,
124                            reference_name=table.name,
125                            read_file=read_file,
126                            referenced_model_path=document_path,
127                            description="",
128                            reference_type="cte",
129                            cte_target_range=target_range_sqlmesh,
130                        )
131                        references.extend(column_references)
132                continue
133
134            # For non-CTE tables, process these as before (external model references)
135            # Normalize the table reference
136            unaliased = table.copy()
137            if unaliased.args.get("alias") is not None:
138                unaliased.set("alias", None)
139            reference_name = unaliased.sql(dialect=dialect)
140            try:
141                normalized_reference_name = normalize_model_name(
142                    reference_name,
143                    default_catalog=context.default_catalog,
144                    dialect=dialect,
145                )
146                if normalized_reference_name not in depends_on:
147                    continue
148            except Exception:
149                # Skip references that cannot be normalized
150                continue
151
152            # Get the referenced model uri
153            referenced_model = context.get_model(
154                model_or_snapshot=normalized_reference_name, raise_if_missing=False
155            )
156            if referenced_model is None:
157                # Extract metadata for positioning
158                table_meta = TokenPositionDetails.from_meta(table.this.meta)
159                table_range_sqlmesh = table_meta.to_range(read_file)
160                start_pos_sqlmesh = table_range_sqlmesh.start
161                end_pos_sqlmesh = table_range_sqlmesh.end
162
163                # If there's a catalog or database qualifier, adjust the start position
164                catalog_or_db = table.args.get("catalog") or table.args.get("db")
165                if catalog_or_db is not None:
166                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
167                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
168                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
169
170                references.append(
171                    ExternalModelReference(
172                        range=Range(
173                            start=start_pos_sqlmesh,
174                            end=end_pos_sqlmesh,
175                        ),
176                        markdown_description="Unregistered external model",
177                    )
178                )
179                continue
180            referenced_model_path = referenced_model._path
181            if referenced_model_path is None:
182                continue
183            # Check whether the path exists
184            if not referenced_model_path.is_file():
185                continue
186
187            # Extract metadata for positioning
188            table_meta = TokenPositionDetails.from_meta(table.this.meta)
189            table_range_sqlmesh = table_meta.to_range(read_file)
190            start_pos_sqlmesh = table_range_sqlmesh.start
191            end_pos_sqlmesh = table_range_sqlmesh.end
192
193            # If there's a catalog or database qualifier, adjust the start position
194            catalog_or_db = table.args.get("catalog") or table.args.get("db")
195            if catalog_or_db is not None:
196                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
197                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
198                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
199
200            description = generate_markdown_description(referenced_model)
201
202            # For external models in YAML files, find the specific model block
203            if isinstance(referenced_model, ExternalModel):
204                yaml_target_range: t.Optional[Range] = None
205                if (
206                    referenced_model_path.suffix in (".yaml", ".yml")
207                    and referenced_model_path.is_file()
208                ):
209                    yaml_target_range = _get_yaml_model_range(
210                        referenced_model_path, referenced_model.name
211                    )
212                references.append(
213                    ExternalModelReference(
214                        path=referenced_model_path,
215                        range=Range(
216                            start=start_pos_sqlmesh,
217                            end=end_pos_sqlmesh,
218                        ),
219                        markdown_description=description,
220                        target_range=yaml_target_range,
221                    )
222                )
223
224                column_references = _process_column_references(
225                    scope=scope,
226                    reference_name=normalized_reference_name,
227                    read_file=read_file,
228                    referenced_model_path=referenced_model_path,
229                    description=description,
230                    yaml_target_range=yaml_target_range,
231                    reference_type="external_model",
232                    default_catalog=context.default_catalog,
233                    dialect=dialect,
234                )
235                references.extend(column_references)
236            else:
237                references.append(
238                    ModelReference(
239                        path=referenced_model_path,
240                        range=Range(
241                            start=start_pos_sqlmesh,
242                            end=end_pos_sqlmesh,
243                        ),
244                        markdown_description=description,
245                    )
246                )
247
248                column_references = _process_column_references(
249                    scope=scope,
250                    reference_name=normalized_reference_name,
251                    read_file=read_file,
252                    referenced_model_path=referenced_model_path,
253                    description=description,
254                    reference_type="model",
255                    default_catalog=context.default_catalog,
256                    dialect=dialect,
257                )
258                references.extend(column_references)
259
260    return references
261
262
263def generate_markdown_description(
264    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
265) -> t.Optional[str]:
266    description = model.description
267    columns = model.columns_to_types
268    column_descriptions = model.column_descriptions
269
270    if columns is None:
271        return description or None
272
273    columns_table = "\n".join(
274        [
275            f"| {column} | {column_type} | {column_descriptions.get(column, '')} |"
276            for column, column_type in columns.items()
277        ]
278    )
279
280    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
281    columns_text = table_header + columns_table
282    return f"{description}\n\n{columns_text}" if description else columns_text
283
284
285def _process_column_references(
286    scope: t.Any,
287    reference_name: str,
288    read_file: t.List[str],
289    referenced_model_path: Path,
290    description: t.Optional[str] = None,
291    yaml_target_range: t.Optional[Range] = None,
292    reference_type: t.Literal["model", "external_model", "cte"] = "model",
293    default_catalog: t.Optional[str] = None,
294    dialect: t.Optional[str] = None,
295    cte_target_range: t.Optional[Range] = None,
296) -> t.List[Reference]:
297    """
298    Process column references for a given table and create appropriate reference objects.
299
300    Args:
301        scope: The SQL scope to search for columns
302        reference_name: The full reference name (may include database/catalog)
303        read_file: The file content as list of lines
304        referenced_model_path: Path of the referenced model
305        description: Markdown description for the reference
306        yaml_target_range: Target range for external models (YAML files)
307        reference_type: Type of reference - "model", "external_model", or "cte"
308        default_catalog: Default catalog for normalization
309        dialect: SQL dialect for normalization
310        cte_target_range: Target range for CTE references
311
312    Returns:
313        List of table references for column usages
314    """
315
316    references: t.List[Reference] = []
317    for column in scope.find_all(exp.Column):
318        if column.table:
319            if reference_type == "cte":
320                if column.table == reference_name:
321                    table_range = _get_column_table_range(column, read_file)
322                    references.append(
323                        CTEReference(
324                            path=referenced_model_path,
325                            range=table_range,
326                            target_range=cte_target_range,
327                        )
328                    )
329            else:
330                table_parts = [part.sql(dialect) for part in column.parts[:-1]]
331                table_ref = ".".join(table_parts)
332                normalized_reference_name = normalize_model_name(
333                    table_ref,
334                    default_catalog=default_catalog,
335                    dialect=dialect,
336                )
337                if normalized_reference_name == reference_name:
338                    table_range = _get_column_table_range(column, read_file)
339                    if reference_type == "external_model":
340                        references.append(
341                            ExternalModelReference(
342                                path=referenced_model_path,
343                                range=table_range,
344                                markdown_description=description,
345                                target_range=yaml_target_range,
346                            )
347                        )
348                    else:
349                        references.append(
350                            ModelReference(
351                                path=referenced_model_path,
352                                range=table_range,
353                                markdown_description=description,
354                            )
355                        )
356
357    return references
358
359
360def _get_column_table_range(column: exp.Column, read_file: t.List[str]) -> Range:
361    """
362    Get the range for a column's table reference, handling both simple and qualified table names.
363
364    Args:
365        column: The column expression
366        read_file: The file content as list of lines
367
368    Returns:
369        The Range covering the table reference in the column
370    """
371
372    table_parts = column.parts[:-1]
373
374    start_range = TokenPositionDetails.from_meta(table_parts[0].meta).to_range(read_file)
375    end_range = TokenPositionDetails.from_meta(table_parts[-1].meta).to_range(read_file)
376
377    return Range(
378        start=start_range.start,
379        end=end_range.end,
380    )
381
382
383def _get_yaml_model_range(path: Path, model_name: str) -> t.Optional[Range]:
384    """
385    Find the range of a specific model block in a YAML file.
386
387    Args:
388        yaml_path: Path to the YAML file
389        model_name: Name of the model to find
390
391    Returns:
392        The Range of the model block in the YAML file, or None if not found
393    """
394    model_name_ranges = get_yaml_model_name_ranges(path)
395    if model_name_ranges is None:
396        return None
397    return model_name_ranges.get(model_name, None)
398
399
400def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
401    """
402    Get the ranges of all model names in a YAML file.
403
404    Args:
405        path: Path to the YAML file
406
407    Returns:
408        A dictionary mapping model names to their ranges in the YAML file.
409    """
410    yaml = YAML()
411    with path.open("r", encoding="utf-8") as f:
412        data = yaml.load(f)
413
414    if not isinstance(data, list):
415        return None
416
417    model_name_ranges = {}
418    for item in data:
419        if isinstance(item, dict):
420            position_data = item.lc.data["name"]  # type: ignore
421            start = Position(line=position_data[2], character=position_data[3])
422            end = Position(line=position_data[2], character=position_data[3] + len(item["name"]))
423            name = item.get("name")
424            if not name:
425                continue
426            model_name_ranges[name] = Range(start=start, end=end)
427
428    return model_name_ranges
class ModelReference(sqlmesh.utils.pydantic.PydanticModel):
26class ModelReference(PydanticModel):
27    """A reference to a model, excluding external models."""
28
29    type: t.Literal["model"] = "model"
30    path: Path
31    range: Range
32    markdown_description: t.Optional[str] = None

A reference to a model, excluding external models.

type: Literal['model']
path: pathlib.Path
markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ExternalModelReference(sqlmesh.utils.pydantic.PydanticModel):
35class ExternalModelReference(PydanticModel):
36    """A reference to an external model."""
37
38    type: t.Literal["external_model"] = "external_model"
39    range: Range
40    target_range: t.Optional[Range] = None
41    path: t.Optional[Path] = None
42    """The path of the external model, typically a YAML file, it is optional because
43    external models can be unregistered and so the path is not available."""
44
45    markdown_description: t.Optional[str] = None

A reference to an external model.

type: Literal['external_model']
target_range: Optional[sqlmesh.core.linter.rule.Range]
path: Optional[pathlib.Path]

The path of the external model, typically a YAML file, it is optional because external models can be unregistered and so the path is not available.

markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class CTEReference(sqlmesh.utils.pydantic.PydanticModel):
48class CTEReference(PydanticModel):
49    """A reference to a CTE."""
50
51    type: t.Literal["cte"] = "cte"
52    path: Path
53    range: Range
54    target_range: Range

A reference to a CTE.

type: Literal['cte']
path: pathlib.Path
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class MacroReference(sqlmesh.utils.pydantic.PydanticModel):
57class MacroReference(PydanticModel):
58    """A reference to a macro."""
59
60    type: t.Literal["macro"] = "macro"
61    path: Path
62    range: Range
63    target_range: Range
64    markdown_description: t.Optional[str] = None

A reference to a macro.

type: Literal['macro']
path: pathlib.Path
markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.core.Expr'>: <function _expression_encoder>, <class 'sqlglot.expressions.datatypes.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.query.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.query.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
Reference = typing.Annotated[typing.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference], FieldInfo(annotation=NoneType, required=True, discriminator='type')]
def extract_references_from_query( query: sqlglot.expressions.core.Expr, context: Union[sqlmesh.core.context.Context, sqlmesh.core.context.GenericContext[Any]], document_path: pathlib.Path, read_file: List[str], depends_on: Set[str], dialect: Optional[str] = None) -> List[Annotated[Union[ModelReference, CTEReference, MacroReference, ExternalModelReference], FieldInfo(annotation=NoneType, required=True, discriminator='type')]]:
 73def extract_references_from_query(
 74    query: exp.Expr,
 75    context: t.Union["Context", "GenericContext[t.Any]"],
 76    document_path: Path,
 77    read_file: t.List[str],
 78    depends_on: t.Set[str],
 79    dialect: t.Optional[str] = None,
 80) -> t.List[Reference]:
 81    # Build a scope tree to properly handle nested CTEs
 82    try:
 83        query = normalize_identifiers(query.copy(), dialect=dialect)
 84        root_scope = build_scope(query)
 85    except Exception:
 86        root_scope = None
 87
 88    references: t.List[Reference] = []
 89    if not root_scope:
 90        return references
 91
 92    # Traverse all scopes to find CTE definitions and table references
 93    for scope in root_scope.traverse():
 94        for table in scope.tables:
 95            table_name = table.name
 96
 97            # Check if this table reference is a CTE in the current scope
 98            if cte_scope := scope.cte_sources.get(table_name):
 99                if cte_scope.expression is None:
100                    continue
101                cte = cte_scope.expression.parent
102                if cte is None:
103                    continue
104                alias = cte.args["alias"]
105                if isinstance(alias, exp.TableAlias):
106                    identifier = alias.this
107                    if isinstance(identifier, exp.Identifier):
108                        target_range_sqlmesh = TokenPositionDetails.from_meta(
109                            identifier.meta
110                        ).to_range(read_file)
111                        table_range_sqlmesh = TokenPositionDetails.from_meta(
112                            table.this.meta
113                        ).to_range(read_file)
114
115                        references.append(
116                            CTEReference(
117                                path=document_path,  # Same file
118                                range=table_range_sqlmesh,
119                                target_range=target_range_sqlmesh,
120                            )
121                        )
122
123                        column_references = _process_column_references(
124                            scope=scope,
125                            reference_name=table.name,
126                            read_file=read_file,
127                            referenced_model_path=document_path,
128                            description="",
129                            reference_type="cte",
130                            cte_target_range=target_range_sqlmesh,
131                        )
132                        references.extend(column_references)
133                continue
134
135            # For non-CTE tables, process these as before (external model references)
136            # Normalize the table reference
137            unaliased = table.copy()
138            if unaliased.args.get("alias") is not None:
139                unaliased.set("alias", None)
140            reference_name = unaliased.sql(dialect=dialect)
141            try:
142                normalized_reference_name = normalize_model_name(
143                    reference_name,
144                    default_catalog=context.default_catalog,
145                    dialect=dialect,
146                )
147                if normalized_reference_name not in depends_on:
148                    continue
149            except Exception:
150                # Skip references that cannot be normalized
151                continue
152
153            # Get the referenced model uri
154            referenced_model = context.get_model(
155                model_or_snapshot=normalized_reference_name, raise_if_missing=False
156            )
157            if referenced_model is None:
158                # Extract metadata for positioning
159                table_meta = TokenPositionDetails.from_meta(table.this.meta)
160                table_range_sqlmesh = table_meta.to_range(read_file)
161                start_pos_sqlmesh = table_range_sqlmesh.start
162                end_pos_sqlmesh = table_range_sqlmesh.end
163
164                # If there's a catalog or database qualifier, adjust the start position
165                catalog_or_db = table.args.get("catalog") or table.args.get("db")
166                if catalog_or_db is not None:
167                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
168                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
169                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
170
171                references.append(
172                    ExternalModelReference(
173                        range=Range(
174                            start=start_pos_sqlmesh,
175                            end=end_pos_sqlmesh,
176                        ),
177                        markdown_description="Unregistered external model",
178                    )
179                )
180                continue
181            referenced_model_path = referenced_model._path
182            if referenced_model_path is None:
183                continue
184            # Check whether the path exists
185            if not referenced_model_path.is_file():
186                continue
187
188            # Extract metadata for positioning
189            table_meta = TokenPositionDetails.from_meta(table.this.meta)
190            table_range_sqlmesh = table_meta.to_range(read_file)
191            start_pos_sqlmesh = table_range_sqlmesh.start
192            end_pos_sqlmesh = table_range_sqlmesh.end
193
194            # If there's a catalog or database qualifier, adjust the start position
195            catalog_or_db = table.args.get("catalog") or table.args.get("db")
196            if catalog_or_db is not None:
197                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
198                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
199                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
200
201            description = generate_markdown_description(referenced_model)
202
203            # For external models in YAML files, find the specific model block
204            if isinstance(referenced_model, ExternalModel):
205                yaml_target_range: t.Optional[Range] = None
206                if (
207                    referenced_model_path.suffix in (".yaml", ".yml")
208                    and referenced_model_path.is_file()
209                ):
210                    yaml_target_range = _get_yaml_model_range(
211                        referenced_model_path, referenced_model.name
212                    )
213                references.append(
214                    ExternalModelReference(
215                        path=referenced_model_path,
216                        range=Range(
217                            start=start_pos_sqlmesh,
218                            end=end_pos_sqlmesh,
219                        ),
220                        markdown_description=description,
221                        target_range=yaml_target_range,
222                    )
223                )
224
225                column_references = _process_column_references(
226                    scope=scope,
227                    reference_name=normalized_reference_name,
228                    read_file=read_file,
229                    referenced_model_path=referenced_model_path,
230                    description=description,
231                    yaml_target_range=yaml_target_range,
232                    reference_type="external_model",
233                    default_catalog=context.default_catalog,
234                    dialect=dialect,
235                )
236                references.extend(column_references)
237            else:
238                references.append(
239                    ModelReference(
240                        path=referenced_model_path,
241                        range=Range(
242                            start=start_pos_sqlmesh,
243                            end=end_pos_sqlmesh,
244                        ),
245                        markdown_description=description,
246                    )
247                )
248
249                column_references = _process_column_references(
250                    scope=scope,
251                    reference_name=normalized_reference_name,
252                    read_file=read_file,
253                    referenced_model_path=referenced_model_path,
254                    description=description,
255                    reference_type="model",
256                    default_catalog=context.default_catalog,
257                    dialect=dialect,
258                )
259                references.extend(column_references)
260
261    return references
264def generate_markdown_description(
265    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
266) -> t.Optional[str]:
267    description = model.description
268    columns = model.columns_to_types
269    column_descriptions = model.column_descriptions
270
271    if columns is None:
272        return description or None
273
274    columns_table = "\n".join(
275        [
276            f"| {column} | {column_type} | {column_descriptions.get(column, '')} |"
277            for column, column_type in columns.items()
278        ]
279    )
280
281    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
282    columns_text = table_header + columns_table
283    return f"{description}\n\n{columns_text}" if description else columns_text
def get_yaml_model_name_ranges( path: pathlib.Path) -> Optional[Dict[str, sqlmesh.core.linter.rule.Range]]:
401def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
402    """
403    Get the ranges of all model names in a YAML file.
404
405    Args:
406        path: Path to the YAML file
407
408    Returns:
409        A dictionary mapping model names to their ranges in the YAML file.
410    """
411    yaml = YAML()
412    with path.open("r", encoding="utf-8") as f:
413        data = yaml.load(f)
414
415    if not isinstance(data, list):
416        return None
417
418    model_name_ranges = {}
419    for item in data:
420        if isinstance(item, dict):
421            position_data = item.lc.data["name"]  # type: ignore
422            start = Position(line=position_data[2], character=position_data[3])
423            end = Position(line=position_data[2], character=position_data[3] + len(item["name"]))
424            name = item.get("name")
425            if not name:
426                continue
427            model_name_ranges[name] = Range(start=start, end=end)
428
429    return model_name_ranges

Get the ranges of all model names in a YAML file.

Arguments:
  • path: Path to the YAML file
Returns:

A dictionary mapping model names to their ranges in the YAML file.