sqlmesh.utils.lineage
import typing as t
from pathlib import Path

from pydantic import Field

from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.linter.helpers import (
    TokenPositionDetails,
)
from sqlmesh.core.linter.rule import Range, Position
from sqlmesh.core.model.definition import SqlModel, ExternalModel, PythonModel, SeedModel
from sqlglot import exp
from sqlglot.optimizer.scope import build_scope

from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
from ruamel.yaml import YAML

from sqlmesh.utils.pydantic import PydanticModel

if t.TYPE_CHECKING:
    from sqlmesh.core.context import Context
    from sqlmesh.core.context import GenericContext


class ModelReference(PydanticModel):
    """Points at a model used by a query; external models use ExternalModelReference."""

    type: t.Literal["model"] = "model"
    # File that defines the referenced model.
    path: Path
    # Location of the reference in the source document.
    range: Range
    # Markdown text describing the referenced model.
    markdown_description: t.Union[str, None] = None


class ExternalModelReference(PydanticModel):
    """Points at an external model used by a query.

    External models can be unregistered, so the file path and target range
    may be missing.
    """

    type: t.Literal["external_model"] = "external_model"
    # Location of the reference in the source document.
    range: Range
    # Location of the model's entry inside its definition file, when known.
    target_range: t.Union[Range, None] = None
    path: t.Union[Path, None] = None
    """The path of the external model (typically a YAML file). It is optional
    because external models can be unregistered, in which case no path is available."""

    # Markdown text describing the referenced model.
    markdown_description: t.Union[str, None] = None


class CTEReference(PydanticModel):
    """A usage of a common table expression, pointing back to its definition."""

    type: t.Literal["cte"] = "cte"
    # Document that contains the usage (and, in this module, the CTE definition too).
    path: Path
    # Location of the usage.
    range: Range
    # Location of the CTE's alias where the CTE is defined.
    target_range: Range


class MacroReference(PydanticModel):
    """A reference to a macro (not constructed in this module)."""

    type: t.Literal["macro"] = "macro"
    # NOTE(review): presumably the file defining the macro — confirm with the producer.
    path: Path
    # Location of the reference in the source document.
    range: Range
    # Location the reference points to.
    target_range: Range
    # Markdown text describing the macro.
    markdown_description: t.Union[str, None] = None


# Any kind of reference; pydantic selects the concrete class from the ``type`` field.
Reference = t.Annotated[
    t.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference],
    Field(discriminator="type"),
]


