sqlmesh.utils.lineage
"""Helpers for locating references to CTEs, models and external models in SQL queries."""

import typing as t
from pathlib import Path

from pydantic import Field

from sqlmesh.core.dialect import normalize_model_name
from sqlmesh.core.linter.helpers import (
    TokenPositionDetails,
)
from sqlmesh.core.linter.rule import Range, Position
from sqlmesh.core.model.definition import SqlModel, ExternalModel, PythonModel, SeedModel
from sqlglot import exp
from sqlglot.optimizer.scope import build_scope

from sqlglot.optimizer.normalize_identifiers import normalize_identifiers
from ruamel.yaml import YAML

from sqlmesh.utils.pydantic import PydanticModel

if t.TYPE_CHECKING:
    from sqlmesh.core.context import Context
    from sqlmesh.core.context import GenericContext


class ModelReference(PydanticModel):
    """A reference to a model, excluding external models."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["model"] = "model"
    path: Path
    range: Range
    markdown_description: t.Optional[str] = None


class ExternalModelReference(PydanticModel):
    """A reference to an external model."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["external_model"] = "external_model"
    range: Range
    target_range: t.Optional[Range] = None
    path: t.Optional[Path] = None
    """The path of the external model, typically a YAML file, it is optional because
    external models can be unregistered and so the path is not available."""

    markdown_description: t.Optional[str] = None


class CTEReference(PydanticModel):
    """A reference to a CTE."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["cte"] = "cte"
    path: Path
    range: Range
    target_range: Range


class MacroReference(PydanticModel):
    """A reference to a macro."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["macro"] = "macro"
    path: Path
    range: Range
    target_range: Range
    markdown_description: t.Optional[str] = None


# Any reference kind; pydantic selects the concrete class from the `type` field.
Reference = t.Annotated[
    t.Union[ModelReference, CTEReference, MacroReference, ExternalModelReference],
    Field(discriminator="type"),
]


def _table_reference_range(table: exp.Table, read_file: t.List[str]) -> Range:
    """Return the range of a table reference in ``read_file``, including any qualifier.

    The range always ends where the table identifier ends. If the table is qualified, it
    starts at the catalog identifier (or the database identifier when there is no catalog);
    otherwise it starts at the table identifier.
    """
    table_range = TokenPositionDetails.from_meta(table.this.meta).to_range(read_file)
    start = table_range.start

    qualifier = table.args.get("catalog") or table.args.get("db")
    if qualifier is not None:
        start = TokenPositionDetails.from_meta(qualifier.meta).to_range(read_file).start

    return Range(start=start, end=table_range.end)


