Edit on GitHub

sqlmesh.utils.lineage

  1import typing as t
  2from pathlib import Path
  3
  4from pydantic import Field
  5
  6from sqlmesh.core.dialect import normalize_model_name
  7from sqlmesh.core.linter.helpers import (
  8    TokenPositionDetails,
  9)
 10from sqlmesh.core.linter.rule import Range, Position
 11from sqlmesh.core.model.definition import SqlModel, ExternalModel, PythonModel, SeedModel
 12from sqlglot import exp
 13from sqlglot.optimizer.scope import build_scope
 14
 15from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
 16from ruamel.yaml import YAML
 17
 18from sqlmesh.utils.pydantic import PydanticModel
 19
 20if t.TYPE_CHECKING:
 21    from sqlmesh.core.context import Context
 22    from sqlmesh.core.context import GenericContext
 23
 24
class ModelReference(PydanticModel):
    """A reference to a model, excluding external models."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["model"] = "model"
    # Path of the file that defines the referenced model.
    path: Path
    # Span of the reference inside the document being analysed.
    range: Range
    # Optional markdown text describing the referenced model.
    markdown_description: t.Optional[str] = None
 32
 33
class ExternalModelReference(PydanticModel):
    """A reference to an external model."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["external_model"] = "external_model"
    # Span of the reference inside the document being analysed.
    range: Range
    # Span of the model's name inside its YAML definition, when it could be located.
    target_range: t.Optional[Range] = None
    path: t.Optional[Path] = None
    """The path of the external model, typically a YAML file, it is optional because
    external models can be unregistered and so the path is not available."""

    # Optional markdown text describing the referenced model.
    markdown_description: t.Optional[str] = None
 45
 46
class CTEReference(PydanticModel):
    """A reference to a CTE."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["cte"] = "cte"
    # File containing both the usage and the CTE definition.
    path: Path
    # Span of the place where the CTE is used.
    range: Range
    # Span of the CTE's alias identifier in its definition.
    target_range: Range
 54
 55
class MacroReference(PydanticModel):
    """A reference to a macro."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["macro"] = "macro"
    # NOTE(review): nothing in this module builds a MacroReference; the field meanings
    # below are inferred from the sibling reference classes — confirm against callers.
    # Presumably the file that defines the macro.
    path: Path
    # Presumably the span of the macro usage in the analysed document.
    range: Range
    # Presumably the span of the macro definition inside `path`.
    target_range: Range
    # Optional markdown text attached to the reference.
    markdown_description: t.Optional[str] = None
 63    markdown_description: t.Optional[str] = None
 64
 65
# Tagged union of every reference kind; the `type` field of each member is the
# discriminator that selects the concrete class.
Reference = t.Annotated[
    t.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference],
    Field(discriminator="type"),
]
 70
 71
 72def extract_references_from_query(
 73    query: exp.Expression,
 74    context: t.Union["Context", "GenericContext[t.Any]"],
 75    document_path: Path,
 76    read_file: t.List[str],
 77    depends_on: t.Set[str],
 78    dialect: t.Optional[str] = None,
 79) -> t.List[Reference]:
 80    # Build a scope tree to properly handle nested CTEs
 81    try:
 82        query = normalize_identifiers(query.copy(), dialect=dialect)
 83        root_scope = build_scope(query)
 84    except Exception:
 85        root_scope = None
 86
 87    references: t.List[Reference] = []
 88    if not root_scope:
 89        return references
 90
 91    # Traverse all scopes to find CTE definitions and table references
 92    for scope in root_scope.traverse():
 93        for table in scope.tables:
 94            table_name = table.name
 95
 96            # Check if this table reference is a CTE in the current scope
 97            if cte_scope := scope.cte_sources.get(table_name):
 98                cte = cte_scope.expression.parent
 99                alias = cte.args["alias"]
