sqlmesh.core.linter.helpers
from pathlib import Path

from sqlmesh.core.linter.rule import Range, Position
from sqlmesh.utils.pydantic import PydanticModel
from sqlglot import tokenize, TokenType, Token
import typing as t


class TokenPositionDetails(PydanticModel):
    """
    Details about a token's position in the source code in the structure provided by SQLGlot.

    Attributes:
        line (int): The line that the token ends on.
        col (int): The column that the token ends on.
        start (int): The start index of the token.
        end (int): The ending index of the token.
    """

    line: int
    col: int
    start: int
    end: int

    @staticmethod
    def from_meta(meta: t.Dict[str, int]) -> "TokenPositionDetails":
        """Build an instance from a mapping with "line", "col", "start" and "end" keys."""
        return TokenPositionDetails(
            line=meta["line"],
            col=meta["col"],
            start=meta["start"],
            end=meta["end"],
        )

    def to_range(self, read_file: t.Optional[t.List[str]]) -> Range:
        """
        Convert a TokenPositionDetails object to a Range object.

        The end of the range comes straight from ``line`` (converted to 0-indexed) and
        ``col``. The start is found by walking backwards ``end - start + 1`` characters
        from that end position, crossing onto earlier lines when necessary.

        When the token's start and end positions are the same, the range is derived from
        the line and column alone and read_file is not consulted.

        :param read_file: List of lines from the file. May be None only when
            start == end. Trailing line terminators on the lines are ignored, and each
            line break is assumed to occupy exactly one character of the token's
            start/end indices.
        :return: A Range object representing the token's position
        :raises ValueError: If start != end and read_file is None.
        """
        if self.start == self.end:
            # Single-character token: no need to look at the file contents.
            return Range(
                start=Position(line=self.line - 1, character=self.col - 1),
                end=Position(line=self.line - 1, character=self.col),
            )

        if read_file is None:
            raise ValueError("read_file must be provided when start and end positions differ.")

        # Convert from 1-indexed to 0-indexed for line only
        end_line_0 = self.line - 1
        end_col_0 = self.col

        # Start on the end line and step back by the token length.
        start_line_0 = end_line_0
        start_col_0 = end_col_0 - (self.end - self.start + 1)

        # A negative column means the token began on an earlier line.
        while start_col_0 < 0 and start_line_0 > 0:
            start_line_0 -= 1
            # Moving up one line always crosses that line's line break, so count the
            # visible text plus one. Previously the line break was skipped whenever the
            # text alone made the column non-negative, which placed the start of a
            # multi-line token one character too early.
            start_col_0 += len(read_file[start_line_0].rstrip("\r\n")) + 1

        # Ensure we don't have negative values
        start_col_0 = max(0, start_col_0)
        return Range(
            start=Position(line=start_line_0, character=start_col_0),
            end=Position(line=end_line_0, character=end_col_0),
        )


def read_range_from_string(content: str, text_range: Range) -> str:
    """
    Return the part of ``content`` covered by ``text_range``.

    Lines and characters in the range are 0-indexed and the end character is exclusive.
    Line numbers outside the content are clamped; a range that lies entirely outside
    the content yields an empty string. Line breaks between the lines of a multi-line
    range are preserved.

    Args:
        content: The text to read from
        text_range: The range of text to extract

    Returns:
        The content within the specified range
    """
    plain_lines = content.splitlines()
    # Same split, but keeping terminators so they can be restored between lines.
    full_lines = content.splitlines(keepends=True)

    # Ensure the range is within bounds
    first_line = max(0, text_range.start.line)
    stop_line = min(len(plain_lines), text_range.end.line + 1)
    if first_line >= stop_line:
        return ""

    pieces = []
    for idx in range(first_line, stop_line):
        text = plain_lines[idx]
        start_char = text_range.start.character if idx == text_range.start.line else 0
        if idx == text_range.end.line:
            # Last line of the range: cut at the end character, no terminator.
            pieces.append(text[start_char : text_range.end.character])
        else:
            # The range continues on the next line, so keep this line's terminator.
            pieces.append(text[start_char:] + full_lines[idx][len(text) :])

    return "".join(pieces)


def read_range_from_file(file: Path, text_range: Range) -> str:
    """
    Read the file and return the content within the specified range.

    Args:
        file: Path to the file to read
        text_range: The range of text to extract

    Returns:
        The content within the specified range
    """
    content = file.read_text(encoding="utf-8")
    return read_range_from_string(content, text_range)


def get_start_and_end_of_model_block(
    tokens: t.List[Token],
) -> t.Optional[t.Tuple[int, int]]:
    """
    Returns the token indices of the parentheses enclosing the MODEL block.

    The block is located with a heuristic: find the first VAR token whose text is
    "MODEL" (case-insensitive), then the first opening parenthesis after it, then the
    first semicolon after that parenthesis. The token right before the semicolon must
    be a closing parenthesis; it is taken as the end of the block.

    Returns:
        (lparen_idx, rparen_idx), or None if any of the steps above fails.
    """
    # 1) Find the MODEL token
    model_idx = next(
        (
            idx
            for idx, token in enumerate(tokens)
            if token.token_type is TokenType.VAR and token.text.upper() == "MODEL"
        ),
        None,
    )
    if model_idx is None:
        return None

    # 2) Find the opening parenthesis for the MODEL properties list
    token_count = len(tokens)
    lparen_idx = next(
        (
            idx
            for idx in range(model_idx + 1, token_count)
            if tokens[idx].token_type is TokenType.L_PAREN
        ),
        None,
    )
    if lparen_idx is None:
        return None

    # 3) The block is assumed to end right before the first semicolon that follows.
    semicolon_idx = next(
        (
            idx
            for idx in range(lparen_idx + 1, token_count)
            if tokens[idx].token_type is TokenType.SEMICOLON
        ),
        None,
    )
    if semicolon_idx is None:
        return None

    rparen_idx = semicolon_idx - 1
    if tokens[rparen_idx].token_type is not TokenType.R_PAREN:
        return None
    return (lparen_idx, rparen_idx)


def get_range_of_model_block(
    sql: str,
    dialect: str,
) -> t.Optional[Range]:
    """
    Get the range of the MODEL block in an SQL file.

    The range runs from the start of the token preceding the block's opening
    parenthesis to the end of the token following its closing parenthesis.

    Returns:
        The range, or None when no MODEL block is found.
    """
    tokens = tokenize(sql, dialect=dialect)
    block = get_start_and_end_of_model_block(tokens)
    if not block:
        return None
    lparen_idx, rparen_idx = block

    def details_of(token: Token) -> TokenPositionDetails:
        return TokenPositionDetails(
            line=token.line,
            col=token.col,
            start=token.start,
            end=token.end,
        )

    first_position = details_of(tokens[lparen_idx - 1])
    last_position = details_of(tokens[rparen_idx + 1])
    source_lines = sql.splitlines()
    return Range(
        start=first_position.to_range(source_lines).start,
        end=last_position.to_range(source_lines).end,
    )


def get_range_of_a_key_in_model_block(
    sql: str,
    dialect: str,
    key: str,
) -> t.Optional[t.Tuple[Range, Range]]:
    """
    Get the ranges of a specific key and its value in the MODEL block of an SQL file.

    Only VAR tokens directly inside the MODEL parentheses that immediately follow
    '(' or ',' are considered keys; the comparison is case-insensitive. The value
    runs from the token after the key up to (not including) the next comma outside
    any brackets, or up to and including the parenthesis that closes a parenthesised
    value.

    Returns a tuple of (key_range, value_range) if found, otherwise None.
    """
    tokens = tokenize(sql, dialect=dialect)
    if not tokens:
        return None

    block = get_start_and_end_of_model_block(tokens)
    if not block:
        return None
    lparen_idx, rparen_idx = block

    openers = (TokenType.L_PAREN, TokenType.L_BRACE, TokenType.L_BRACKET)
    closers = (TokenType.R_PAREN, TokenType.R_BRACE, TokenType.R_BRACKET)
    wanted = key.upper()

    def details_of(token: Token) -> TokenPositionDetails:
        return TokenPositionDetails(
            line=token.line, col=token.col, start=token.start, end=token.end
        )

    # depth starts at 1 because scanning begins just inside the MODEL parentheses.
    depth = 1
    for key_idx in range(lparen_idx + 1, rparen_idx):
        candidate = tokens[key_idx]
        kind = candidate.token_type

        if kind is TokenType.L_PAREN:
            depth += 1
            continue
        if kind is TokenType.R_PAREN:
            depth -= 1
            # If we somehow exit before rparen_idx, stop early
            if depth <= 0:
                break
            continue

        if depth != 1 or kind is not TokenType.VAR or candidate.text.upper() != wanted:
            continue

        # A key must immediately follow '(' or ','. key_idx is always greater than
        # lparen_idx, so the previous index is valid.
        if tokens[key_idx - 1].token_type not in (TokenType.L_PAREN, TokenType.COMMA):
            continue

        lines = sql.splitlines()
        key_range = details_of(candidate).to_range(lines)

        value_start_idx = key_idx + 1
        if value_start_idx >= rparen_idx:
            return None

        # Walk to the end of the value, tracking nesting of (), [] and {}.
        nested = 0
        value_end_idx = value_start_idx
        for idx in range(value_start_idx, rparen_idx):
            ttype = tokens[idx].token_type
            if ttype in openers:
                nested += 1
            elif ttype in closers:
                nested -= 1

            if nested == 0:
                if ttype is TokenType.COMMA:
                    # The comma is not part of the value.
                    break
                # depth is known to be 1 here, so any ')' at nesting 0 ends the value.
                if ttype is TokenType.R_PAREN:
                    # The closing parenthesis is part of the value.
                    value_end_idx = idx
                    break

            value_end_idx = idx

        value_range = Range(
            start=details_of(tokens[value_start_idx]).to_range(lines).start,
            end=details_of(tokens[value_end_idx]).to_range(lines).end,
        )
        return (key_range, value_range)

    return None
class TokenPositionDetails(PydanticModel):
    """
    Details about a token's position in the source code in the structure provided by SQLGlot.

    Attributes:
        line (int): The line that the token ends on.
        col (int): The column that the token ends on.
        start (int): The start index of the token.
        end (int): The ending index of the token.
    """

    line: int
    col: int
    start: int
    end: int

    @staticmethod
    def from_meta(meta: t.Dict[str, int]) -> "TokenPositionDetails":
        """Build an instance from a mapping with "line", "col", "start" and "end" keys."""
        return TokenPositionDetails(
            line=meta["line"],
            col=meta["col"],
            start=meta["start"],
            end=meta["end"],
        )

    def to_range(self, read_file: t.Optional[t.List[str]]) -> Range:
        """
        Convert a TokenPositionDetails object to a Range object.

        The end of the range comes straight from ``line`` (converted to 0-indexed) and
        ``col``. The start is found by walking backwards ``end - start + 1`` characters
        from that end position, crossing onto earlier lines when necessary.

        When the token's start and end positions are the same, the range is derived from
        the line and column alone and read_file is not consulted.

        :param read_file: List of lines from the file. May be None only when
            start == end. Trailing line terminators on the lines are ignored, and each
            line break is assumed to occupy exactly one character of the token's
            start/end indices.
        :return: A Range object representing the token's position
        :raises ValueError: If start != end and read_file is None.
        """
        if self.start == self.end:
            # Single-character token: no need to look at the file contents.
            return Range(
                start=Position(line=self.line - 1, character=self.col - 1),
                end=Position(line=self.line - 1, character=self.col),
            )

        if read_file is None:
            raise ValueError("read_file must be provided when start and end positions differ.")

        # Convert from 1-indexed to 0-indexed for line only
        end_line_0 = self.line - 1
        end_col_0 = self.col

        # Start on the end line and step back by the token length.
        start_line_0 = end_line_0
        start_col_0 = end_col_0 - (self.end - self.start + 1)

        # A negative column means the token began on an earlier line.
        while start_col_0 < 0 and start_line_0 > 0:
            start_line_0 -= 1
            # Moving up one line always crosses that line's line break, so count the
            # visible text plus one. Previously the line break was skipped whenever the
            # text alone made the column non-negative, which placed the start of a
            # multi-line token one character too early.
            start_col_0 += len(read_file[start_line_0].rstrip("\r\n")) + 1

        # Ensure we don't have negative values
        start_col_0 = max(0, start_col_0)
        return Range(
            start=Position(line=start_line_0, character=start_col_0),
            end=Position(line=end_line_0, character=end_col_0),
        )
Details about a token's position in the source code in the structure provided by SQLGlot.
Attributes:
- line (int): The line that the token ends on.
- col (int): The column that the token ends on.
- start (int): The start index of the token.
- end (int): The ending index of the token.
def to_range(self, read_file: t.Optional[t.List[str]]) -> Range:
    """
    Convert a TokenPositionDetails object to a Range object.

    The end of the range comes straight from ``line`` (converted to 0-indexed) and
    ``col``. The start is found by walking backwards ``end - start + 1`` characters
    from that end position, crossing onto earlier lines when necessary.

    When the token's start and end positions are the same, the range is derived from
    the line and column alone and read_file is not consulted.

    :param read_file: List of lines from the file. May be None only when
        start == end. Trailing line terminators on the lines are ignored, and each
        line break is assumed to occupy exactly one character of the token's
        start/end indices.
    :return: A Range object representing the token's position
    :raises ValueError: If start != end and read_file is None.
    """
    if self.start == self.end:
        # Single-character token: no need to look at the file contents.
        return Range(
            start=Position(line=self.line - 1, character=self.col - 1),
            end=Position(line=self.line - 1, character=self.col),
        )

    if read_file is None:
        raise ValueError("read_file must be provided when start and end positions differ.")

    # Convert from 1-indexed to 0-indexed for line only
    end_line_0 = self.line - 1
    end_col_0 = self.col

    # Start on the end line and step back by the token length.
    start_line_0 = end_line_0
    start_col_0 = end_col_0 - (self.end - self.start + 1)

    # A negative column means the token began on an earlier line.
    while start_col_0 < 0 and start_line_0 > 0:
        start_line_0 -= 1
        # Moving up one line always crosses that line's line break, so count the
        # visible text plus one. Previously the line break was skipped whenever the
        # text alone made the column non-negative, which placed the start of a
        # multi-line token one character too early.
        start_col_0 += len(read_file[start_line_0].rstrip("\r\n")) + 1

    # Ensure we don't have negative values
    start_col_0 = max(0, start_col_0)
    return Range(
        start=Position(line=start_line_0, character=start_col_0),
        end=Position(line=end_line_0, character=end_col_0),
    )
Convert a TokenPositionDetails object to a Range object.
In the circumstances where the token's start and end positions are the same, there is no need for a read_file parameter, as the range can be derived from the token's line and column. This is an optimization to avoid unnecessary file reads and should only be used when the token represents a single character or position in the file.
If the token's start and end positions are different, the read_file parameter is required.
Parameters
- read_file: List of lines from the file. May be None only when the token's start and end positions are the same.
Returns
A Range object representing the token's position
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def read_range_from_string(content: str, text_range: Range) -> str:
    """
    Return the part of ``content`` covered by ``text_range``.

    Lines and characters in the range are 0-indexed and the end character is exclusive.
    Line numbers outside the content are clamped; a range that lies entirely outside
    the content yields an empty string. Line breaks between the lines of a multi-line
    range are preserved.

    Args:
        content: The text to read from
        text_range: The range of text to extract

    Returns:
        The content within the specified range
    """
    plain_lines = content.splitlines()
    # Same split, but keeping terminators so they can be restored between lines.
    full_lines = content.splitlines(keepends=True)

    # Ensure the range is within bounds
    first_line = max(0, text_range.start.line)
    stop_line = min(len(plain_lines), text_range.end.line + 1)
    if first_line >= stop_line:
        return ""

    pieces = []
    for idx in range(first_line, stop_line):
        text = plain_lines[idx]
        start_char = text_range.start.character if idx == text_range.start.line else 0
        if idx == text_range.end.line:
            # Last line of the range: cut at the end character, no terminator.
            pieces.append(text[start_char : text_range.end.character])
        else:
            # The range continues on the next line, so keep this line's terminator.
            pieces.append(text[start_char:] + full_lines[idx][len(text) :])

    return "".join(pieces)
def read_range_from_file(file: Path, text_range: Range) -> str:
    """
    Read the file and return the content within the specified range.

    The file is decoded as UTF-8 and the extraction itself is delegated to
    read_range_from_string.

    Args:
        file: Path to the file to read
        text_range: The range of text to extract

    Returns:
        The content within the specified range
    """
    content = file.read_text(encoding="utf-8")
    return read_range_from_string(content, text_range)
Read the file and return the content within the specified range.
Arguments:
- file: Path to the file to read
- text_range: The range of text to extract
Returns:
The content within the specified range
def get_start_and_end_of_model_block(
    tokens: t.List[Token],
) -> t.Optional[t.Tuple[int, int]]:
    """
    Returns the token indices of the parentheses enclosing the MODEL block.

    The block is located with a heuristic: find the first VAR token whose text is
    "MODEL" (case-insensitive), then the first opening parenthesis after it, then the
    first semicolon after that parenthesis. The token right before the semicolon must
    be a closing parenthesis; it is taken as the end of the block.

    Returns:
        (lparen_idx, rparen_idx), or None if any of the steps above fails.
    """
    # 1) Find the MODEL token
    model_idx = next(
        (
            idx
            for idx, token in enumerate(tokens)
            if token.token_type is TokenType.VAR and token.text.upper() == "MODEL"
        ),
        None,
    )
    if model_idx is None:
        return None

    # 2) Find the opening parenthesis for the MODEL properties list
    token_count = len(tokens)
    lparen_idx = next(
        (
            idx
            for idx in range(model_idx + 1, token_count)
            if tokens[idx].token_type is TokenType.L_PAREN
        ),
        None,
    )
    if lparen_idx is None:
        return None

    # 3) The block is assumed to end right before the first semicolon that follows.
    semicolon_idx = next(
        (
            idx
            for idx in range(lparen_idx + 1, token_count)
            if tokens[idx].token_type is TokenType.SEMICOLON
        ),
        None,
    )
    if semicolon_idx is None:
        return None

    rparen_idx = semicolon_idx - 1
    if tokens[rparen_idx].token_type is not TokenType.R_PAREN:
        return None
    return (lparen_idx, rparen_idx)
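Returns the token indices of the opening and closing parentheses of the MODEL block in an SQL file, or None if no such block is found. The block is located by finding the first "MODEL" token, the first opening parenthesis after it, and the first semicolon after that parenthesis; the token immediately before that semicolon must be a closing parenthesis and is taken as the end of the block.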
def get_range_of_model_block(
    sql: str,
    dialect: str,
) -> t.Optional[Range]:
    """
    Get the range of the MODEL block in an SQL file.

    The range runs from the start of the token preceding the block's opening
    parenthesis to the end of the token following its closing parenthesis.

    Returns:
        The range, or None when no MODEL block is found.
    """
    tokens = tokenize(sql, dialect=dialect)
    block = get_start_and_end_of_model_block(tokens)
    if not block:
        return None
    lparen_idx, rparen_idx = block

    def details_of(token: Token) -> TokenPositionDetails:
        return TokenPositionDetails(
            line=token.line,
            col=token.col,
            start=token.start,
            end=token.end,
        )

    first_position = details_of(tokens[lparen_idx - 1])
    last_position = details_of(tokens[rparen_idx + 1])
    source_lines = sql.splitlines()
    return Range(
        start=first_position.to_range(source_lines).start,
        end=last_position.to_range(source_lines).end,
    )
Get the range of the MODEL block in an SQL file, or None when no MODEL block is found.
def get_range_of_a_key_in_model_block(
    sql: str,
    dialect: str,
    key: str,
) -> t.Optional[t.Tuple[Range, Range]]:
    """
    Get the ranges of a specific key and its value in the MODEL block of an SQL file.

    Only VAR tokens directly inside the MODEL parentheses that immediately follow
    '(' or ',' are considered keys; the comparison is case-insensitive. The value
    runs from the token after the key up to (not including) the next comma outside
    any brackets, or up to and including the parenthesis that closes a parenthesised
    value.

    Returns a tuple of (key_range, value_range) if found, otherwise None.
    """
    tokens = tokenize(sql, dialect=dialect)
    if not tokens:
        return None

    block = get_start_and_end_of_model_block(tokens)
    if not block:
        return None
    lparen_idx, rparen_idx = block

    openers = (TokenType.L_PAREN, TokenType.L_BRACE, TokenType.L_BRACKET)
    closers = (TokenType.R_PAREN, TokenType.R_BRACE, TokenType.R_BRACKET)
    wanted = key.upper()

    def details_of(token: Token) -> TokenPositionDetails:
        return TokenPositionDetails(
            line=token.line, col=token.col, start=token.start, end=token.end
        )

    # depth starts at 1 because scanning begins just inside the MODEL parentheses.
    depth = 1
    for key_idx in range(lparen_idx + 1, rparen_idx):
        candidate = tokens[key_idx]
        kind = candidate.token_type

        if kind is TokenType.L_PAREN:
            depth += 1
            continue
        if kind is TokenType.R_PAREN:
            depth -= 1
            # If we somehow exit before rparen_idx, stop early
            if depth <= 0:
                break
            continue

        if depth != 1 or kind is not TokenType.VAR or candidate.text.upper() != wanted:
            continue

        # A key must immediately follow '(' or ','. key_idx is always greater than
        # lparen_idx, so the previous index is valid.
        if tokens[key_idx - 1].token_type not in (TokenType.L_PAREN, TokenType.COMMA):
            continue

        lines = sql.splitlines()
        key_range = details_of(candidate).to_range(lines)

        value_start_idx = key_idx + 1
        if value_start_idx >= rparen_idx:
            return None

        # Walk to the end of the value, tracking nesting of (), [] and {}.
        nested = 0
        value_end_idx = value_start_idx
        for idx in range(value_start_idx, rparen_idx):
            ttype = tokens[idx].token_type
            if ttype in openers:
                nested += 1
            elif ttype in closers:
                nested -= 1

            if nested == 0:
                if ttype is TokenType.COMMA:
                    # The comma is not part of the value.
                    break
                # depth is known to be 1 here, so any ')' at nesting 0 ends the value.
                if ttype is TokenType.R_PAREN:
                    # The closing parenthesis is part of the value.
                    value_end_idx = idx
                    break

            value_end_idx = idx

        value_range = Range(
            start=details_of(tokens[value_start_idx]).to_range(lines).start,
            end=details_of(tokens[value_end_idx]).to_range(lines).end,
        )
        return (key_range, value_range)

    return None
Get the ranges of a specific key and its value in the MODEL block of an SQL file.
Returns a tuple of (key_range, value_range) if found, otherwise None.