sqlmesh.utils.metaprogramming
1from __future__ import annotations 2 3import ast 4import dis 5import importlib 6import inspect 7import linecache 8import logging 9import os 10import re 11import sys 12import textwrap 13import types 14import typing as t 15from dataclasses import dataclass 16from enum import Enum 17from numbers import Number 18from pathlib import Path 19 20from astor import to_source 21 22from sqlmesh.core import constants as c 23from sqlmesh.utils import format_exception, unique 24from sqlmesh.utils.errors import SQLMeshError 25from sqlmesh.utils.pydantic import PydanticModel 26 27logger = logging.getLogger(__name__) 28 29 30IGNORE_DECORATORS = {"macro", "model", "signal"} 31SERIALIZABLE_CALLABLES = (type, types.FunctionType) 32LITERALS = (Number, str, bytes, tuple, list, dict, set, bool) 33 34 35def _is_relative_to(path: t.Optional[Path | str], other: t.Optional[Path | str]) -> bool: 36 if path is None or other is None: 37 return False 38 39 if isinstance(path, str): 40 path = Path(path) 41 if isinstance(other, str): 42 other = Path(other) 43 44 if "site-packages" in str(path) or not path.exists() or not other.exists(): 45 return False 46 47 try: 48 path.absolute().relative_to(other.absolute()) 49 return True 50 except ValueError: 51 return False 52 53 54def _code_globals(code: types.CodeType) -> t.Dict[str, None]: 55 variables = { 56 instruction.argval: None 57 for instruction in dis.get_instructions(code) 58 if instruction.opname == "LOAD_GLOBAL" 59 } 60 61 for const in code.co_consts: 62 if isinstance(const, types.CodeType): 63 variables.update(_code_globals(const)) 64 65 return variables 66 67 68def _globals_match(obj1: t.Any, obj2: t.Any) -> bool: 69 return type(obj1) == type(obj2) and ( 70 obj1 == obj2 71 or ( 72 getattr(obj1, "__module__", None) == getattr(obj2, "__module__", None) 73 and getattr(obj1, "__name__", None) == getattr(obj2, "__name__", None) 74 ) 75 ) 76 77 78def func_globals(func: t.Callable) -> t.Dict[str, t.Any]: 79 """Finds all global references and 
closures in a function and nested functions. 80 81 This function treats closures as global variables, which could cause problems in the future. 82 83 Args: 84 func: The function to introspect 85 86 Returns: 87 A dictionary of all global references. 88 """ 89 variables = {} 90 91 if hasattr(func, "__code__"): 92 root_node = parse_source(func) 93 94 func_args = next(node for node in ast.walk(root_node) if isinstance(node, ast.arguments)) 95 arg_defaults = (d for d in func_args.defaults + func_args.kw_defaults if d is not None) 96 97 # ast.Name corresponds to variable references, such as foo or x.foo. The former is 98 # represented as Name(id=foo), and the latter as Attribute(value=Name(id=x) attr=foo) 99 arg_globals = [ 100 n.id for default in arg_defaults for n in ast.walk(default) if isinstance(n, ast.Name) 101 ] 102 103 code = func.__code__ 104 for var in ( 105 arg_globals + list(_code_globals(code)) + decorator_vars(func, root_node=root_node) 106 ): 107 if var in func.__globals__: 108 variables[var] = func.__globals__[var] 109 110 if func.__closure__: 111 for var, value in zip(code.co_freevars, func.__closure__): 112 variables[var] = value.cell_contents 113 114 return variables 115 116 117class ClassFoundException(Exception): 118 pass 119 120 121class _ClassFinder(ast.NodeVisitor): 122 def __init__(self, qualname: str) -> None: 123 self.stack: t.List[str] = [] 124 self.qualname = qualname 125 126 def visit_FunctionDef(self, node: ast.FunctionDef) -> None: 127 self.stack.append(node.name) 128 self.stack.append("<locals>") 129 self.generic_visit(node) 130 self.stack.pop() 131 self.stack.pop() 132 133 visit_AsyncFunctionDef = visit_FunctionDef # type: ignore 134 135 def visit_ClassDef(self, node: ast.ClassDef) -> None: 136 self.stack.append(node.name) 137 if self.qualname == ".".join(self.stack): 138 # Return the decorator for the class if present 139 if node.decorator_list: 140 line_number = node.decorator_list[0].lineno 141 else: 142 line_number = node.lineno 143 
144 # decrement by one since lines starts with indexing by zero 145 line_number -= 1 146 raise ClassFoundException(line_number) 147 self.generic_visit(node) 148 self.stack.pop() 149 150 151class _DecoratorDependencyFinder(ast.NodeVisitor): 152 def __init__(self) -> None: 153 self.dependencies: t.List[str] = [] 154 155 def _extract_dependencies(self, node: ast.ClassDef | ast.FunctionDef) -> None: 156 for decorator in node.decorator_list: 157 dependencies: t.List[str] = [] 158 for n in ast.walk(decorator): 159 if isinstance(n, ast.Attribute): 160 dep = n.attr 161 elif isinstance(n, ast.Name): 162 dep = n.id 163 else: 164 continue 165 166 if dep in IGNORE_DECORATORS: 167 dependencies = [] 168 break 169 170 dependencies.append(dep) 171 172 self.dependencies.extend(dependencies) 173 174 def visit_FunctionDef(self, node: ast.FunctionDef) -> None: 175 self._extract_dependencies(node) 176 177 def visit_ClassDef(self, node: ast.ClassDef) -> None: 178 self._extract_dependencies(node) 179 180 visit_AsyncFunctionDef = visit_FunctionDef # type: ignore 181 182 183def getsource(obj: t.Any) -> str: 184 """Get the source of a function or class. 
185 186 inspect.getsource doesn't find decorators in python < 3.9 187 https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade 188 """ 189 path = inspect.getsourcefile(obj) 190 if path: 191 module = inspect.getmodule(obj, path) 192 193 if module: 194 lines = linecache.getlines(path, module.__dict__) 195 else: 196 lines = linecache.getlines(path) 197 198 def join_source(lnum: int) -> str: 199 return "".join(inspect.getblock(lines[lnum:])) 200 201 if inspect.isclass(obj): 202 qualname = obj.__qualname__ 203 source = "".join(lines) 204 tree = ast.parse(source) 205 class_finder = _ClassFinder(qualname) 206 try: 207 class_finder.visit(tree) 208 except ClassFoundException as e: 209 return join_source(e.args[0]) 210 elif inspect.isfunction(obj): 211 obj = obj.__code__ 212 if hasattr(obj, "co_firstlineno"): 213 lnum = obj.co_firstlineno - 1 214 pat = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)") 215 while lnum > 0: 216 try: 217 line = lines[lnum] 218 except IndexError: 219 raise OSError("lineno is out of bounds") 220 if pat.match(line): 221 break 222 lnum = lnum - 1 223 return join_source(lnum) 224 raise SQLMeshError(f"Cannot find source for {obj}") 225 226 227def parse_source(func: t.Callable) -> ast.Module: 228 """Parse a function and returns an ast node.""" 229 return ast.parse(textwrap.dedent(getsource(func))) 230 231 232def _decorator_name(decorator: ast.expr) -> str: 233 node = decorator 234 if isinstance(decorator, ast.Call): 235 node = decorator.func 236 return node.id if isinstance(node, ast.Name) else "" 237 238 239def decorator_vars(func: t.Callable, root_node: t.Optional[ast.Module] = None) -> t.List[str]: 240 """ 241 Returns a list of all the decorators of a callable, as well as names of objects that 242 are referenced in their argument list. These objects may be transitive dependencies 243 that we need to include in the serialized python environments. 
244 """ 245 root_node = root_node or parse_source(func) 246 finder = _DecoratorDependencyFinder() 247 finder.visit(root_node) 248 return unique(finder.dependencies) 249 250 251def normalize_source(obj: t.Any) -> str: 252 """Rewrites an object's source with formatting and doc strings removed by using Python ast. 253 254 Args: 255 obj: The object to fetch source from and convert to a string. 256 257 Returns: 258 A string representation of the normalized function. 259 """ 260 root_node = parse_source(obj) 261 262 for node in ast.walk(root_node): 263 if isinstance(node, (ast.FunctionDef, ast.ClassDef)): 264 for decorator in node.decorator_list: 265 if _decorator_name(decorator) in IGNORE_DECORATORS: 266 node.decorator_list.remove(decorator) 267 268 # remove docstrings 269 body = node.body 270 if body and isinstance(body[0], ast.Expr) and isinstance(body[0].value, ast.Str): 271 node.body = body[1:] 272 273 # remove function return type annotation 274 if isinstance(node, ast.FunctionDef): 275 node.returns = None 276 277 return to_source(root_node).strip() 278 279 280def build_env( 281 obj: t.Any, 282 *, 283 env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]], 284 name: str, 285 path: Path, 286 is_metadata_obj: bool = False, 287) -> None: 288 """Fills in env dictionary with all globals needed to execute the object. 289 290 Recursively traverse classes and functions. 291 292 Args: 293 obj: Any python object. 294 env: Dictionary to store the env. 295 name: Name of the object in the env. 296 path: The module path to serialize. Other modules will not be walked and treated as imports. 297 is_metadata_obj: An optional flag that determines whether the input object is metadata-only. 
298 """ 299 # We don't rely on `env` to keep track of visited objects, because it's populated in post-order 300 visited: t.Set[str] = set() 301 302 def walk(obj: t.Any, name: str, is_metadata: bool = False) -> None: 303 obj_module = inspect.getmodule(obj) 304 if obj_module and obj_module.__name__ == "builtins": 305 return 306 307 if name in visited: 308 if name not in env or _globals_match(env[name][0], obj): 309 return 310 311 raise SQLMeshError( 312 f"Cannot store {obj} in environment, duplicate definitions found for '{name}'" 313 ) 314 315 visited.add(name) 316 name_missing_from_env = name not in env 317 318 if name_missing_from_env or (not is_metadata and env[name] == (obj, True)): 319 if not name_missing_from_env: 320 # The existing object in the env is "metadata only" but we're walking it again as a 321 # non-"metadata only" dependency, so we update this flag to ensure all transitive 322 # dependencies are also not marked as "metadata only" 323 is_metadata = False 324 325 if hasattr(obj, c.SQLMESH_MACRO): 326 # We only need to add the undecorated code of @macro() functions in env, which 327 # is accessible through the `__wrapped__` attribute added by functools.wraps 328 obj = obj.__wrapped__ 329 elif callable(obj) and not isinstance(obj, SERIALIZABLE_CALLABLES): 330 obj = getattr(obj, "__wrapped__", None) 331 name = getattr(obj, "__name__", "") 332 333 # Callable class instances shouldn't be serialized (e.g. tenacity.Retrying). 
334 # We still want to walk the callables they decorate, though 335 if not isinstance(obj, SERIALIZABLE_CALLABLES) or name in env: 336 return 337 338 if ( 339 not obj_module 340 or not hasattr(obj_module, "__file__") 341 or not _is_relative_to(obj_module.__file__, path) 342 ): 343 env[name] = (obj, is_metadata) 344 return 345 346 if inspect.isclass(obj): 347 for var in decorator_vars(obj): 348 if obj_module and var in obj_module.__dict__: 349 walk(obj_module.__dict__[var], var, is_metadata) 350 351 for base in obj.__bases__: 352 walk(base, base.__qualname__, is_metadata) 353 354 for k, v in obj.__dict__.items(): 355 # skip dunder methods bar __init__ as it might contain user defined logic with cross class references 356 if k.startswith("__") and k != "__init__": 357 continue 358 359 # Traverse methods in a class to find global references 360 if isinstance(v, (classmethod, staticmethod)): 361 v = v.__func__ 362 363 if callable(v): 364 # Walk the method if it's part of the object, else it's a global function and we just store it 365 if v.__qualname__.startswith(obj.__qualname__): 366 try: 367 for k, v in func_globals(v).items(): 368 walk(v, k, is_metadata) 369 except (OSError, TypeError): 370 # __init__ may come from built-ins or wrapped callables 371 pass 372 else: 373 walk(v, k, is_metadata) 374 elif callable(obj): 375 for k, v in func_globals(obj).items(): 376 walk(v, k, is_metadata) 377 378 # We store the object in the environment after its dependencies, because otherwise we 379 # could crash at environment hydration time, since dicts are ordered and the top-level 380 # objects would be loaded before their dependencies. 
381 env[name] = (obj, is_metadata) 382 elif not _globals_match(env[name][0], obj): 383 raise SQLMeshError( 384 f"Cannot store {obj} in environment, duplicate definitions found for '{name}'" 385 ) 386 387 # The "metadata only" annotation of the object is transitive 388 walk(obj, name, is_metadata_obj or getattr(obj, c.SQLMESH_METADATA, False)) 389 390 391@dataclass 392class SqlValue: 393 """A SQL string representing a generated SQLGlot AST.""" 394 395 sql: str 396 397 398class ExecutableKind(str, Enum): 399 """The kind of of executable. The order of the members is used when serializing the python model to text.""" 400 401 IMPORT = "import" 402 VALUE = "value" 403 DEFINITION = "definition" 404 405 def __lt__(self, other: t.Any) -> bool: 406 if not isinstance(other, ExecutableKind): 407 return NotImplemented 408 values = list(ExecutableKind.__dict__.values()) 409 return values.index(self) < values.index(other) 410 411 def __str__(self) -> str: 412 return self.value 413 414 415class Executable(PydanticModel): 416 payload: str 417 kind: ExecutableKind = ExecutableKind.DEFINITION 418 name: t.Optional[str] = None 419 path: t.Optional[str] = None 420 alias: t.Optional[str] = None 421 is_metadata: t.Optional[bool] = None 422 423 @property 424 def is_definition(self) -> bool: 425 return self.kind == ExecutableKind.DEFINITION 426 427 @property 428 def is_import(self) -> bool: 429 return self.kind == ExecutableKind.IMPORT 430 431 @property 432 def is_value(self) -> bool: 433 return self.kind == ExecutableKind.VALUE 434 435 @classmethod 436 def value( 437 cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False 438 ) -> Executable: 439 payload = _dict_sort(v) if sort_root_dict else repr(v) 440 return Executable( 441 payload=payload, 442 kind=ExecutableKind.VALUE, 443 is_metadata=is_metadata or None, 444 ) 445 446 447def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]: 448 """Serializes a python function into a self 
contained dictionary. 449 450 Recursively walks a function's globals to store all other references inside of env. 451 452 Args: 453 env: Dictionary to store the env. 454 path: The root path to seralize. Other modules will not be walked and treated as imports. 455 """ 456 serialized = {} 457 458 for k, (v, is_metadata) in env.items(): 459 # We don't store `False` for `is_metadata` to reduce the pydantic model's payload size 460 is_metadata = is_metadata or None 461 462 if isinstance(v, LITERALS) or v is None: 463 serialized[k] = Executable.value(v, is_metadata=is_metadata) 464 elif inspect.ismodule(v): 465 name = v.__name__ 466 if hasattr(v, "__file__") and _is_relative_to(v.__file__, path): 467 raise SQLMeshError( 468 f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead." 469 ) 470 postfix = "" if name == k else f" as {k}" 471 serialized[k] = Executable( 472 payload=f"import {name}{postfix}", 473 kind=ExecutableKind.IMPORT, 474 is_metadata=is_metadata, 475 ) 476 elif callable(v): 477 name = v.__name__ 478 name = k if name == "<lambda>" else name 479 480 # getfile raises a `TypeError` for built-in modules, classes, or functions 481 # https://docs.python.org/3/library/inspect.html#inspect.getfile 482 try: 483 file_path = Path(inspect.getfile(v)) 484 relative_obj_file_path = _is_relative_to(file_path, path) 485 486 # A callable can be a "wrapper" that is defined in a third-party library [1], in which case the file 487 # containing its definition won't be relative to the project's path. This can lead to serializing 488 # it as a "relative import", such as `from models.some_python_model import foo`, because the `wraps` 489 # decorator preserves the wrapped function's module [2]. Payloads like this are invalid, as they 490 # can result in `ModuleNotFoundError`s when hydrating python environments, e.g. if a project's files 491 # are not available during a scheduled cadence run. 
492 # 493 # [1]: https://github.com/jd/tenacity/blob/0d40e76f7d06d631fb127e1ec58c8bd776e70d49/tenacity/__init__.py#L322-L346 494 # [2]: https://github.com/python/cpython/blob/f502c8f6a6db4be27c97a0e5466383d117859b7f/Lib/functools.py#L33-L57 495 if not relative_obj_file_path and (wrapped := getattr(v, "__wrapped__", None)): 496 v = wrapped 497 file_path = Path(inspect.getfile(wrapped)) 498 relative_obj_file_path = _is_relative_to(file_path, path) 499 except TypeError: 500 file_path = None 501 relative_obj_file_path = False 502 503 if relative_obj_file_path: 504 serialized[k] = Executable( 505 name=name, 506 payload=normalize_source(v), 507 kind=ExecutableKind.DEFINITION, 508 # Do `as_posix` to serialize windows path back to POSIX 509 path=t.cast(Path, file_path).relative_to(path.absolute()).as_posix(), 510 alias=k if name != k else None, 511 is_metadata=is_metadata, 512 ) 513 else: 514 serialized[k] = Executable( 515 payload=f"from {v.__module__} import {name}", 516 kind=ExecutableKind.IMPORT, 517 is_metadata=is_metadata, 518 ) 519 else: 520 raise SQLMeshError( 521 f"Object '{v}' cannot be serialized. If it's defined in a library, import the corresponding " 522 "module and reference the object using its fully-qualified name. For example, the datetime " 523 "module's 'UTC' object should be accessed as 'datetime.UTC'." 524 ) 525 526 return serialized 527 528 529def prepare_env( 530 python_env: t.Dict[str, Executable], 531 env: t.Optional[t.Dict[str, t.Any]] = None, 532) -> t.Dict[str, t.Any]: 533 """Prepare a python env by hydrating and executing functions. 534 535 The Python ENV is stored in a json serializable format. 536 Functions and imports are stored as a special data class. 537 538 Args: 539 python_env: The dictionary containing the serialized python environment. 540 env: The dictionary to execute code in. 541 542 Returns: 543 The prepared environment with hydrated functions. 
544 """ 545 env = {} if env is None else env 546 547 for name, executable in sorted( 548 python_env.items(), key=lambda item: 0 if item[1].is_import else 1 549 ): 550 if executable.is_value: 551 env[name] = eval(executable.payload) 552 else: 553 exec(executable.payload, env) 554 if executable.alias and executable.name: 555 env[executable.alias] = env[executable.name] 556 557 return env 558 559 560def format_evaluated_code_exception( 561 exception: Exception, 562 python_env: t.Dict[str, Executable], 563) -> str: 564 """Formats exceptions that occur from evaled code. 565 566 Stack traces generated by evaled code lose code context and are difficult to debug. 567 This intercepts the default stack trace and tries to make it debuggable. 568 569 Args: 570 exception: The exception to print the stack trace for. 571 python_env: The environment containing stringified python code. 572 """ 573 tb: t.List[str] = [] 574 indent = "" 575 576 skip_patterns = re.compile( 577 r"Traceback \(most recent call last\):|" 578 r'File ".*?core/model/definition\.py|' 579 r'File ".*?core/snapshot/definition\.py|' 580 r'File ".*?core/macros\.py|' 581 r'File ".*?inspect\.py' 582 ) 583 584 for error_line in format_exception(exception): 585 if skip_patterns.search(error_line): 586 continue 587 588 error_match = re.search("^.*?Error: ", error_line) 589 if error_match: 590 tb.append(f"{indent * 2} {error_line}") 591 continue 592 593 eval_code_match = re.search('File "<string>", line (.*), in (.*)', error_line) 594 if not eval_code_match: 595 tb.append(f"{indent}{error_line}") 596 continue 597 598 line_num = int(eval_code_match.group(1)) 599 func = eval_code_match.group(2) 600 601 if func not in python_env: 602 tb.append(error_line) 603 continue 604 605 executable = python_env[func] 606 indent = error_line[: eval_code_match.start()] 607 608 error_line = ( 609 f"{indent}File '{executable.path}' (or imported file), line {line_num}, in {func}" 610 ) 611 612 code = executable.payload 613 formatted = [] 
614 615 for i, code_line in enumerate(code.splitlines()): 616 if i < line_num: 617 pad = len(code_line) - len(code_line.lstrip()) 618 if i + 1 == line_num: 619 formatted.append(f"{code_line[:pad]}{code_line[pad:]}") 620 else: 621 formatted.append(code_line) 622 623 tb.extend( 624 ( 625 error_line, 626 textwrap.indent( 627 os.linesep.join(formatted), 628 indent + " ", 629 ), 630 ) 631 ) 632 633 return os.linesep.join(tb) 634 635 636def print_exception( 637 exception: Exception, 638 python_env: t.Dict[str, Executable], 639 out: t.TextIO = sys.stderr, 640) -> None: 641 """Prints exceptions that occur from evaled code. 642 643 Stack traces generated by evaled code lose code context and are difficult to debug. 644 This intercepts the default stack trace and tries to make it debuggable. 645 646 Args: 647 exception: The exception to print the stack trace for. 648 python_env: The environment containing stringified python code. 649 out: The output stream to write to. 650 """ 651 tb = format_evaluated_code_exception(exception, python_env) 652 out.write(tb) 653 654 655def _dict_sort(obj: t.Any) -> str: 656 try: 657 if isinstance(obj, dict): 658 obj = dict(sorted(obj.items(), key=lambda x: str(x[0]))) 659 except Exception: 660 logger.warning("Failed to sort non-recursive dict", exc_info=True) 661 return repr(obj) 662 663 664def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType: 665 relative_path = path.absolute().relative_to(relative_base.absolute()) 666 module_name = str(relative_path.with_suffix("")).replace(os.path.sep, ".") 667 668 # remove the entire module hierarchy in case they were already loaded 669 parts = module_name.split(".") 670 for i in range(len(parts)): 671 sys.modules.pop(".".join(parts[0 : i + 1]), None) 672 673 return importlib.import_module(module_name)
def func_globals(func: t.Callable) -> t.Dict[str, t.Any]:
    """Collect the globals and closure variables a function depends on.

    Names are gathered from three places: identifiers used in argument defaults,
    global loads in the function's bytecode (including nested code objects), and
    names referenced by its decorators. Only names present in the function's
    `__globals__` are kept. Closure variables are added afterwards and are treated
    like globals, which could cause problems in the future.

    Args:
        func: The function to introspect

    Returns:
        A dictionary mapping each referenced name to its value. Empty when `func`
        has no `__code__` attribute.
    """
    found: t.Dict[str, t.Any] = {}

    if not hasattr(func, "__code__"):
        return found

    tree = parse_source(func)
    signature = next(n for n in ast.walk(tree) if isinstance(n, ast.arguments))

    # ast.Name corresponds to variable references, such as foo or x.foo. The former is
    # represented as Name(id=foo), and the latter as Attribute(value=Name(id=x) attr=foo)
    default_names: t.List[str] = []
    for default in signature.defaults + signature.kw_defaults:
        # kw_defaults holds None for keyword-only arguments without a default
        if default is None:
            continue
        default_names.extend(n.id for n in ast.walk(default) if isinstance(n, ast.Name))

    code = func.__code__
    candidates = default_names + list(_code_globals(code)) + decorator_vars(func, root_node=tree)

    for candidate in candidates:
        if candidate in func.__globals__:
            found[candidate] = func.__globals__[candidate]

    closure = func.__closure__
    if closure:
        for free_name, cell in zip(code.co_freevars, closure):
            found[free_name] = cell.cell_contents

    return found
Finds all global references and closures in a function and nested functions.
This function treats closures as global variables, which could cause problems in the future.
Arguments:
- func: The function to introspect
Returns:
A dictionary of all global references.
Common base class for all non-exit exceptions.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- args
def getsource(obj: t.Any) -> str:
    """Return the source text of a function or class, decorators included.

    inspect.getsource doesn't find decorators in python < 3.9
    https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

    Raises:
        OSError: If the object's first line number is beyond the end of the source file.
        SQLMeshError: If no source could be located for the object.
    """
    source_path = inspect.getsourcefile(obj)
    if not source_path:
        raise SQLMeshError(f"Cannot find source for {obj}")

    owner = inspect.getmodule(obj, source_path)
    lines = (
        linecache.getlines(source_path, owner.__dict__)
        if owner
        else linecache.getlines(source_path)
    )

    def block_from(start: int) -> str:
        return "".join(inspect.getblock(lines[start:]))

    if inspect.isclass(obj):
        # The finder signals a hit by raising, carrying the zero-based line index
        module_tree = ast.parse("".join(lines))
        try:
            _ClassFinder(obj.__qualname__).visit(module_tree)
        except ClassFoundException as found:
            return block_from(found.args[0])
    elif inspect.isfunction(obj):
        obj = obj.__code__

    if not hasattr(obj, "co_firstlineno"):
        raise SQLMeshError(f"Cannot find source for {obj}")

    start_pattern = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)")
    index = obj.co_firstlineno - 1
    # Walk backwards until a line that starts a def, lambda or decorator is found
    while index > 0:
        try:
            candidate = lines[index]
        except IndexError:
            raise OSError("lineno is out of bounds")
        if start_pattern.match(candidate):
            break
        index -= 1

    return block_from(index)
Get the source of a function or class.
inspect.getsource doesn't find decorators in python < 3.9 https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade
def parse_source(func: t.Callable) -> ast.Module:
    """Fetch the source of `func`, strip its common indentation and parse it into an AST."""
    raw_source = getsource(func)
    dedented = textwrap.dedent(raw_source)
    return ast.parse(dedented)
Parses a function and returns an AST node.
def decorator_vars(func: t.Callable, root_node: t.Optional[ast.Module] = None) -> t.List[str]:
    """
    Lists the names referenced by the decorators of a callable, including names used
    in their argument lists. These objects may be transitive dependencies that need
    to be included in the serialized python environments.

    Args:
        func: The callable whose decorators are inspected.
        root_node: An already parsed AST of the callable's source. The source is
            parsed from `func` when this is not provided.

    Returns:
        The referenced names with duplicates removed.
    """
    tree = root_node
    if not tree:
        tree = parse_source(func)

    collector = _DecoratorDependencyFinder()
    collector.visit(tree)
    return unique(collector.dependencies)
Returns a list of all the decorators of a callable, as well as names of objects that are referenced in their argument list. These objects may be transitive dependencies that we need to include in the serialized python environments.
def normalize_source(obj: t.Any) -> str:
    """Rewrites an object's source with formatting and doc strings removed by using Python ast.

    Decorators named in `IGNORE_DECORATORS`, docstrings and function return
    annotations are dropped from every function and class definition found in the
    source before it is rendered back to text.

    Args:
        obj: The object to fetch source from and convert to a string.

    Returns:
        A string representation of the normalized function.
    """
    root_node = parse_source(obj)

    for node in ast.walk(root_node):
        if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            # Rebuild the list instead of removing in place: removing while iterating
            # skips the decorator that follows each removed one.
            node.decorator_list = [
                decorator
                for decorator in node.decorator_list
                if _decorator_name(decorator) not in IGNORE_DECORATORS
            ]

            # remove docstrings (string constants are `ast.Constant`; `ast.Str` is deprecated)
            body = node.body
            if (
                body
                and isinstance(body[0], ast.Expr)
                and isinstance(body[0].value, ast.Constant)
                and isinstance(body[0].value.value, str)
            ):
                # A definition whose body was only a docstring still needs a statement
                node.body = body[1:] or [ast.Pass()]

            # remove function return type annotation
            if isinstance(node, ast.FunctionDef):
                node.returns = None

    return to_source(root_node).strip()
Rewrites an object's source with formatting and doc strings removed by using Python ast.
Arguments:
- obj: The object to fetch source from and convert to a string.
Returns:
A string representation of the normalized function.
def build_env(
    obj: t.Any,
    *,
    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]],
    name: str,
    path: Path,
    is_metadata_obj: bool = False,
) -> None:
    """Fills in env dictionary with all globals needed to execute the object.

    Recursively traverse classes and functions. Each entry of `env` is a tuple of the
    object and its "metadata only" flag. Dependencies are stored before the objects
    that reference them.

    Args:
        obj: Any python object.
        env: Dictionary to store the env.
        name: Name of the object in the env.
        path: The module path to serialize. Other modules will not be walked and treated as imports.
        is_metadata_obj: An optional flag that determines whether the input object is metadata-only.

    Raises:
        SQLMeshError: If two different objects are found under the same name.
    """
    # We don't rely on `env` to keep track of visited objects, because it's populated in post-order
    visited: t.Set[str] = set()

    def walk(obj: t.Any, name: str, is_metadata: bool = False) -> None:
        # Objects that belong to the builtins module never need to be stored
        obj_module = inspect.getmodule(obj)
        if obj_module and obj_module.__name__ == "builtins":
            return

        if name in visited:
            # A visited name that is not in env yet is still being walked further up the stack
            if name not in env or _globals_match(env[name][0], obj):
                return

            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

        visited.add(name)
        name_missing_from_env = name not in env

        if name_missing_from_env or (not is_metadata and env[name] == (obj, True)):
            if not name_missing_from_env:
                # The existing object in the env is "metadata only" but we're walking it again as a
                # non-"metadata only" dependency, so we update this flag to ensure all transitive
                # dependencies are also not marked as "metadata only"
                is_metadata = False

            if hasattr(obj, c.SQLMESH_MACRO):
                # We only need to add the undecorated code of @macro() functions in env, which
                # is accessible through the `__wrapped__` attribute added by functools.wraps
                obj = obj.__wrapped__
            elif callable(obj) and not isinstance(obj, SERIALIZABLE_CALLABLES):
                # From here on the wrapped callable replaces the wrapper, under its own name
                obj = getattr(obj, "__wrapped__", None)
                name = getattr(obj, "__name__", "")

                # Callable class instances shouldn't be serialized (e.g. tenacity.Retrying).
                # We still want to walk the callables they decorate, though
                if not isinstance(obj, SERIALIZABLE_CALLABLES) or name in env:
                    return

            # Objects defined outside of `path` are stored as-is and not walked any further
            if (
                not obj_module
                or not hasattr(obj_module, "__file__")
                or not _is_relative_to(obj_module.__file__, path)
            ):
                env[name] = (obj, is_metadata)
                return

            if inspect.isclass(obj):
                # Names used by the class decorators are resolved against the defining module
                for var in decorator_vars(obj):
                    if obj_module and var in obj_module.__dict__:
                        walk(obj_module.__dict__[var], var, is_metadata)

                for base in obj.__bases__:
                    walk(base, base.__qualname__, is_metadata)

                for k, v in obj.__dict__.items():
                    # skip dunder methods bar __init__ as it might contain user defined logic with cross class references
                    if k.startswith("__") and k != "__init__":
                        continue

                    # Traverse methods in a class to find global references
                    if isinstance(v, (classmethod, staticmethod)):
                        v = v.__func__

                    if callable(v):
                        # Walk the method if it's part of the object, else it's a global function and we just store it
                        if v.__qualname__.startswith(obj.__qualname__):
                            try:
                                # NOTE: this inner loop rebinds `k` and `v`; the outer loop
                                # reassigns both on its next iteration
                                for k, v in func_globals(v).items():
                                    walk(v, k, is_metadata)
                            except (OSError, TypeError):
                                # __init__ may come from built-ins or wrapped callables
                                pass
                        else:
                            walk(v, k, is_metadata)
            elif callable(obj):
                for k, v in func_globals(obj).items():
                    walk(v, k, is_metadata)

            # We store the object in the environment after its dependencies, because otherwise we
            # could crash at environment hydration time, since dicts are ordered and the top-level
            # objects would be loaded before their dependencies.
            env[name] = (obj, is_metadata)
        elif not _globals_match(env[name][0], obj):
            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

    # The "metadata only" annotation of the object is transitive
    walk(obj, name, is_metadata_obj or getattr(obj, c.SQLMESH_METADATA, False))
Fills in env dictionary with all globals needed to execute the object.
Recursively traverse classes and functions.
Arguments:
- obj: Any python object.
- env: Dictionary to store the env.
- name: Name of the object in the env.
- path: The module path to serialize. Other modules will not be walked and treated as imports.
- is_metadata_obj: An optional flag that determines whether the input object is metadata-only.
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    # The SQL text carried by this wrapper.
    sql: str
A SQL string representing a generated SQLGlot AST.
class ExecutableKind(str, Enum):
    """The kind of executable. The order of the members is used when serializing the python model to text.

    Members compare with `<` by their definition order (IMPORT, VALUE, DEFINITION),
    not by the alphabetical order of their string values.
    """

    IMPORT = "import"
    VALUE = "value"
    DEFINITION = "definition"

    def __lt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        # Iterating the enum yields its members in definition order, which is the
        # documented ordering; the class namespace holds unrelated entries as well.
        members = list(ExecutableKind)
        return members.index(self) < members.index(other)

    def __str__(self) -> str:
        return self.value
The kind of executable. The order of the members is used when serializing the python model to text.
Inherited Members
- enum.Enum
- name
- value
- builtins.str
- encode
- replace
- split
- rsplit
- join
- capitalize
- casefold
- title
- center
- count
- expandtabs
- find
- partition
- index
- ljust
- lower
- lstrip
- rfind
- rindex
- rjust
- rstrip
- rpartition
- splitlines
- strip
- swapcase
- translate
- upper
- startswith
- endswith
- removeprefix
- removesuffix
- isascii
- islower
- isupper
- istitle
- isspace
- isdecimal
- isdigit
- isnumeric
- isalpha
- isalnum
- isidentifier
- isprintable
- zfill
- format
- format_map
- maketrans
class Executable(PydanticModel):
    """A single serialized entry of a python environment.

    An entry is either an import statement, a literal value stored as a python
    expression, or the source code of a definition.
    """

    # The python source: an import statement, a value expression or a definition.
    payload: str
    # Which of the three flavours `payload` holds.
    kind: ExecutableKind = ExecutableKind.DEFINITION
    # For definitions, the name of the defined object.
    name: t.Optional[str] = None
    # For definitions, the POSIX path of the defining file relative to the project root.
    path: t.Optional[str] = None
    # For definitions, the environment key when it differs from `name`.
    alias: t.Optional[str] = None
    # True for metadata-only objects; None (rather than False) otherwise.
    is_metadata: t.Optional[bool] = None

    @property
    def is_definition(self) -> bool:
        """Whether the payload is the source of a definition."""
        return self.kind == ExecutableKind.DEFINITION

    @property
    def is_import(self) -> bool:
        """Whether the payload is an import statement."""
        return self.kind == ExecutableKind.IMPORT

    @property
    def is_value(self) -> bool:
        """Whether the payload is a value expression."""
        return self.kind == ExecutableKind.VALUE

    @classmethod
    def value(
        cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
    ) -> Executable:
        """Build a VALUE executable from a python object.

        Args:
            v: The object to store.
            is_metadata: Whether the value is metadata-only. Falsy values are stored as None.
            sort_root_dict: If set, the payload is produced by `_dict_sort` instead of `repr`.

        Returns:
            The executable holding the textual form of `v`.
        """
        if sort_root_dict:
            serialized_value = _dict_sort(v)
        else:
            serialized_value = repr(v)

        return Executable(
            kind=ExecutableKind.VALUE,
            payload=serialized_value,
            is_metadata=is_metadata or None,
        )
Usage Documentation: Models
A base class for creating Pydantic models.
Attributes:
- __class_vars__: The names of the class variables defined on the model.
- __private_attributes__: Metadata about the private attributes of the model.
- __signature__: The synthesized `__init__` [`Signature`][inspect.Signature] of the model.
- __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
- __pydantic_core_schema__: The core schema of the model.
- __pydantic_custom_init__: Whether the model has a custom `__init__` function.
- __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
- __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to `__args__`, `__origin__`, `__parameters__` in typing-module generics. May eventually be replaced by these.
- __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
- __pydantic_post_init__: The name of the post-init method for the model, if defined.
- __pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
- __pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
- __pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.
- __pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
- __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.
- __pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra] is set to `'allow'`.
- __pydantic_fields_set__: The names of fields explicitly set during instantiation.
- __pydantic_private__: Values of private attributes set on the model instance.
@classmethod
def value(
    cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
) -> Executable:
    """Build a VALUE executable from a python object.

    Args:
        v: The object to store.
        is_metadata: Whether the value is metadata-only. Falsy values are stored as None.
        sort_root_dict: If set, the payload is produced by `_dict_sort` instead of `repr`.

    Returns:
        The executable holding the textual form of `v`.
    """
    if sort_root_dict:
        serialized_value = _dict_sort(v)
    else:
        serialized_value = repr(v)

    return Executable(
        kind=ExecutableKind.VALUE,
        payload=serialized_value,
        is_metadata=is_metadata or None,
    )
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_fields
- model_computed_fields
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- parse_file
- from_orm
- construct
- schema
- schema_json
- validate
- update_forward_refs
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
    """Serializes a python environment into a self contained dictionary.

    Each entry of `env` is turned into an `Executable`: literals become values,
    modules become `import` statements, callables defined under `path` are stored
    as source definitions and all other callables become `from ... import ...`
    statements.

    Args:
        env: Dictionary mapping names to `(object, is_metadata)` pairs.
        path: The root path to serialize. Other modules will not be walked and treated as imports.

    Returns:
        A dictionary mapping each name in `env` to its serialized executable.

    Raises:
        SQLMeshError: If a module that lives under `path` is referenced through a plain
            `import`, or if an object is neither a literal, a module nor a callable.
    """
    serialized = {}

    for k, (v, is_metadata) in env.items():
        # We don't store `False` for `is_metadata` to reduce the pydantic model's payload size
        is_metadata = is_metadata or None

        if isinstance(v, LITERALS) or v is None:
            serialized[k] = Executable.value(v, is_metadata=is_metadata)
        elif inspect.ismodule(v):
            name = v.__name__
            if hasattr(v, "__file__") and _is_relative_to(v.__file__, path):
                raise SQLMeshError(
                    f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead."
                )
            postfix = "" if name == k else f" as {k}"
            serialized[k] = Executable(
                payload=f"import {name}{postfix}",
                kind=ExecutableKind.IMPORT,
                is_metadata=is_metadata,
            )
        elif callable(v):
            name = v.__name__
            name = k if name == "<lambda>" else name

            # getfile raises a `TypeError` for built-in modules, classes, or functions
            # https://docs.python.org/3/library/inspect.html#inspect.getfile
            try:
                file_path = Path(inspect.getfile(v))
                relative_obj_file_path = _is_relative_to(file_path, path)

                # A callable can be a "wrapper" that is defined in a third-party library [1], in which case the file
                # containing its definition won't be relative to the project's path. This can lead to serializing
                # it as a "relative import", such as `from models.some_python_model import foo`, because the `wraps`
                # decorator preserves the wrapped function's module [2]. Payloads like this are invalid, as they
                # can result in `ModuleNotFoundError`s when hydrating python environments, e.g. if a project's files
                # are not available during a scheduled cadence run.
                #
                # [1]: https://github.com/jd/tenacity/blob/0d40e76f7d06d631fb127e1ec58c8bd776e70d49/tenacity/__init__.py#L322-L346
                # [2]: https://github.com/python/cpython/blob/f502c8f6a6db4be27c97a0e5466383d117859b7f/Lib/functools.py#L33-L57
                if not relative_obj_file_path and (wrapped := getattr(v, "__wrapped__", None)):
                    v = wrapped
                    file_path = Path(inspect.getfile(wrapped))
                    relative_obj_file_path = _is_relative_to(file_path, path)
            except TypeError:
                file_path = None
                relative_obj_file_path = False

            if relative_obj_file_path:
                # `_is_relative_to` compares the absolute forms of both paths, so the file
                # path is made absolute here as well. Otherwise a relative file path that
                # passed the check above would make `relative_to` raise a `ValueError`.
                absolute_file_path = t.cast(Path, file_path).absolute()
                serialized[k] = Executable(
                    name=name,
                    payload=normalize_source(v),
                    kind=ExecutableKind.DEFINITION,
                    # Do `as_posix` to serialize windows path back to POSIX
                    path=absolute_file_path.relative_to(path.absolute()).as_posix(),
                    alias=k if name != k else None,
                    is_metadata=is_metadata,
                )
            else:
                serialized[k] = Executable(
                    payload=f"from {v.__module__} import {name}",
                    kind=ExecutableKind.IMPORT,
                    is_metadata=is_metadata,
                )
        else:
            raise SQLMeshError(
                f"Object '{v}' cannot be serialized. If it's defined in a library, import the corresponding "
                "module and reference the object using its fully-qualified name. For example, the datetime "
                "module's 'UTC' object should be accessed as 'datetime.UTC'."
            )

    return serialized
Serializes a python function into a self contained dictionary.
Recursively walks a function's globals to store all other references inside of env.
Arguments:
- env: Dictionary to store the env.
- path: The root path to serialize. Other modules will not be walked and treated as imports.
def prepare_env(
    python_env: t.Dict[str, Executable],
    env: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Hydrate a serialized python environment into live python objects.

    Import executables are processed first; all remaining entries follow in
    their original order. Values are evaluated and bound to their key, while
    imports and definitions are executed inside `env`.

    NOTE: payloads are passed verbatim to `eval` / `exec`.

    Args:
        python_env: The dictionary containing the serialized python environment.
        env: The dictionary to execute code in. A new one is created when omitted.

    Returns:
        The prepared environment with hydrated functions.
    """
    if env is None:
        env = {}

    # Stable partition: imports go first, relative order is otherwise preserved.
    imports = [(key, item) for key, item in python_env.items() if item.is_import]
    others = [(key, item) for key, item in python_env.items() if not item.is_import]

    for name, executable in [*imports, *others]:
        if executable.is_value:
            env[name] = eval(executable.payload)
            continue

        exec(executable.payload, env)
        # Expose the definition under the key it was referenced by, if different.
        if executable.alias and executable.name:
            env[executable.alias] = env[executable.name]

    return env