100                if isinstance(alias, exp.TableAlias):
101                    identifier = alias.this
102                    if isinstance(identifier, exp.Identifier):
103                        target_range_sqlmesh = TokenPositionDetails.from_meta(
104                            identifier.meta
105                        ).to_range(read_file)
106                        table_range_sqlmesh = TokenPositionDetails.from_meta(
107                            table.this.meta
108                        ).to_range(read_file)
109
110                        references.append(
111                            CTEReference(
112                                path=document_path,  # Same file
113                                range=table_range_sqlmesh,
114                                target_range=target_range_sqlmesh,
115                            )
116                        )
117
118                        column_references = _process_column_references(
119                            scope=scope,
120                            reference_name=table.name,
121                            read_file=read_file,
122                            referenced_model_path=document_path,
123                            description="",
124                            reference_type="cte",
125                            cte_target_range=target_range_sqlmesh,
126                        )
127                        references.extend(column_references)
128                continue
129
130            # For non-CTE tables, process these as before (external model references)
131            # Normalize the table reference
132            unaliased = table.copy()
133            if unaliased.args.get("alias") is not None:
134                unaliased.set("alias", None)
135            reference_name = unaliased.sql(dialect=dialect)
136            try:
137                normalized_reference_name = normalize_model_name(
138                    reference_name,
139                    default_catalog=context.default_catalog,
140                    dialect=dialect,
141                )
142                if normalized_reference_name not in depends_on:
143                    continue
144            except Exception:
145                # Skip references that cannot be normalized
146                continue
147
148            # Get the referenced model uri
149            referenced_model = context.get_model(
150                model_or_snapshot=normalized_reference_name, raise_if_missing=False
151            )
152            if referenced_model is None:
153                # Extract metadata for positioning
154                table_meta = TokenPositionDetails.from_meta(table.this.meta)
155                table_range_sqlmesh = table_meta.to_range(read_file)
156                start_pos_sqlmesh = table_range_sqlmesh.start
157                end_pos_sqlmesh = table_range_sqlmesh.end
158
159                # If there's a catalog or database qualifier, adjust the start position
160                catalog_or_db = table.args.get("catalog") or table.args.get("db")
161                if catalog_or_db is not None:
162                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
163                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
164                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
165
166                references.append(
167                    ExternalModelReference(
168                        range=Range(
169                            start=start_pos_sqlmesh,
170                            end=end_pos_sqlmesh,
171                        ),
172                        markdown_description="Unregistered external model",
173                    )
174                )
175                continue
176            referenced_model_path = referenced_model._path
177            if referenced_model_path is None:
178                continue
179            # Check whether the path exists
180            if not referenced_model_path.is_file():
181                continue
182
183            # Extract metadata for positioning
184            table_meta = TokenPositionDetails.from_meta(table.this.meta)
185            table_range_sqlmesh = table_meta.to_range(read_file)
186            start_pos_sqlmesh = table_range_sqlmesh.start
187            end_pos_sqlmesh = table_range_sqlmesh.end
188
189            # If there's a catalog or database qualifier, adjust the start position
190            catalog_or_db = table.args.get("catalog") or table.args.get("db")
191            if catalog_or_db is not None:
192                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
193                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
194                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
195
196            description = generate_markdown_description(referenced_model)
197
198            # For external models in YAML files, find the specific model block
199            if isinstance(referenced_model, ExternalModel):
200                yaml_target_range: t.Optional[Range] = None
201                if (
202                    referenced_model_path.suffix in (".yaml", ".yml")
203                    and referenced_model_path.is_file()
204                ):
205                    yaml_target_range = _get_yaml_model_range(
206                        referenced_model_path, referenced_model.name
207                    )
208                references.append(
209                    ExternalModelReference(
210                        path=referenced_model_path,
211                        range=Range(
212                            start=start_pos_sqlmesh,
213                            end=end_pos_sqlmesh,
214                        ),
215                        markdown_description=description,
216                        target_range=yaml_target_range,
217                    )
218                )
219
220                column_references = _process_column_references(
221                    scope=scope,
222                    reference_name=normalized_reference_name,
223                    read_file=read_file,
224                    referenced_model_path=referenced_model_path,
225                    description=description,
226                    yaml_target_range=yaml_target_range,
227                    reference_type="external_model",
228                    default_catalog=context.default_catalog,
229                    dialect=dialect,
230                )
231                references.extend(column_references)
232            else:
233                references.append(
234                    ModelReference(
235                        path=referenced_model_path,
236                        range=Range(
237                            start=start_pos_sqlmesh,
238                            end=end_pos_sqlmesh,
239                        ),
240                        markdown_description=description,
241                    )
242                )
243
244                column_references = _process_column_references(
245                    scope=scope,
246                    reference_name=normalized_reference_name,
247                    read_file=read_file,
248                    referenced_model_path=referenced_model_path,
249                    description=description,
250                    reference_type="model",
251                    default_catalog=context.default_catalog,
252                    dialect=dialect,
253                )
254                references.extend(column_references)
255
256    return references
257
258
def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Render a model's description and columns as markdown.

    Args:
        model: The model whose ``description``, ``columns_to_types`` and
            ``column_descriptions`` are rendered.

    Returns:
        The description followed by a column table when both are available, the
        table alone when there is no description, the description alone when the
        model has no column mapping, or None when neither is available.
    """
    summary = model.description
    column_types = model.columns_to_types
    column_docs = model.column_descriptions

    if column_types is None:
        return summary if summary else None

    # One markdown table row per column; columns without a description get an empty cell.
    rows = []
    for name, dtype in column_types.items():
        rows.append(f"| {name} | {dtype} | {column_docs.get(name, '')} |")

    table = "| Column | Type | Description |\n|--------|------|-------------|\n" + "\n".join(
        rows
    )
    if summary:
        return f"{summary}\n\n{table}"
    return table
279
280
def _process_column_references(
    scope: t.Any,
    reference_name: str,
    read_file: t.List[str],
    referenced_model_path: Path,
    description: t.Optional[str] = None,
    yaml_target_range: t.Optional[Range] = None,
    reference_type: t.Literal["model", "external_model", "cte"] = "model",
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
    cte_target_range: t.Optional[Range] = None,
) -> t.List[Reference]:
    """
    Process column references for a given table and create appropriate reference objects.

    Only columns that carry a table qualifier are considered. For CTEs the qualifier
    is compared to ``reference_name`` as-is; for models and external models the
    qualifier is normalized first, and qualifiers that cannot be normalized are skipped.

    Args:
        scope: The SQL scope to search for columns
        reference_name: The full reference name (may include database/catalog)
        read_file: The file content as list of lines
        referenced_model_path: Path of the referenced model
        description: Markdown description for the reference
        yaml_target_range: Target range for external models (YAML files)
        reference_type: Type of reference - "model", "external_model", or "cte"
        default_catalog: Default catalog for normalization
        dialect: SQL dialect for normalization
        cte_target_range: Target range for CTE references

    Returns:
        List of table references for column usages
    """

    references: t.List[Reference] = []
    for column in scope.find_all(exp.Column):
        if not column.table:
            continue

        if reference_type == "cte":
            if column.table == reference_name:
                references.append(
                    CTEReference(
                        path=referenced_model_path,
                        range=_get_column_table_range(column, read_file),
                        target_range=cte_target_range,
                    )
                )
            continue

        # Everything but the last part of the column is its table qualifier.
        table_ref = ".".join(part.sql(dialect) for part in column.parts[:-1])
        try:
            normalized_reference_name = normalize_model_name(
                table_ref,
                default_catalog=default_catalog,
                dialect=dialect,
            )
        except Exception:
            # Skip qualifiers that cannot be normalized instead of aborting the whole
            # extraction, mirroring how table names are handled by the caller.
            continue
        if normalized_reference_name != reference_name:
            continue

        table_range = _get_column_table_range(column, read_file)
        if reference_type == "external_model":
            references.append(
                ExternalModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                    target_range=yaml_target_range,
                )
            )
        else:
            references.append(
                ModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                )
            )

    return references
354
355
def _get_column_table_range(column: exp.Column, read_file: t.List[str]) -> Range:
    """
    Compute the span of the table qualifier that prefixes a column.

    The qualifier is every part of the column except the last one, so both
    `tbl.col` and `db.tbl.col` style references are covered.

    Args:
        column: The column expression
        read_file: The file content as list of lines

    Returns:
        A Range from the start of the first qualifier part to the end of the last one
    """

    qualifier_parts = column.parts[:-1]
    first_part, last_part = qualifier_parts[0], qualifier_parts[-1]

    first_span = TokenPositionDetails.from_meta(first_part.meta).to_range(read_file)
    last_span = TokenPositionDetails.from_meta(last_part.meta).to_range(read_file)

    return Range(start=first_span.start, end=last_span.end)
377
378
def _get_yaml_model_range(path: Path, model_name: str) -> t.Optional[Range]:
    """
    Look up where a given model is named inside a YAML file.

    Args:
        path: Path to the YAML file
        model_name: Name of the model to find

    Returns:
        The Range recorded for `model_name` by `get_yaml_model_name_ranges`, or None
        when that function returns None or has no entry for the name
    """
    ranges_by_name = get_yaml_model_name_ranges(path)
    return None if ranges_by_name is None else ranges_by_name.get(model_name)
394
395
def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    The file is expected to hold a top-level list of mappings, each with a `name`
    key. List entries that are not mappings, or that have no non-empty string
    `name`, are ignored.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping model names to the range of the name value in the YAML
        file, or None when the top level of the document is not a list.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue

        # Validate the name before touching position data: an entry without a `name`
        # key has no position record, and a non-string name has no usable length.
        name = item.get("name")
        if not name or not isinstance(name, str):
            continue

        # ruamel records [key line, key column, value line, value column] per key;
        # indices 2 and 3 locate the start of the name value.
        position_data = item.lc.data["name"]  # type: ignore
        line, character = position_data[2], position_data[3]
        model_name_ranges[name] = Range(
            start=Position(line=line, character=character),
            end=Position(line=line, character=character + len(name)),
        )

    return model_name_ranges
class ModelReference(sqlmesh.utils.pydantic.PydanticModel):
class ModelReference(PydanticModel):
    """A reference to a model, excluding external models."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["model"] = "model"
    # Path of the file that defines the referenced model.
    path: Path
    # Span of the reference inside the document being analysed.
    range: Range
    # Optional markdown text describing the referenced model.
    markdown_description: t.Optional[str] = None