def extract_references_from_query(
    query: exp.Expression,
    context: t.Union["Context", "GenericContext[t.Any]"],
    document_path: Path,
    read_file: t.List[str],
    depends_on: t.Set[str],
    dialect: t.Optional[str] = None,
) -> t.List[Reference]:
    """Collect references to CTEs, models and external models used in ``query``.

    Args:
        query: The parsed query to inspect. It is copied before identifiers are normalized.
        context: Context used to resolve referenced models and the default catalog.
        document_path: Path of the document containing ``query``; CTE references point here.
        read_file: The document content as a list of lines, used to compute ranges.
        depends_on: Normalized names of the query's dependencies. Tables whose normalized
            name is not in this set are ignored.
        dialect: SQL dialect used to normalize and render identifiers.

    Returns:
        One reference per CTE / model / external model table usage, plus one reference per
        column qualified with that CTE or model. Returns an empty list if the scope tree
        cannot be built.
    """
    # Build a scope tree to properly handle nested CTEs
    try:
        query = normalize_identifiers(query.copy(), dialect=dialect)
        root_scope = build_scope(query)
    except Exception:
        root_scope = None

    references: t.List[Reference] = []
    if not root_scope:
        return references

    # Traverse all scopes to find CTE definitions and table references
    for scope in root_scope.traverse():
        # _process_column_references scans the entire scope, so run it at most once per
        # target; otherwise a target used twice in one scope (e.g. a self-join) would
        # produce duplicate column references.
        column_targets_done: t.Set[t.Tuple[str, str]] = set()

        for table in scope.tables:
            table_name = table.name

            # Check if this table reference is a CTE in the current scope
            if cte_scope := scope.cte_sources.get(table_name):
                cte = cte_scope.expression.parent
                alias = cte.args.get("alias") if cte is not None else None
                identifier = alias.this if isinstance(alias, exp.TableAlias) else None
                if isinstance(identifier, exp.Identifier):
                    target_range = TokenPositionDetails.from_meta(identifier.meta).to_range(
                        read_file
                    )
                    table_range = TokenPositionDetails.from_meta(table.this.meta).to_range(
                        read_file
                    )
                    references.append(
                        CTEReference(
                            path=document_path,  # Same file
                            range=table_range,
                            target_range=target_range,
                        )
                    )

                    # The column pass needs target_range, so it must stay inside this
                    # branch where the variable is guaranteed to be bound for this table.
                    if ("cte", table_name) not in column_targets_done:
                        column_targets_done.add(("cte", table_name))
                        references.extend(
                            _process_column_references(
                                scope=scope,
                                reference_name=table_name,
                                read_file=read_file,
                                referenced_model_path=document_path,
                                description="",
                                reference_type="cte",
                                cte_target_range=target_range,
                            )
                        )
                # A CTE is never a model, so skip the model lookup below.
                continue

            # For non-CTE tables, normalize the unaliased table name so it can be matched
            # against the dependencies and looked up in the context.
            unaliased = table.copy()
            if unaliased.args.get("alias") is not None:
                unaliased.set("alias", None)
            reference_name = unaliased.sql(dialect=dialect)
            try:
                normalized_reference_name = normalize_model_name(
                    reference_name,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            except Exception:
                # Skip references that cannot be normalized
                continue
            if normalized_reference_name not in depends_on:
                continue

            referenced_model = context.get_model(
                model_or_snapshot=normalized_reference_name, raise_if_missing=False
            )
            if referenced_model is None:
                # A dependency with no model in the context: report it as an unregistered
                # external model, which has no path to navigate to.
                references.append(
                    ExternalModelReference(
                        range=_table_reference_range(table, read_file),
                        markdown_description="Unregistered external model",
                    )
                )
                continue

            referenced_model_path = referenced_model._path
            # Only link to models whose definition file exists on disk.
            if referenced_model_path is None or not referenced_model_path.is_file():
                continue

            table_range = _table_reference_range(table, read_file)
            description = generate_markdown_description(referenced_model)

            yaml_target_range: t.Optional[Range] = None
            column_reference_type: t.Literal["model", "external_model"]
            if isinstance(referenced_model, ExternalModel):
                # For external models in YAML files, target the model's entry in that file.
                if referenced_model_path.suffix in (".yaml", ".yml"):
                    yaml_target_range = _get_yaml_model_range(
                        referenced_model_path, referenced_model.name
                    )
                references.append(
                    ExternalModelReference(
                        path=referenced_model_path,
                        range=table_range,
                        markdown_description=description,
                        target_range=yaml_target_range,
                    )
                )
                column_reference_type = "external_model"
            else:
                references.append(
                    ModelReference(
                        path=referenced_model_path,
                        range=table_range,
                        markdown_description=description,
                    )
                )
                column_reference_type = "model"

            if ("model", normalized_reference_name) in column_targets_done:
                continue
            column_targets_done.add(("model", normalized_reference_name))
            references.extend(
                _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    yaml_target_range=yaml_target_range,
                    reference_type=column_reference_type,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            )

    return references


def _escape_markdown_table_cell(text: str) -> str:
    """Make ``text`` safe to place inside a single Markdown table cell."""
    # A raw "|" would open a new cell and a line break would terminate the row.
    return " ".join(text.replace("|", "\\|").splitlines())


def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Build a Markdown description of ``model``.

    Args:
        model: The model to describe.

    Returns:
        The model description followed by a Markdown table of its columns (name, type,
        description). If the model has no known columns, only the description is returned,
        or None when the description is empty as well.
    """
    description = model.description
    columns = model.columns_to_types

    # An empty mapping would render a header-only table, so treat it like "no columns".
    if not columns:
        return description or None

    column_descriptions = model.column_descriptions
    columns_table = "\n".join(
        f"| {_escape_markdown_table_cell(str(column))} | {column_type} | "
        f"{_escape_markdown_table_cell(column_descriptions.get(column) or '')} |"
        for column, column_type in columns.items()
    )

    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
    columns_text = table_header + columns_table
    return f"{description}\n\n{columns_text}" if description else columns_text


def _process_column_references(
    scope: t.Any,
    reference_name: str,
    read_file: t.List[str],
    referenced_model_path: Path,
    description: t.Optional[str] = None,
    yaml_target_range: t.Optional[Range] = None,
    reference_type: t.Literal["model", "external_model", "cte"] = "model",
    default_catalog: t.Optional[str] = None,
    dialect: t.Optional[str] = None,
    cte_target_range: t.Optional[Range] = None,
) -> t.List[Reference]:
    """
    Process column references for a given table and create appropriate reference objects.

    Args:
        scope: The SQL scope to search for columns
        reference_name: The full reference name (may include database/catalog)
        read_file: The file content as list of lines
        referenced_model_path: Path of the referenced model
        description: Markdown description for the reference
        yaml_target_range: Target range for external models (YAML files)
        reference_type: Type of reference - "model", "external_model", or "cte"
        default_catalog: Default catalog for normalization
        dialect: SQL dialect for normalization
        cte_target_range: Target range for CTE references

    Returns:
        One reference per qualified column in ``scope`` whose qualifier matches
        ``reference_name``. Columns whose qualifier cannot be normalized are skipped.
    """
    references: t.List[Reference] = []
    # Memoize qualifier normalization: the same qualifier is usually repeated on many columns.
    normalized_qualifiers: t.Dict[str, t.Optional[str]] = {}

    for column in scope.find_all(exp.Column):
        if not column.table:
            continue

        if reference_type == "cte":
            # CTE usages are matched on the column's table name only.
            if column.table == reference_name:
                references.append(
                    CTEReference(
                        path=referenced_model_path,
                        range=_get_column_table_range(column, read_file),
                        target_range=cte_target_range,
                    )
                )
            continue

        qualifier = ".".join(part.sql(dialect) for part in column.parts[:-1])
        if qualifier not in normalized_qualifiers:
            try:
                normalized_qualifiers[qualifier] = normalize_model_name(
                    qualifier,
                    default_catalog=default_catalog,
                    dialect=dialect,
                )
            except Exception:
                # Like extract_references_from_query, skip names that cannot be normalized
                # instead of aborting the whole extraction.
                normalized_qualifiers[qualifier] = None
        if normalized_qualifiers[qualifier] != reference_name:
            continue

        table_range = _get_column_table_range(column, read_file)
        if reference_type == "external_model":
            references.append(
                ExternalModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                    target_range=yaml_target_range,
                )
            )
        else:
            references.append(
                ModelReference(
                    path=referenced_model_path,
                    range=table_range,
                    markdown_description=description,
                )
            )

    return references


def _get_column_table_range(column: exp.Column, read_file: t.List[str]) -> Range:
    """
    Get the range for a column's table reference, handling both simple and qualified table names.

    Args:
        column: The column expression; it must be qualified (``column.table`` set)
        read_file: The file content as list of lines

    Returns:
        The Range covering the table reference in the column
    """
    # Every part except the last (the column name itself) belongs to the table reference.
    table_parts = column.parts[:-1]

    start_range = TokenPositionDetails.from_meta(table_parts[0].meta).to_range(read_file)
    end_range = TokenPositionDetails.from_meta(table_parts[-1].meta).to_range(read_file)

    return Range(
        start=start_range.start,
        end=end_range.end,
    )


def _get_yaml_model_range(path: Path, model_name: str) -> t.Optional[Range]:
    """
    Find the range of a model's ``name`` value in a YAML file.

    Args:
        path: Path to the YAML file
        model_name: Name of the model to find

    Returns:
        The Range of the model's name in the YAML file, or None if the file's top level is
        not a list or the model is not found.
    """
    model_name_ranges = get_yaml_model_name_ranges(path)
    if model_name_ranges is None:
        return None
    return model_name_ranges.get(model_name)


def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping each model name to the range of its ``name`` value in the file,
        or None if the top-level YAML document is not a list. Entries that are not mappings,
        or whose ``name`` is missing or empty, are skipped.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue
        # Look the name up before reading its position: indexing with a missing "name"
        # key would raise KeyError instead of skipping the entry.
        name = item.get("name")
        if not name:
            continue
        # ruamel.yaml's round-trip loader records [key_line, key_col, value_line, value_col].
        position_data = item.lc.data["name"]  # type: ignore
        start = Position(line=position_data[2], character=position_data[3])
        end = Position(line=position_data[2], character=position_data[3] + len(str(name)))
        model_name_ranges[name] = Range(start=start, end=end)

    return model_name_ranges
class ModelReference(PydanticModel):
    """Reference to a project model; external models use a separate reference type."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["model"] = "model"
    # File that defines the referenced model.
    path: Path
    # Span of the reference in the document being analysed.
    range: Range
    # Optional Markdown description of the referenced model.
    markdown_description: t.Union[str, None] = None
A reference to a model, excluding external models.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class ExternalModelReference(PydanticModel):
    """Reference to an external model, whether or not it is registered in the project."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["external_model"] = "external_model"
    # Span of the reference in the document being analysed.
    range: Range
    # Optional span to navigate to inside `path`.
    target_range: t.Union[Range, None] = None
    path: t.Union[Path, None] = None
    """The path of the external model, typically a YAML file, it is optional because
    external models can be unregistered and so the path is not available."""

    # Optional Markdown description of the referenced model.
    markdown_description: t.Union[str, None] = None
A reference to an external model.
The path of the external model, typically a YAML file, it is optional because external models can be unregistered and so the path is not available.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class CTEReference(PydanticModel):
    """Reference to a common table expression (CTE)."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["cte"] = "cte"
    # Document holding both the usage and the CTE definition.
    path: Path
    # Span of the CTE usage.
    range: Range
    # Span of the CTE's name where it is defined.
    target_range: Range
A reference to a CTE.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
class MacroReference(PydanticModel):
    """Reference to a macro."""

    # Discriminator value for the `Reference` union.
    type: t.Literal["macro"] = "macro"
    # NOTE(review): presumably the file defining the macro; not constructed here — confirm.
    path: Path
    # Span of the macro usage (assumed; verify against the code that builds this object).
    range: Range
    # Span to navigate to inside `path`.
    target_range: Range
    # Optional Markdown description of the macro.
    markdown_description: t.Union[str, None] = None
A reference to a macro.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def _table_reference_range(table: exp.Table, read_file: t.List[str]) -> Range:
    """Return the range of a table reference in ``read_file``, including any qualifier.

    The range always ends where the table identifier ends. If the table is qualified, it
    starts at the catalog identifier (or the database identifier when there is no catalog);
    otherwise it starts at the table identifier.
    """
    table_range = TokenPositionDetails.from_meta(table.this.meta).to_range(read_file)
    start = table_range.start

    qualifier = table.args.get("catalog") or table.args.get("db")
    if qualifier is not None:
        start = TokenPositionDetails.from_meta(qualifier.meta).to_range(read_file).start

    return Range(start=start, end=table_range.end)


def extract_references_from_query(
    query: exp.Expression,
    context: t.Union["Context", "GenericContext[t.Any]"],
    document_path: Path,
    read_file: t.List[str],
    depends_on: t.Set[str],
    dialect: t.Optional[str] = None,
) -> t.List[Reference]:
    """Collect references to CTEs, models and external models used in ``query``.

    Args:
        query: The parsed query to inspect. It is copied before identifiers are normalized.
        context: Context used to resolve referenced models and the default catalog.
        document_path: Path of the document containing ``query``; CTE references point here.
        read_file: The document content as a list of lines, used to compute ranges.
        depends_on: Normalized names of the query's dependencies. Tables whose normalized
            name is not in this set are ignored.
        dialect: SQL dialect used to normalize and render identifiers.

    Returns:
        One reference per CTE / model / external model table usage, plus one reference per
        column qualified with that CTE or model. Returns an empty list if the scope tree
        cannot be built.
    """
    # Build a scope tree to properly handle nested CTEs
    try:
        query = normalize_identifiers(query.copy(), dialect=dialect)
        root_scope = build_scope(query)
    except Exception:
        root_scope = None

    references: t.List[Reference] = []
    if not root_scope:
        return references

    # Traverse all scopes to find CTE definitions and table references
    for scope in root_scope.traverse():
        # _process_column_references scans the entire scope, so run it at most once per
        # target; otherwise a target used twice in one scope (e.g. a self-join) would
        # produce duplicate column references.
        column_targets_done: t.Set[t.Tuple[str, str]] = set()

        for table in scope.tables:
            table_name = table.name

            # Check if this table reference is a CTE in the current scope
            if cte_scope := scope.cte_sources.get(table_name):
                cte = cte_scope.expression.parent
                alias = cte.args.get("alias") if cte is not None else None
                identifier = alias.this if isinstance(alias, exp.TableAlias) else None
                if isinstance(identifier, exp.Identifier):
                    target_range = TokenPositionDetails.from_meta(identifier.meta).to_range(
                        read_file
                    )
                    table_range = TokenPositionDetails.from_meta(table.this.meta).to_range(
                        read_file
                    )
                    references.append(
                        CTEReference(
                            path=document_path,  # Same file
                            range=table_range,
                            target_range=target_range,
                        )
                    )

                    # The column pass needs target_range, so it must stay inside this
                    # branch where the variable is guaranteed to be bound for this table.
                    if ("cte", table_name) not in column_targets_done:
                        column_targets_done.add(("cte", table_name))
                        references.extend(
                            _process_column_references(
                                scope=scope,
                                reference_name=table_name,
                                read_file=read_file,
                                referenced_model_path=document_path,
                                description="",
                                reference_type="cte",
                                cte_target_range=target_range,
                            )
                        )
                # A CTE is never a model, so skip the model lookup below.
                continue

            # For non-CTE tables, normalize the unaliased table name so it can be matched
            # against the dependencies and looked up in the context.
            unaliased = table.copy()
            if unaliased.args.get("alias") is not None:
                unaliased.set("alias", None)
            reference_name = unaliased.sql(dialect=dialect)
            try:
                normalized_reference_name = normalize_model_name(
                    reference_name,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            except Exception:
                # Skip references that cannot be normalized
                continue
            if normalized_reference_name not in depends_on:
                continue

            referenced_model = context.get_model(
                model_or_snapshot=normalized_reference_name, raise_if_missing=False
            )
            if referenced_model is None:
                # A dependency with no model in the context: report it as an unregistered
                # external model, which has no path to navigate to.
                references.append(
                    ExternalModelReference(
                        range=_table_reference_range(table, read_file),
                        markdown_description="Unregistered external model",
                    )
                )
                continue

            referenced_model_path = referenced_model._path
            # Only link to models whose definition file exists on disk.
            if referenced_model_path is None or not referenced_model_path.is_file():
                continue

            table_range = _table_reference_range(table, read_file)
            description = generate_markdown_description(referenced_model)

            yaml_target_range: t.Optional[Range] = None
            column_reference_type: t.Literal["model", "external_model"]
            if isinstance(referenced_model, ExternalModel):
                # For external models in YAML files, target the model's entry in that file.
                if referenced_model_path.suffix in (".yaml", ".yml"):
                    yaml_target_range = _get_yaml_model_range(
                        referenced_model_path, referenced_model.name
                    )
                references.append(
                    ExternalModelReference(
                        path=referenced_model_path,
                        range=table_range,
                        markdown_description=description,
                        target_range=yaml_target_range,
                    )
                )
                column_reference_type = "external_model"
            else:
                references.append(
                    ModelReference(
                        path=referenced_model_path,
                        range=table_range,
                        markdown_description=description,
                    )
                )
                column_reference_type = "model"

            if ("model", normalized_reference_name) in column_targets_done:
                continue
            column_targets_done.add(("model", normalized_reference_name))
            references.extend(
                _process_column_references(
                    scope=scope,
                    reference_name=normalized_reference_name,
                    read_file=read_file,
                    referenced_model_path=referenced_model_path,
                    description=description,
                    yaml_target_range=yaml_target_range,
                    reference_type=column_reference_type,
                    default_catalog=context.default_catalog,
                    dialect=dialect,
                )
            )

    return references
def _escape_markdown_table_cell(text: str) -> str:
    """Make ``text`` safe to place inside a single Markdown table cell."""
    # A raw "|" would open a new cell and a line break would terminate the row.
    return " ".join(text.replace("|", "\\|").splitlines())


def generate_markdown_description(
    model: t.Union[SqlModel, ExternalModel, PythonModel, SeedModel],
) -> t.Optional[str]:
    """Build a Markdown description of ``model``.

    Args:
        model: The model to describe.

    Returns:
        The model description followed by a Markdown table of its columns (name, type,
        description). If the model has no known columns, only the description is returned,
        or None when the description is empty as well.
    """
    description = model.description
    columns = model.columns_to_types

    # An empty mapping would render a header-only table, so treat it like "no columns".
    if not columns:
        return description or None

    column_descriptions = model.column_descriptions
    columns_table = "\n".join(
        f"| {_escape_markdown_table_cell(str(column))} | {column_type} | "
        f"{_escape_markdown_table_cell(column_descriptions.get(column) or '')} |"
        for column, column_type in columns.items()
    )

    table_header = "| Column | Type | Description |\n|--------|------|-------------|\n"
    columns_text = table_header + columns_table
    return f"{description}\n\n{columns_text}" if description else columns_text
def get_yaml_model_name_ranges(path: Path) -> t.Optional[t.Dict[str, Range]]:
    """
    Get the ranges of all model names in a YAML file.

    Args:
        path: Path to the YAML file

    Returns:
        A dictionary mapping each model name to the range of its ``name`` value in the file,
        or None if the top-level YAML document is not a list. Entries that are not mappings,
        or whose ``name`` is missing or empty, are skipped.
    """
    yaml = YAML()
    with path.open("r", encoding="utf-8") as f:
        data = yaml.load(f)

    if not isinstance(data, list):
        return None

    model_name_ranges: t.Dict[str, Range] = {}
    for item in data:
        if not isinstance(item, dict):
            continue
        # Look the name up before reading its position: indexing with a missing "name"
        # key would raise KeyError instead of skipping the entry.
        name = item.get("name")
        if not name:
            continue
        # ruamel.yaml's round-trip loader records [key_line, key_col, value_line, value_col].
        position_data = item.lc.data["name"]  # type: ignore
        start = Position(line=position_data[2], character=position_data[3])
        end = Position(line=position_data[2], character=position_data[3] + len(str(name)))
        model_name_ranges[name] = Range(start=start, end=end)

    return model_name_ranges
Get the ranges of all model names in a YAML file.
Arguments:
- path: Path to the YAML file
Returns:
A dictionary mapping model names to their ranges in the YAML file.