Prepare a python env by hydrating and executing functions.
The Python ENV is stored in a json serializable format. Functions and imports are stored as a special data class.
Arguments:
- python_env: The dictionary containing the serialized python environment.
- env: The dictionary to execute code in.
Returns:
The prepared environment with hydrated functions.
def format_evaluated_code_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
) -> str:
    """Render a readable stack trace for an exception raised by evaled code.

    Frames that point at `<string>` carry no source context. When such a frame
    belongs to a function stored in `python_env`, it is rewritten to show the
    executable's path together with its source up to the failing line.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.

    Returns:
        The formatted stack trace.
    """
    # Traceback header and frames of internal files that are dropped from the output.
    noise_pattern = re.compile(
        r"Traceback \(most recent call last\):|"
        r'File ".*?core/model/definition\.py|'
        r'File ".*?core/snapshot/definition\.py|'
        r'File ".*?core/macros\.py|'
        r'File ".*?inspect\.py'
    )

    output: t.List[str] = []
    # Leading whitespace of the most recently rewritten frame.
    indent = ""

    for error_line in format_exception(exception):
        if noise_pattern.search(error_line):
            continue

        if re.search("^.*?Error: ", error_line):
            output.append(f"{indent * 2} {error_line}")
            continue

        frame_match = re.search('File "<string>", line (.*), in (.*)', error_line)
        if not frame_match:
            output.append(f"{indent}{error_line}")
            continue

        line_num = int(frame_match.group(1))
        func = frame_match.group(2)

        if func not in python_env:
            output.append(error_line)
            continue

        executable = python_env[func]
        indent = error_line[: frame_match.start()]
        header = f"{indent}File '{executable.path}' (or imported file), line {line_num}, in {func}"

        # Keep the source from its first line up to and including the failing one.
        snippet = [
            code_line
            for index, code_line in enumerate(executable.payload.splitlines())
            if index < line_num
        ]

        output.append(header)
        output.append(textwrap.indent(os.linesep.join(snippet), indent + " "))

    return os.linesep.join(output)