A reference to a model, excluding external models.

type: Literal['model']
path: pathlib.Path
markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class ExternalModelReference(sqlmesh.utils.pydantic.PydanticModel):
class ExternalModelReference(PydanticModel):
    """A reference to an external model."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["external_model"] = "external_model"
    # Span of the reference inside the document being analysed.
    range: Range
    # Span of the model's name inside its YAML definition, when it could be located.
    target_range: t.Optional[Range] = None
    path: t.Optional[Path] = None
    """The path of the external model, typically a YAML file, it is optional because
    external models can be unregistered and so the path is not available."""

    # Optional markdown text describing the referenced model.
    markdown_description: t.Optional[str] = None

A reference to an external model.

type: Literal['external_model']
target_range: Optional[sqlmesh.core.linter.rule.Range]
path: Optional[pathlib.Path]

The path of the external model, typically a YAML file, it is optional because external models can be unregistered and so the path is not available.

markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class CTEReference(sqlmesh.utils.pydantic.PydanticModel):
class CTEReference(PydanticModel):
    """A reference to a CTE."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["cte"] = "cte"
    # File containing both the usage and the CTE definition.
    path: Path
    # Span of the place where the CTE is used.
    range: Range
    # Span of the CTE's alias identifier in its definition.
    target_range: Range