def extract_references_from_query(
    query: exp.Expr,
    context: t.Union["Context", "GenericContext[t.Any]"],
    document_path: Path,
    read_file: t.List[str],
    depends_on: t.Set[str],
    dialect: t.Optional[str] = None,
) -> t.List[Reference]:
    """Collect CTE, model and external-model references from a query.

    Args:
        query: The parsed query. It is copied before identifier normalization,
            so the caller's expression is left untouched.
        context: The SQLMesh context used to resolve model names.
        document_path: Path of the file containing ``query``; used as the path
            of CTE references.
        read_file: Content of ``document_path`` as a list of lines, used to turn
            token metadata into ranges.
        depends_on: Normalized names of the query's dependencies; table
            references whose normalized name is not in this set are ignored.
        dialect: SQL dialect used for normalization and rendering.

    Returns:
        The references found. An empty list is returned when the query cannot
        be normalized or scoped.
    """
    # Build a scope tree to properly handle nested CTEs. This is best-effort:
    # a query that cannot be normalized or scoped simply yields no references.
    try:
        query = normalize_identifiers(query.copy(), dialect=dialect)
        root_scope = build_scope(query)
    except Exception:
        root_scope = None

    references: t.List[Reference] = []
    if not root_scope:
        return references

    # Traverse all scopes to find CTE definitions and table references
    for scope in root_scope.traverse():
        for table in scope.tables:
            table_name = table.name

            # Check if this table reference is a CTE in the current scope
            if cte_scope := scope.cte_sources.get(table_name):
                if cte_scope.expression is None:
                    continue
                cte = cte_scope.expression.parent
                if cte is None:
                    continue
                # .get: a CTE node without an "alias" arg is skipped rather than
                # raising KeyError; the isinstance check below handles None.
                alias = cte.args.get("alias")
                if isinstance(alias, exp.TableAlias):
                    identifier = alias.this
                    if isinstance(identifier, exp.Identifier):
                        target_range_sqlmesh = TokenPositionDetails.from_meta(
                            identifier.meta
                        ).to_range(read_file)
                        table_range_sqlmesh = TokenPositionDetails.from_meta(
                            table.this.meta
                        ).to_range(read_file)

                        references.append(
                            CTEReference(
                                path=document_path,  # Same file
                                range=table_range_sqlmesh,
                                target_range=target_range_sqlmesh,
                            )
                        )

                        column_references = _process_column_references(
                            scope=scope,
                            reference_name=table.name,
                            read_file=read_file,
                            referenced_model_path=document_path,
                            description="",
                            reference_type="cte",
                            cte_target_range=target_range_sqlmesh,
                        )
                        references.extend(column_references)
                continue

            # For non-CTE tables, normalize the table name (without its alias)
            # and resolve it to a model.
            unaliased = table.copy()
            if unaliased.args.get("alias") is not None:
                unaliased.set("alias", None)
            reference_name = unaliased.sql(dialect=dialect)
            try:
                normalized_reference_name = normalize_model_name(
                    reference_name,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            except Exception:
                # Skip references that cannot be normalized
                continue
            if normalized_reference_name not in depends_on:
                continue

            # Get the referenced model uri
            referenced_model = context.get_model(
                model_or_snapshot=normalized_reference_name, raise_if_missing=False
            )
            if referenced_model is None:
                # Extract metadata for positioning
                table_meta = TokenPositionDetails.from_meta(table.this.meta)
                table_range_sqlmesh = table_meta.to_range(read_file)
                start_pos_sqlmesh = table_range_sqlmesh.start
                end_pos_sqlmesh = table_range_sqlmesh.end

                # If there's a catalog or database qualifier, adjust the start position
                catalog_or_db = table.args.get("catalog") or table.args.get("db")
                if catalog_or_db is not None:
                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start

                references.append(
                    ExternalModelReference(
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description="Unregistered external model",
                    )
                )
                continue

            # Only link to models whose definition file exists on disk.
            referenced_model_path = referenced_model._path
            if referenced_model_path is None or not referenced_model_path.is_file():
                continue

            # Extract metadata for positioning
            table_meta = TokenPositionDetails.from_meta(table.this.meta)
            table_range_sqlmesh = table_meta.to_range(read_file)
            start_pos_sqlmesh = table_range_sqlmesh.start
            end_pos_sqlmesh = table_range_sqlmesh.end

            # If there's a catalog or database qualifier, adjust the start position
            catalog_or_db = table.args.get("catalog") or table.args.get("db")
            if catalog_or_db is not None:
                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start

            description = generate_markdown_description(referenced_model)

            # For external models in YAML files, find the specific model block
            if isinstance(referenced_model, ExternalModel):
                yaml_target_range: t.Optional[Range] = None
                # The path is already known to be an existing file (checked above).
                if referenced_model_path.suffix in (".yaml", ".yml"):
                    yaml_target_range = _get_yaml_model_range(
                        referenced_model_path, referenced_model.name
                    )
                references.append(
                    ExternalModelReference(
                        path=referenced_model_path,
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description=description,
                        target_range=yaml_target_range,
                    )
                )

                column_references = _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    yaml_target_range=yaml_target_range,
                    reference_type="external_model",
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
                references.extend(column_references)
            else:
                references.append(
                    ModelReference(
                        path=referenced_model_path,
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description=description,
                    )
                )

                column_references = _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    reference_type="model",
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
                references.extend(column_references)

    return references


def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Build the markdown used as a reference's ``markdown_description``.

    The text is the model's description (if any), followed by a markdown table
    listing each column's name, type and description.

    Args:
        model: The model to describe.

    Returns:
        The markdown text. Returns ``None`` when the model has no column types
        and an empty or missing description.
    """

    def _cell(value: t.Any) -> str:
        # A "|" would open a new cell and a line break would end the table row,
        # so escape pipes and fold line breaks into single spaces.
        return " ".join(str(value).replace("|", "\\|").splitlines())

    description = model.description
    columns = model.columns_to_types
    column_descriptions = model.column_descriptions

    if columns is None:
        return description or None

    columns_table = "\n".join(
        [
            f"| {_cell(column)} | {_cell(column_type)} | "
            f"{_cell(column_descriptions.get(column, ''))} |"
            for column, column_type in columns.items()
        ]
    )

    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
    columns_text = table_header + columns_table
    return f"{description}\n\n{columns_text}" if description else columns_text


def _process_column_references(
    scope: t.Any,
    reference_name: str,
    read_file: t.List[str],
    referenced_model_path: Path,
    description: t.Optional[str] = None,
    yaml_target_range: t.Optional[Range] = None,
    reference_type: t.Literal["model", "external_model", "cte"] = "model",
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
    cte_target_range: t.Optional[Range] = None,
) -> t.List[Reference]:
    """
    Process column references for a given table and create appropriate reference objects.

    Args:
        scope: The SQL scope to search for columns
        reference_name: The full reference name (may include database/catalog)
        read_file: The file content as list of lines
        referenced_model_path: Path of the referenced model
        description: Markdown description for the reference
        yaml_target_range: Target range for external models (YAML files)
        reference_type: Type of reference - "model", "external_model", or "cte"
        default_catalog: Default catalog for normalization
        dialect: SQL dialect for normalization
        cte_target_range: Target range for CTE references

    Returns:
        List of table references for column usages
    """
    references: t.List[Reference] = []
    for column in scope.find_all(exp.Column):
        # Only table-qualified columns (e.g. ``t.col``) can reference a table.
        if not column.table:
            continue

        if reference_type == "cte":
            # CTE names are compared directly against the column's table name.
            if column.table == reference_name:
                references.append(
                    CTEReference(
                        path=referenced_model_path,
                        range=_get_column_table_range(column, read_file),
                        target_range=cte_target_range,
                    )
                )
            continue

        # Rebuild the qualifier (everything before the column name) and
        # normalize it the same way the table reference was normalized.
        table_ref = ".".join(part.sql(dialect) for part in column.parts[:-1])
        try:
            normalized_reference_name = normalize_model_name(
                table_ref,
                default_catalog=default_catalog,
                dialect=dialect,
            )
        except Exception:
            # Like the caller, skip qualifiers that cannot be normalized instead
            # of aborting the whole extraction.
            continue
        if normalized_reference_name != reference_name:
            continue

        table_range = _get_column_table_range(column, read_file)
        if reference_type == "external_model":
            references.append(
                ExternalModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                    target_range=yaml_target_range,
                )
            )
        else:
            references.append(
                ModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                )
            )

    return references


def _get_column_table_range(column: exp.Column, read_file: t.List[str]) -> Range:
    """
    Get the range for a column's table reference, handling both simple and qualified table names.

    Args:
        column: The column expression; it must be table-qualified (at least two parts)
        read_file: The file content as list of lines

    Returns:
        The Range from the first qualifier part to the last one before the column name
    """
    table_parts = column.parts[:-1]

    start_range = TokenPositionDetails.from_meta(table_parts[0].meta).to_range(read_file)
    end_range = TokenPositionDetails.from_meta(table_parts[-1].meta).to_range(read_file)

    return Range(
        start=start_range.start,
        end=end_range.end,
    )


def _get_yaml_model_range(path: Path, model_name: str) -> t.Optional[Range]:
    """
    Find the range of a specific model's name in a YAML file.

    Args:
        path: Path to the YAML file
        model_name: Name of the model to find

    Returns:
        The Range of the model's name in the YAML file, or None if not found
    """
    model_name_ranges = get_yaml_model_name_ranges(path)
    if model_name_ranges is None:
        return None
    return model_name_ranges.get(model_name, None)


def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping model names to the ranges of their ``name`` values
        in the YAML file, or None if the file's top-level content is not a list.
        Entries that are not mappings or have no truthy ``name`` are skipped.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue
        # Check the name first: an entry without a "name" key has no position
        # data for it, and looking it up would raise KeyError.
        name = item.get("name")
        if not name:
            continue
        # ruamel.yaml's round-trip loader records per-key positions; indices 2
        # and 3 are the line and column of the key's value.
        position_data = item.lc.data["name"]  # type: ignore
        line, character = position_data[2], position_data[3]
        start = Position(line=line, character=character)
        end = Position(line=line, character=character + len(str(name)))
        model_name_ranges[name] = Range(start=start, end=end)

    return model_name_ranges
class ModelReference(PydanticModel):
    """Points at a model used by a query; external models use ExternalModelReference."""

    type: t.Literal["model"] = "model"
    # File that defines the referenced model.
    path: Path
    # Location of the reference in the source document.
    range: Range
    # Markdown text describing the referenced model.
    markdown_description: t.Union[str, None] = None
A reference to a model, excluding external models.
Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class ExternalModelReference(PydanticModel):
    """Points at an external model used by a query.

    External models can be unregistered, so the file path and target range
    may be missing.
    """

    type: t.Literal["external_model"] = "external_model"
    # Location of the reference in the source document.
    range: Range
    # Location of the model's entry inside its definition file, when known.
    target_range: t.Union[Range, None] = None
    path: t.Union[Path, None] = None
    """The path of the external model (typically a YAML file). It is optional
    because external models can be unregistered, in which case no path is available."""

    # Markdown text describing the referenced model.
    markdown_description: t.Union[str, None] = None
A reference to an external model.
The path of the external model (typically a YAML file). It is optional because external models can be unregistered, in which case no path is available.
Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class CTEReference(PydanticModel):
    """A usage of a common table expression, pointing back to its definition."""

    type: t.Literal["cte"] = "cte"
    # Document that contains the usage.
    path: Path
    # Location of the usage.
    range: Range
    # Location the usage points to (the CTE definition).
    target_range: Range
A reference to a CTE.
Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class MacroReference(PydanticModel):
    """Points at a macro used in a document."""

    type: t.Literal["macro"] = "macro"
    # NOTE(review): presumably the file defining the macro — confirm with the producer.
    path: Path
    # Location of the reference in the source document.
    range: Range
    # Location the reference points to.
    target_range: Range
    # Markdown text describing the macro.
    markdown_description: t.Union[str, None] = None
A reference to a macro.
Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def extract_references_from_query(
    query: exp.Expr,
    context: t.Union["Context", "GenericContext[t.Any]"],
    document_path: Path,
    read_file: t.List[str],
    depends_on: t.Set[str],
    dialect: t.Optional[str] = None,
) -> t.List[Reference]:
    """Collect CTE, model and external-model references from a query.

    Args:
        query: The parsed query. It is copied before identifier normalization,
            so the caller's expression is left untouched.
        context: The SQLMesh context used to resolve model names.
        document_path: Path of the file containing ``query``; used as the path
            of CTE references.
        read_file: Content of ``document_path`` as a list of lines, used to turn
            token metadata into ranges.
        depends_on: Normalized names of the query's dependencies; table
            references whose normalized name is not in this set are ignored.
        dialect: SQL dialect used for normalization and rendering.

    Returns:
        The references found. An empty list is returned when the query cannot
        be normalized or scoped.
    """
    # Build a scope tree to properly handle nested CTEs. This is best-effort:
    # a query that cannot be normalized or scoped simply yields no references.
    try:
        query = normalize_identifiers(query.copy(), dialect=dialect)
        root_scope = build_scope(query)
    except Exception:
        root_scope = None

    references: t.List[Reference] = []
    if not root_scope:
        return references

    # Traverse all scopes to find CTE definitions and table references
    for scope in root_scope.traverse():
        for table in scope.tables:
            table_name = table.name

            # Check if this table reference is a CTE in the current scope
            if cte_scope := scope.cte_sources.get(table_name):
                if cte_scope.expression is None:
                    continue
                cte = cte_scope.expression.parent
                if cte is None:
                    continue
                # .get: a CTE node without an "alias" arg is skipped rather than
                # raising KeyError; the isinstance check below handles None.
                alias = cte.args.get("alias")
                if isinstance(alias, exp.TableAlias):
                    identifier = alias.this
                    if isinstance(identifier, exp.Identifier):
                        target_range_sqlmesh = TokenPositionDetails.from_meta(
                            identifier.meta
                        ).to_range(read_file)
                        table_range_sqlmesh = TokenPositionDetails.from_meta(
                            table.this.meta
                        ).to_range(read_file)

                        references.append(
                            CTEReference(
                                path=document_path,  # Same file
                                range=table_range_sqlmesh,
                                target_range=target_range_sqlmesh,
                            )
                        )

                        column_references = _process_column_references(
                            scope=scope,
                            reference_name=table.name,
                            read_file=read_file,
                            referenced_model_path=document_path,
                            description="",
                            reference_type="cte",
                            cte_target_range=target_range_sqlmesh,
                        )
                        references.extend(column_references)
                continue

            # For non-CTE tables, normalize the table name (without its alias)
            # and resolve it to a model.
            unaliased = table.copy()
            if unaliased.args.get("alias") is not None:
                unaliased.set("alias", None)
            reference_name = unaliased.sql(dialect=dialect)
            try:
                normalized_reference_name = normalize_model_name(
                    reference_name,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            except Exception:
                # Skip references that cannot be normalized
                continue
            if normalized_reference_name not in depends_on:
                continue

            # Get the referenced model uri
            referenced_model = context.get_model(
                model_or_snapshot=normalized_reference_name, raise_if_missing=False
            )
            if referenced_model is None:
                # Extract metadata for positioning
                table_meta = TokenPositionDetails.from_meta(table.this.meta)
                table_range_sqlmesh = table_meta.to_range(read_file)
                start_pos_sqlmesh = table_range_sqlmesh.start
                end_pos_sqlmesh = table_range_sqlmesh.end

                # If there's a catalog or database qualifier, adjust the start position
                catalog_or_db = table.args.get("catalog") or table.args.get("db")
                if catalog_or_db is not None:
                    catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
                    catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
                    start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start

                references.append(
                    ExternalModelReference(
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description="Unregistered external model",
                    )
                )
                continue

            # Only link to models whose definition file exists on disk.
            referenced_model_path = referenced_model._path
            if referenced_model_path is None or not referenced_model_path.is_file():
                continue

            # Extract metadata for positioning
            table_meta = TokenPositionDetails.from_meta(table.this.meta)
            table_range_sqlmesh = table_meta.to_range(read_file)
            start_pos_sqlmesh = table_range_sqlmesh.start
            end_pos_sqlmesh = table_range_sqlmesh.end

            # If there's a catalog or database qualifier, adjust the start position
            catalog_or_db = table.args.get("catalog") or table.args.get("db")
            if catalog_or_db is not None:
                catalog_or_db_meta = TokenPositionDetails.from_meta(catalog_or_db.meta)
                catalog_or_db_range_sqlmesh = catalog_or_db_meta.to_range(read_file)
                start_pos_sqlmesh = catalog_or_db_range_sqlmesh.start

            description = generate_markdown_description(referenced_model)

            # For external models in YAML files, find the specific model block
            if isinstance(referenced_model, ExternalModel):
                yaml_target_range: t.Optional[Range] = None
                # The path is already known to be an existing file (checked above).
                if referenced_model_path.suffix in (".yaml", ".yml"):
                    yaml_target_range = _get_yaml_model_range(
                        referenced_model_path, referenced_model.name
                    )
                references.append(
                    ExternalModelReference(
                        path=referenced_model_path,
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description=description,
                        target_range=yaml_target_range,
                    )
                )

                column_references = _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    yaml_target_range=yaml_target_range,
                    reference_type="external_model",
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
                references.extend(column_references)
            else:
                references.append(
                    ModelReference(
                        path=referenced_model_path,
                        range=Range(
                            start=start_pos_sqlmesh,
                            end=end_pos_sqlmesh,
                        ),
                        markdown_description=description,
                    )
                )

                column_references = _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    reference_type="model",
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
                references.extend(column_references)

    return references
def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Build the markdown used as a reference's ``markdown_description``.

    The text is the model's description (if any), followed by a markdown table
    listing each column's name, type and description.

    Args:
        model: The model to describe.

    Returns:
        The markdown text. Returns ``None`` when the model has no column types
        and an empty or missing description.
    """

    def _cell(value: t.Any) -> str:
        # A "|" would open a new cell and a line break would end the table row,
        # so escape pipes and fold line breaks into single spaces.
        return " ".join(str(value).replace("|", "\\|").splitlines())

    description = model.description
    columns = model.columns_to_types
    column_descriptions = model.column_descriptions

    if columns is None:
        return description or None

    columns_table = "\n".join(
        [
            f"| {_cell(column)} | {_cell(column_type)} | "
            f"{_cell(column_descriptions.get(column, ''))} |"
            for column, column_type in columns.items()
        ]
    )

    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
    columns_text = table_header + columns_table
    return f"{description}\n\n{columns_text}" if description else columns_text
def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping model names to the ranges of their ``name`` values
        in the YAML file, or None if the file's top-level content is not a list.
        Entries that are not mappings or have no truthy ``name`` are skipped.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue
        # Check the name first: an entry without a "name" key has no position
        # data for it, and looking it up would raise KeyError.
        name = item.get("name")
        if not name:
            continue
        # ruamel.yaml's round-trip loader records per-key positions; indices 2
        # and 3 are the line and column of the key's value.
        position_data = item.lc.data["name"]  # type: ignore
        line, character = position_data[2], position_data[3]
        start = Position(line=line, character=character)
        end = Position(line=line, character=character + len(str(name)))
        model_name_ranges[name] = Range(start=start, end=end)

    return model_name_ranges
Get the ranges of all model names in a YAML file.
Arguments:
- path: Path to the YAML file
Returns:
A dictionary mapping model names to the ranges of their names in the YAML file, or None if the file's top-level content is not a list.