Formats exceptions that occur from evaled code.
Stack traces generated by evaled code lose code context and are difficult to debug. This intercepts the default stack trace and tries to make it debuggable.
Arguments:
- exception: The exception to print the stack trace for.
- python_env: The environment containing stringified python code.
def print_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
    out: t.TextIO = sys.stderr,
) -> None:
    """Write the stack trace of an exception raised by evaled code to a stream.

    The trace is produced by `format_evaluated_code_exception`, which restores
    the code context that stack traces of evaled code otherwise lose.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.
        out: The output stream to write to.
    """
    out.write(format_evaluated_code_exception(exception, python_env))
Prints exceptions that occur from evaled code.
Stack traces generated by evaled code lose code context and are difficult to debug. This intercepts the default stack trace and tries to make it debuggable.
Arguments:
- exception: The exception to print the stack trace for.
- python_env: The environment containing stringified python code.
- out: The output stream to write to.
def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:
    """Import a python file as a module, discarding previously loaded copies.

    The module name is derived from the file's location relative to
    `relative_base`, with the suffix dropped and path separators replaced by dots.

    Args:
        path: The python file to import.
        relative_base: The directory the module name is computed against.

    Returns:
        The freshly imported module.
    """
    target = path.absolute()
    base = relative_base.absolute()
    module_name = str(target.relative_to(base).with_suffix("")).replace(os.path.sep, ".")

    # Evict the module and every parent package so that stale copies are not reused.
    segments = module_name.split(".")
    for depth, _ in enumerate(segments, start=1):
        sys.modules.pop(".".join(segments[:depth]), None)

    return importlib.import_module(module_name)