A reference to a CTE.

type: Literal['cte']
path: pathlib.Path
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
class MacroReference(sqlmesh.utils.pydantic.PydanticModel):
class MacroReference(PydanticModel):
    """A reference to a macro."""

    # Discriminator tag used by the `Reference` union to select this class.
    type: t.Literal["macro"] = "macro"
    # NOTE(review): nothing in this module builds a MacroReference; the field meanings
    # below are inferred from the sibling reference classes — confirm against callers.
    # Presumably the file that defines the macro.
    path: Path
    # Presumably the span of the macro usage in the analysed document.
    range: Range
    # Presumably the span of the macro definition inside `path`.
    target_range: Range
    # Optional markdown text attached to the reference.
    markdown_description: t.Optional[str] = None

A reference to a macro.

type: Literal['macro']
path: pathlib.Path
markdown_description: Optional[str]
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
Reference = typing.Annotated[typing.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference], FieldInfo(annotation=NoneType, required=True, discriminator='type')]
def extract_references_from_query( query: sqlglot.expressions.Expression, context: Union[sqlmesh.core.context.Context, sqlmesh.core.context.GenericContext[Any]], document_path: pathlib.Path, read_file: List[str], depends_on: Set[str], dialect: Optional[str] = None) -> List[Annotated[Union[ModelReference, CTEReference, MacroReference, ExternalModelReference], FieldInfo(annotation=NoneType, required=True, discriminator='type')]]:
 73def extract_references_from_query(
 74    query: exp.Expression,
 75    context: t.Union["Context", "GenericContext[t.Any]"],
 76    document_path: Path,
 77    read_file: t.List[str],
 78    depends_on: t.Set[str],
 79    dialect: t.Optional[str] = None,
 80) -> t.List[Reference]:
 81    # Build a scope tree to properly handle nested CTEs
 82    try:
 83        query = normalize_identifiers(query.copy(), dialect=dialect)
 84        root_scope = build_scope(query)
 85    except Exception:
 86        root_scope = None
 87
 88    references: t.List[Reference] = []
 89    if not root_scope:
 90        return references
 91
 92    # Traverse all scopes to find CTE definitions and table references
 93    for scope in root_scope.traverse():
 94        for table in scope.tables:
 95            table_name = table.name
 96
 97            # Check if this table reference is a CTE in the current scope
 98            if cte_scope := scope.cte_sources.get(table_name):
 99                cte = cte_scope.expression.parent
100                alias = cte.args["alias"]
101                if isinstance(alias, exp.TableAlias):
102                    identifier = alias.this
103                    if isinstance(identifier, exp.Identifier):
104                        target_range_sqlmesh = TokenPositionDetails.from_meta(
105                            identifier.meta
106                        ).to_range(read_file)
107                        table_range_sqlmesh = TokenPositionDetails.from_meta(
108                            table.this.meta
109                        ).to_range(read_file)
110
111                        references.append(
112                            CTEReference(
113                                path=document_path,  # Same file
114                                range=table_range_sqlmesh,
115                                target_range=target_range_sqlmesh,
116                            )
117                        )
118
119                        column_references = _process_column_references(
120                            scope=scope,
121                            reference_name=table.name,
122                            read_file=read_file,
123                            referenced_model_path=document_path,
124                            description="",
125                            reference_type="cte",
126                            cte_target_range=target_range_sqlmesh,
127                        )
128                        references.extend(column_references)
129                continue
130
131            # For non-CTE tables, process these as before (external model references)
132            # Normalize the table reference
133            unaliased = table.copy()
134            if unaliased.args.get("alias") is not None:
135                unaliased.set("alias", None)
136            reference_name = unaliased.sql(dialect=dialect)
137            try:
138                normalized_reference_name = normalize_model_name(
139                    reference_name,
140                    default_catalog=context.default_catalog,
141                    dialect=dialect,
142                )
143                if normalized_reference_name not in depends_on:
144                    continue
145            except Exception:
146                # Skip references that cannot be normalized
147                continue
148
149            # Get the referenced model uri
150            referenced_model = context.get_model(
151                model_or_snapshot=normalized_reference_name, raise_if_missing=False
152            )
153            if referenced_model is None:
154                # Extract metadata for positioning
155                table_meta = TokenPositionDetails.from_meta(table.this.meta)
156                table_range_sqlmesh = table_meta.to_range(read_file)
157                start_pos_sqlmesh = table_range_sqlmesh.start
158                end_pos_sqlmesh = table_range_sqlmesh.end
159
160                # If there's a catalog or database qualifier, adjust the start position
161                catalog_or_db = table.args.get("catalog") or table.args.get("db")
162                if catalog_or_db is not None:
163                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
164                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
165                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
166
167                references.append(
168                    ExternalModelReference(
169                        range=Range(
170                            start=start_pos_sqlmesh,
171                            end=end_pos_sqlmesh,
172                        ),
173                        markdown_description="Unregistered external model",
174                    )
175                )
176                continue
177            referenced_model_path = referenced_model._path
178            if referenced_model_path is None:
179                continue
180            # Check whether the path exists
181            if not referenced_model_path.is_file():
182                continue
183
184            # Extract metadata for positioning
185            table_meta = TokenPositionDetails.from_meta(table.this.meta)
186            table_range_sqlmesh = table_meta.to_range(read_file)
187            start_pos_sqlmesh = table_range_sqlmesh.start
188            end_pos_sqlmesh = table_range_sqlmesh.end
189
190            # If there's a catalog or database qualifier, adjust the start position
191            catalog_or_db = table.args.get("catalog") or table.args.get("db")
192            if catalog_or_db is not None:
193                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
194                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
195                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start
196
197            description = generate_markdown_description(referenced_model)
198
199            # For external models in YAML files, find the specific model block
200            if isinstance(referenced_model, ExternalModel):
201                yaml_target_range: t.Optional[Range] = None
202                if (
203                    referenced_model_path.suffix in (".yaml", ".yml")
204                    and referenced_model_path.is_file()
205                ):
206                    yaml_target_range = _get_yaml_model_range(
207                        referenced_model_path, referenced_model.name
208                    )
209                references.append(
210                    ExternalModelReference(
211                        path=referenced_model_path,
212                        range=Range(
213                            start=start_pos_sqlmesh,
214                            end=end_pos_sqlmesh,
215                        ),
216                        markdown_description=description,
217                        target_range=yaml_target_range,
218                    )
219                )
220
221                column_references = _process_column_references(
222                    scope=scope,
223                    reference_name=normalized_reference_name,
224                    read_file=read_file,
225                    referenced_model_path=referenced_model_path,
226                    description=description,
227                    yaml_target_range=yaml_target_range,
228                    reference_type="external_model",
229                    default_catalog=context.default_catalog,
230                    dialect=dialect,
231                )
232                references.extend(column_references)
233            else:
234                references.append(
235                    ModelReference(
236                        path=referenced_model_path,
237                        range=Range(
238                            start=start_pos_sqlmesh,
239                            end=end_pos_sqlmesh,
240                        ),
241                        markdown_description=description,
242                    )
243                )
244
245                column_references = _process_column_references(
246                    scope=scope,
247                    reference_name=normalized_reference_name,
248                    read_file=read_file,
249                    referenced_model_path=referenced_model_path,
250                    description=description,
251                    reference_type="model",
252                    default_catalog=context.default_catalog,
253                    dialect=dialect,
254                )
255                references.extend(column_references)
256
257    return references
def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Render a model's description and columns as markdown.

    Args:
        model: The model whose ``description``, ``columns_to_types`` and
            ``column_descriptions`` are rendered.

    Returns:
        The description followed by a column table when both are available, the
        table alone when there is no description, the description alone when the
        model has no column mapping, or None when neither is available.
    """
    summary = model.description
    column_types = model.columns_to_types
    column_docs = model.column_descriptions

    if column_types is None:
        return summary if summary else None

    # One markdown table row per column; columns without a description get an empty cell.
    rows = []
    for name, dtype in column_types.items():
        rows.append(f"| {name} | {dtype} | {column_docs.get(name, '')} |")

    table = "| Column | Type | Description |\n|--------|------|-------------|\n" + "\n".join(
        rows
    )
    if summary:
        return f"{summary}\n\n{table}"
    return table
def get_yaml_model_name_ranges( path: pathlib.Path) -> Optional[Dict[str, sqlmesh.core.linter.rule.Range]]:
def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    The file is expected to hold a top-level list of mappings, each with a `name`
    key. List entries that are not mappings, or that have no non-empty string
    `name`, are ignored.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping model names to the range of the name value in the YAML
        file, or None when the top level of the document is not a list.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue

        # Validate the name before touching position data: an entry without a `name`
        # key has no position record, and a non-string name has no usable length.
        name = item.get("name")
        if not name or not isinstance(name, str):
            continue

        # ruamel records [key line, key column, value line, value column] per key;
        # indices 2 and 3 locate the start of the name value.
        position_data = item.lc.data["name"]  # type: ignore
        line, character = position_data[2], position_data[3]
        model_name_ranges[name] = Range(
            start=Position(line=line, character=character),
            end=Position(line=line, character=character + len(name)),
        )

    return model_name_ranges

Get the ranges of all model names in a YAML file.

Arguments:
  • path: Path to the YAML file
Returns:

A dictionary mapping model names to their ranges in the YAML file.