Edit on GitHub

sqlmesh.utils.metaprogramming

  1from __future__ import annotations
  2
  3import ast
  4import dis
  5import importlib
  6import inspect
  7import linecache
  8import logging
  9import os
 10import re
 11import sys
 12import textwrap
 13import types
 14import typing as t
 15from dataclasses import dataclass
 16from enum import Enum
 17from numbers import Number
 18from pathlib import Path
 19
 20from astor import to_source
 21
 22from sqlmesh.core import constants as c
 23from sqlmesh.utils import format_exception, unique
 24from sqlmesh.utils.errors import SQLMeshError
 25from sqlmesh.utils.pydantic import PydanticModel
 26
 27logger = logging.getLogger(__name__)
 28
 29
 30IGNORE_DECORATORS = {"macro", "model", "signal"}
 31SERIALIZABLE_CALLABLES = (type, types.FunctionType)
 32LITERALS = (Number, str, bytes, tuple, list, dict, set, bool)
 33
 34
 35def _is_relative_to(path: t.Optional[Path | str], other: t.Optional[Path | str]) -> bool:
 36    if path is None or other is None:
 37        return False
 38
 39    if isinstance(path, str):
 40        path = Path(path)
 41    if isinstance(other, str):
 42        other = Path(other)
 43
 44    if "site-packages" in str(path) or not path.exists() or not other.exists():
 45        return False
 46
 47    try:
 48        path.absolute().relative_to(other.absolute())
 49        return True
 50    except ValueError:
 51        return False
 52
 53
 54def _code_globals(code: types.CodeType) -> t.Dict[str, None]:
 55    variables = {
 56        instruction.argval: None
 57        for instruction in dis.get_instructions(code)
 58        if instruction.opname == "LOAD_GLOBAL"
 59    }
 60
 61    for const in code.co_consts:
 62        if isinstance(const, types.CodeType):
 63            variables.update(_code_globals(const))
 64
 65    return variables
 66
 67
 68def _globals_match(obj1: t.Any, obj2: t.Any) -> bool:
 69    return type(obj1) == type(obj2) and (
 70        obj1 == obj2
 71        or (
 72            getattr(obj1, "__module__", None) == getattr(obj2, "__module__", None)
 73            and getattr(obj1, "__name__", None) == getattr(obj2, "__name__", None)
 74        )
 75    )
 76
 77
 78def func_globals(func: t.Callable) -> t.Dict[str, t.Any]:
 79    """Finds all global references and closures in a function and nested functions.
 80
 81    This function treats closures as global variables, which could cause problems in the future.
 82
 83    Args:
 84        func: The function to introspect
 85
 86    Returns:
 87        A dictionary of all global references.
 88    """
 89    variables = {}
 90
 91    if hasattr(func, "__code__"):
 92        root_node = parse_source(func)
 93
 94        func_args = next(node for node in ast.walk(root_node) if isinstance(node, ast.arguments))
 95        arg_defaults = (d for d in func_args.defaults + func_args.kw_defaults if d is not None)
 96
 97        # ast.Name corresponds to variable references, such as foo or x.foo. The former is
 98        # represented as Name(id=foo), and the latter as Attribute(value=Name(id=x) attr=foo)
 99        arg_globals = [
100            n.id for default in arg_defaults for n in ast.walk(default) if isinstance(n, ast.Name)
101        ]
102
103        code = func.__code__
104        for var in (
105            arg_globals + list(_code_globals(code)) + decorator_vars(func, root_node=root_node)
106        ):
107            if var in func.__globals__:
108                variables[var] = func.__globals__[var]
109
110        if func.__closure__:
111            for var, value in zip(code.co_freevars, func.__closure__):
112                variables[var] = value.cell_contents
113
114    return variables
115
116
class ClassFoundException(Exception):
    """Signals that `_ClassFinder` located the requested class.

    The zero-based index of the line where the class starts is passed as the only argument.
    """
119
120
121class _ClassFinder(ast.NodeVisitor):
122    def __init__(self, qualname: str) -> None:
123        self.stack: t.List[str] = []
124        self.qualname = qualname
125
126    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
127        self.stack.append(node.name)
128        self.stack.append("<locals>")
129        self.generic_visit(node)
130        self.stack.pop()
131        self.stack.pop()
132
133    visit_AsyncFunctionDef = visit_FunctionDef  # type: ignore
134
135    def visit_ClassDef(self, node: ast.ClassDef) -> None:
136        self.stack.append(node.name)
137        if self.qualname == ".".join(self.stack):
138            # Return the decorator for the class if present
139            if node.decorator_list:
140                line_number = node.decorator_list[0].lineno
141            else:
142                line_number = node.lineno
143
144            # decrement by one since lines starts with indexing by zero
145            line_number -= 1
146            raise ClassFoundException(line_number)
147        self.generic_visit(node)
148        self.stack.pop()
149
150
151class _DecoratorDependencyFinder(ast.NodeVisitor):
152    def __init__(self) -> None:
153        self.dependencies: t.List[str] = []
154
155    def _extract_dependencies(self, node: ast.ClassDef | ast.FunctionDef) -> None:
156        for decorator in node.decorator_list:
157            dependencies: t.List[str] = []
158            for n in ast.walk(decorator):
159                if isinstance(n, ast.Attribute):
160                    dep = n.attr
161                elif isinstance(n, ast.Name):
162                    dep = n.id
163                else:
164                    continue
165
166                if dep in IGNORE_DECORATORS:
167                    dependencies = []
168                    break
169
170                dependencies.append(dep)
171
172            self.dependencies.extend(dependencies)
173
174    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
175        self._extract_dependencies(node)
176
177    def visit_ClassDef(self, node: ast.ClassDef) -> None:
178        self._extract_dependencies(node)
179
180    visit_AsyncFunctionDef = visit_FunctionDef  # type: ignore
181
182
def getsource(obj: t.Any) -> str:
    """Return the source text of a function or class, decorators included.

    inspect.getsource doesn't find decorators in python < 3.9
    https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

    Args:
        obj: The function or class to fetch the source for.

    Returns:
        The source block that starts at the object's definition or decorator line.

    Raises:
        OSError: If the function's first line lies beyond the end of the source lines.
        SQLMeshError: If no source can be found for the object.
    """
    source_file = inspect.getsourcefile(obj)
    if source_file:
        owner = inspect.getmodule(obj, source_file)
        lines = (
            linecache.getlines(source_file, owner.__dict__)
            if owner
            else linecache.getlines(source_file)
        )

        def block_from(start: int) -> str:
            return "".join(inspect.getblock(lines[start:]))

        if inspect.isclass(obj):
            finder = _ClassFinder(obj.__qualname__)
            try:
                finder.visit(ast.parse("".join(lines)))
            except ClassFoundException as found:
                return block_from(found.args[0])
        elif inspect.isfunction(obj):
            obj = obj.__code__
            if hasattr(obj, "co_firstlineno"):
                header = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)")
                start = obj.co_firstlineno - 1
                # Move upwards until a def, lambda or decorator line is reached
                while start > 0:
                    try:
                        candidate = lines[start]
                    except IndexError:
                        raise OSError("lineno is out of bounds")
                    if header.match(candidate):
                        break
                    start -= 1
                return block_from(start)

    raise SQLMeshError(f"Cannot find source for {obj}")
225
226
def parse_source(func: t.Callable) -> ast.Module:
    """Fetch the source of a callable, dedent it and parse it into an ast module node."""
    source = getsource(func)
    return ast.parse(textwrap.dedent(source))
230
231
232def _decorator_name(decorator: ast.expr) -> str:
233    node = decorator
234    if isinstance(decorator, ast.Call):
235        node = decorator.func
236    return node.id if isinstance(node, ast.Name) else ""
237
238
def decorator_vars(func: t.Callable, root_node: t.Optional[ast.Module] = None) -> t.List[str]:
    """
    Lists the names of a callable's decorators together with the names of the objects
    referenced in their argument lists. Such objects may be transitive dependencies
    that have to be included in the serialized python environments.

    Args:
        func: The callable to inspect. Its source is only parsed if `root_node` is not given.
        root_node: An already parsed source of the callable.

    Returns:
        The collected names with duplicates removed.
    """
    if not root_node:
        root_node = parse_source(func)

    visitor = _DecoratorDependencyFinder()
    visitor.visit(root_node)
    return unique(visitor.dependencies)
249
250
def normalize_source(obj: t.Any) -> str:
    """Rewrites an object's source with formatting and doc strings removed by using Python ast.

    For every function and class definition in the source, decorators whose name is
    listed in IGNORE_DECORATORS are dropped and a leading docstring is removed. Return
    type annotations of functions are removed as well.

    Args:
        obj: The object to fetch source from and convert to a string.

    Returns:
        A string representation of the normalized function.
    """
    root_node = parse_source(obj)

    for node in ast.walk(root_node):
        if not isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            continue

        # Build a new list instead of removing entries in place: mutating the list while
        # iterating over it skips the decorator that follows every removed one
        node.decorator_list = [
            decorator
            for decorator in node.decorator_list
            if _decorator_name(decorator) not in IGNORE_DECORATORS
        ]

        # remove docstrings
        body = node.body
        if body and isinstance(body[0], ast.Expr):
            first_value = body[0].value
            # String literals are parsed as ast.Constant; ast.Str is deprecated
            if isinstance(first_value, ast.Constant) and isinstance(first_value.value, str):
                # A definition can't have an empty body, so a docstring-only body
                # is replaced with `pass`
                node.body = body[1:] or [ast.Pass()]

        # remove function return type annotation
        if isinstance(node, ast.FunctionDef):
            node.returns = None

    return to_source(root_node).strip()
278
279
def build_env(
    obj: t.Any,
    *,
    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]],
    name: str,
    path: Path,
    is_metadata_obj: bool = False,
) -> None:
    """Fills in env dictionary with all globals needed to execute the object.

    Recursively traverse classes and functions. Every entry added to `env` is an
    `(object, is_metadata)` tuple, and an object is stored after its dependencies.

    Args:
        obj: Any python object.
        env: Dictionary to store the env.
        name: Name of the object in the env.
        path: The module path to serialize. Other modules will not be walked and treated as imports.
        is_metadata_obj: An optional flag that determines whether the input object is metadata-only.

    Raises:
        SQLMeshError: If a name is already associated with an object that doesn't match
            the one being stored.
    """
    # We don't rely on `env` to keep track of visited objects, because it's populated in post-order
    visited: t.Set[str] = set()

    def walk(obj: t.Any, name: str, is_metadata: bool = False) -> None:
        """Stores `obj` under `name` in `env`, walking its dependencies first."""
        obj_module = inspect.getmodule(obj)
        # Objects that belong to the builtins module are never stored
        if obj_module and obj_module.__name__ == "builtins":
            return

        if name in visited:
            # The name is either still being walked (and hence not in env yet) or it
            # already maps to a matching object; anything else is a conflict
            if name not in env or _globals_match(env[name][0], obj):
                return

            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

        visited.add(name)
        name_missing_from_env = name not in env

        if name_missing_from_env or (not is_metadata and env[name] == (obj, True)):
            if not name_missing_from_env:
                # The existing object in the env is "metadata only" but we're walking it again as a
                # non-"metadata only" dependency, so we update this flag to ensure all transitive
                # dependencies are also not marked as "metadata only"
                is_metadata = False

            if hasattr(obj, c.SQLMESH_MACRO):
                # We only need to add the undecorated code of @macro() functions in env, which
                # is accessible through the `__wrapped__` attribute added by functools.wraps
                obj = obj.__wrapped__
            elif callable(obj) and not isinstance(obj, SERIALIZABLE_CALLABLES):
                obj = getattr(obj, "__wrapped__", None)
                name = getattr(obj, "__name__", "")

                # Callable class instances shouldn't be serialized (e.g. tenacity.Retrying).
                # We still want to walk the callables they decorate, though
                if not isinstance(obj, SERIALIZABLE_CALLABLES) or name in env:
                    return

            # Objects whose module has no file, or a file outside of `path`, are stored
            # as they are, without walking their dependencies
            if (
                not obj_module
                or not hasattr(obj_module, "__file__")
                or not _is_relative_to(obj_module.__file__, path)
            ):
                env[name] = (obj, is_metadata)
                return

            if inspect.isclass(obj):
                # Names referenced by the class decorators are looked up in the class's module
                for var in decorator_vars(obj):
                    if obj_module and var in obj_module.__dict__:
                        walk(obj_module.__dict__[var], var, is_metadata)

                for base in obj.__bases__:
                    walk(base, base.__qualname__, is_metadata)

                for k, v in obj.__dict__.items():
                    # skip dunder methods bar __init__ as it might contain user defined logic with cross class references
                    if k.startswith("__") and k != "__init__":
                        continue

                    # Traverse methods in a class to find global references
                    if isinstance(v, (classmethod, staticmethod)):
                        v = v.__func__

                    if callable(v):
                        # Walk the method if it's part of the object, else it's a global function and we just store it
                        if v.__qualname__.startswith(obj.__qualname__):
                            try:
                                # NOTE: `k` and `v` are rebound by this inner loop
                                for k, v in func_globals(v).items():
                                    walk(v, k, is_metadata)
                            except (OSError, TypeError):
                                # __init__ may come from built-ins or wrapped callables
                                pass
                    else:
                        walk(v, k, is_metadata)
            elif callable(obj):
                for k, v in func_globals(obj).items():
                    walk(v, k, is_metadata)

            # We store the object in the environment after its dependencies, because otherwise we
            # could crash at environment hydration time, since dicts are ordered and the top-level
            # objects would be loaded before their dependencies.
            env[name] = (obj, is_metadata)
        elif not _globals_match(env[name][0], obj):
            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

    # The "metadata only" annotation of the object is transitive
    walk(obj, name, is_metadata_obj or getattr(obj, c.SQLMESH_METADATA, False))
389
390
@dataclass
class SqlValue:
    """Holds the SQL string that represents a generated SQLGlot AST."""

    # The SQL text
    sql: str
396
397
class ExecutableKind(str, Enum):
    """The kind of executable. The order of the members is used when serializing the python model to text.

    Comparisons between two kinds follow the order in which the members are defined,
    not the alphabetical order of their string values.
    """

    IMPORT = "import"
    VALUE = "value"
    DEFINITION = "definition"

    def _rank(self) -> int:
        # Iterating an Enum yields its members in definition order
        return list(ExecutableKind).index(self)

    # All four ordering methods are overridden. Leaving any of them to `str` would make
    # e.g. `>` and `max()` compare the values alphabetically and disagree with `<`.
    def __lt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._rank() < other._rank()

    def __le__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._rank() <= other._rank()

    def __gt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._rank() > other._rank()

    def __ge__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._rank() >= other._rank()

    def __str__(self) -> str:
        return self.value
413
414
class Executable(PydanticModel):
    # A serialized piece of a python environment: `payload` holds the text to execute or
    # evaluate and `kind` tells how to treat it.
    payload: str
    kind: ExecutableKind = ExecutableKind.DEFINITION
    name: t.Optional[str] = None
    path: t.Optional[str] = None
    alias: t.Optional[str] = None
    is_metadata: t.Optional[bool] = None

    @property
    def is_definition(self) -> bool:
        """Whether the kind is DEFINITION."""
        return self.kind == ExecutableKind.DEFINITION

    @property
    def is_import(self) -> bool:
        """Whether the kind is IMPORT."""
        return self.kind == ExecutableKind.IMPORT

    @property
    def is_value(self) -> bool:
        """Whether the kind is VALUE."""
        return self.kind == ExecutableKind.VALUE

    @classmethod
    def value(
        cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
    ) -> Executable:
        """Builds a VALUE executable out of a python value.

        Args:
            v: The value to store. Its `repr` becomes the payload.
            is_metadata: Whether the value is metadata-only. Falsy values are stored as None.
            sort_root_dict: Whether to sort the top-level keys if the value is a dictionary.
        """
        if sort_root_dict:
            payload = _dict_sort(v)
        else:
            payload = repr(v)

        return Executable(
            kind=ExecutableKind.VALUE,
            payload=payload,
            is_metadata=is_metadata or None,
        )
445
446
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
    """Serializes a python environment into a self contained dictionary.

    Literals and None are stored as values and modules as `import` statements.
    Callables defined in a file under `path` are stored with their normalized source;
    all other callables are stored as `from ... import ...` statements.

    Args:
        env: The environment to serialize, mapping names to (object, is_metadata) tuples.
        path: The root path to serialize. Other modules will not be walked and treated as imports.

    Returns:
        A dictionary mapping every name in `env` to its serialized executable.

    Raises:
        SQLMeshError: If a module located under `path` is imported as a whole, or if an
            object is neither a literal, a module nor a callable.
    """
    serialized = {}

    for k, (v, is_metadata) in env.items():
        # We don't store `False` for `is_metadata` to reduce the pydantic model's payload size
        is_metadata = is_metadata or None

        if isinstance(v, LITERALS) or v is None:
            serialized[k] = Executable.value(v, is_metadata=is_metadata)
        elif inspect.ismodule(v):
            name = v.__name__
            if hasattr(v, "__file__") and _is_relative_to(v.__file__, path):
                raise SQLMeshError(
                    f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead."
                )
            postfix = "" if name == k else f" as {k}"
            serialized[k] = Executable(
                payload=f"import {name}{postfix}",
                kind=ExecutableKind.IMPORT,
                is_metadata=is_metadata,
            )
        elif callable(v):
            name = v.__name__
            name = k if name == "<lambda>" else name

            # getfile raises a `TypeError` for built-in modules, classes, or functions
            # https://docs.python.org/3/library/inspect.html#inspect.getfile
            try:
                file_path = Path(inspect.getfile(v))
                relative_obj_file_path = _is_relative_to(file_path, path)

                # A callable can be a "wrapper" that is defined in a third-party library [1], in which case the file
                # containing its definition won't be relative to the project's path. This can lead to serializing
                # it as a "relative import", such as `from models.some_python_model import foo`, because the `wraps`
                # decorator preserves the wrapped function's module [2]. Payloads like this are invalid, as they
                # can result in `ModuleNotFoundError`s when hydrating python environments, e.g. if a project's files
                # are not available during a scheduled cadence run.
                #
                # [1]: https://github.com/jd/tenacity/blob/0d40e76f7d06d631fb127e1ec58c8bd776e70d49/tenacity/__init__.py#L322-L346
                # [2]: https://github.com/python/cpython/blob/f502c8f6a6db4be27c97a0e5466383d117859b7f/Lib/functools.py#L33-L57
                if not relative_obj_file_path and (wrapped := getattr(v, "__wrapped__", None)):
                    v = wrapped
                    file_path = Path(inspect.getfile(wrapped))
                    relative_obj_file_path = _is_relative_to(file_path, path)
            except TypeError:
                file_path = None
                relative_obj_file_path = False

            if relative_obj_file_path:
                # The relativity check above compares absolute paths, so the file path is
                # made absolute here as well. Otherwise a relative file path would pass the
                # check and then fail in `relative_to` with a ValueError.
                absolute_file_path = t.cast(Path, file_path).absolute()
                serialized[k] = Executable(
                    name=name,
                    payload=normalize_source(v),
                    kind=ExecutableKind.DEFINITION,
                    # Do `as_posix` to serialize windows path back to POSIX
                    path=absolute_file_path.relative_to(path.absolute()).as_posix(),
                    alias=k if name != k else None,
                    is_metadata=is_metadata,
                )
            else:
                serialized[k] = Executable(
                    payload=f"from {v.__module__} import {name}",
                    kind=ExecutableKind.IMPORT,
                    is_metadata=is_metadata,
                )
        else:
            raise SQLMeshError(
                f"Object '{v}' cannot be serialized. If it's defined in a library, import the corresponding "
                "module and reference the object using its fully-qualified name. For example, the datetime "
                "module's 'UTC' object should be accessed as 'datetime.UTC'."
            )

    return serialized
527
528
def prepare_env(
    python_env: t.Dict[str, Executable],
    env: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Hydrates a serialized python environment by evaluating and executing its payloads.

    Imports are processed before everything else. Value payloads are evaluated and
    bound to their name; all other payloads are executed inside the environment, and
    an executable that has both an alias and a name is bound under the alias as well.

    NOTE: payloads are passed to `eval` and `exec`, so `python_env` has to come from a
    trusted source.

    Args:
        python_env: The dictionary containing the serialized python environment.
        env: The dictionary to execute code in. A new one is created if it's None.

    Returns:
        The prepared environment with hydrated functions.
    """
    if env is None:
        env = {}

    # False sorts before True and the sort is stable: imports come first and the
    # remaining entries keep their relative order
    for name, executable in sorted(
        python_env.items(), key=lambda item: not item[1].is_import
    ):
        if not executable.is_value:
            exec(executable.payload, env)
            if executable.alias and executable.name:
                env[executable.alias] = env[executable.name]
        else:
            env[name] = eval(executable.payload)

    return env
558
559
def format_evaluated_code_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
) -> str:
    """Formats exceptions that occur from evaled code.

    Stack traces generated by evaled code lose code context and are difficult to debug.
    This intercepts the default stack trace and tries to make it debuggable: frames
    that point at evaled code are rewritten to show the path of the executable they
    belong to, followed by its code up to the offending line.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.

    Returns:
        The formatted stack trace.
    """
    skip_patterns = re.compile(
        r"Traceback \(most recent call last\):|"
        r'File ".*?core/model/definition\.py|'
        r'File ".*?core/snapshot/definition\.py|'
        r'File ".*?core/macros\.py|'
        r'File ".*?inspect\.py'
    )
    error_pattern = re.compile("^.*?Error: ")
    eval_frame_pattern = re.compile('File "<string>", line (.*), in (.*)')

    tb: t.List[str] = []
    indent = ""

    for error_line in format_exception(exception):
        if skip_patterns.search(error_line):
            continue

        if error_pattern.search(error_line):
            tb.append(f"{indent * 2}  {error_line}")
            continue

        eval_frame = eval_frame_pattern.search(error_line)
        if not eval_frame:
            tb.append(f"{indent}{error_line}")
            continue

        line_num = int(eval_frame.group(1))
        func = eval_frame.group(2)

        if func not in python_env:
            tb.append(error_line)
            continue

        executable = python_env[func]
        # Reuse the frame's own indentation for the lines that follow
        indent = error_line[: eval_frame.start()]

        tb.append(
            f"{indent}File '{executable.path}' (or imported file), line {line_num}, in {func}"
        )

        # Keep the code up to and including the line that raised
        snippet = executable.payload.splitlines()[: max(line_num, 0)]
        tb.append(textwrap.indent(os.linesep.join(snippet), indent + "  "))

    return os.linesep.join(tb)
634
635
def print_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
    out: t.TextIO = sys.stderr,
) -> None:
    """Writes the formatted stack trace of an exception raised by evaled code.

    The stack trace is produced by `format_evaluated_code_exception`, which tries to
    restore the code context that evaled code loses.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.
        out: The output stream to write to.
    """
    out.write(format_evaluated_code_exception(exception, python_env))
653
654
655def _dict_sort(obj: t.Any) -> str:
656    try:
657        if isinstance(obj, dict):
658            obj = dict(sorted(obj.items(), key=lambda x: str(x[0])))
659    except Exception:
660        logger.warning("Failed to sort non-recursive dict", exc_info=True)
661    return repr(obj)
662
663
def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:
    """Import a python file as a module named after its path relative to a base directory.

    The module and all of its parent packages are evicted from `sys.modules` first, so
    that they are loaded again.

    Args:
        path: The python file to import.
        relative_base: The directory the dotted module name is computed against.

    Returns:
        The imported module.
    """
    relative_path = path.absolute().relative_to(relative_base.absolute())
    module_name = ".".join(relative_path.with_suffix("").parts)

    # remove the entire module hierarchy in case they were already loaded
    components = module_name.split(".")
    for depth in range(1, len(components) + 1):
        sys.modules.pop(".".join(components[:depth]), None)

    return importlib.import_module(module_name)
logger = <Logger sqlmesh.utils.metaprogramming (WARNING)>
IGNORE_DECORATORS = {'signal', 'model', 'macro'}
SERIALIZABLE_CALLABLES = (<class 'type'>, <class 'function'>)
LITERALS = (<class 'numbers.Number'>, <class 'str'>, <class 'bytes'>, <class 'tuple'>, <class 'list'>, <class 'dict'>, <class 'set'>, <class 'bool'>)
def func_globals(func: Callable) -> Dict[str, Any]:
 79def func_globals(func: t.Callable) -> t.Dict[str, t.Any]:
 80    """Finds all global references and closures in a function and nested functions.
 81
 82    This function treats closures as global variables, which could cause problems in the future.
 83
 84    Args:
 85        func: The function to introspect
 86
 87    Returns:
 88        A dictionary of all global references.
 89    """
 90    variables = {}
 91
 92    if hasattr(func, "__code__"):
 93        root_node = parse_source(func)
 94
 95        func_args = next(node for node in ast.walk(root_node) if isinstance(node, ast.arguments))
 96        arg_defaults = (d for d in func_args.defaults + func_args.kw_defaults if d is not None)
 97
 98        # ast.Name corresponds to variable references, such as foo or x.foo. The former is
 99        # represented as Name(id=foo), and the latter as Attribute(value=Name(id=x) attr=foo)
100        arg_globals = [
101            n.id for default in arg_defaults for n in ast.walk(default) if isinstance(n, ast.Name)
102        ]
103
104        code = func.__code__
105        for var in (
106            arg_globals + list(_code_globals(code)) + decorator_vars(func, root_node=root_node)
107        ):
108            if var in func.__globals__:
109                variables[var] = func.__globals__[var]
110
111        if func.__closure__:
112            for var, value in zip(code.co_freevars, func.__closure__):
113                variables[var] = value.cell_contents
114
115    return variables

Finds all global references and closures in a function and nested functions.

This function treats closures as global variables, which could cause problems in the future.

Arguments:
  • func: The function to introspect
Returns:

A dictionary of all global references.

class ClassFoundException(builtins.Exception):
class ClassFoundException(Exception):
    """Signals that `_ClassFinder` located the requested class.

    The zero-based index of the line where the class starts is passed as the only argument.
    """

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
def getsource(obj: Any) -> str:
def getsource(obj: t.Any) -> str:
    """Return the source text of a function or class, decorators included.

    inspect.getsource doesn't find decorators in python < 3.9
    https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

    Args:
        obj: The function or class to fetch the source for.

    Returns:
        The source block that starts at the object's definition or decorator line.

    Raises:
        OSError: If the function's first line lies beyond the end of the source lines.
        SQLMeshError: If no source can be found for the object.
    """
    source_file = inspect.getsourcefile(obj)
    if source_file:
        owner = inspect.getmodule(obj, source_file)
        lines = (
            linecache.getlines(source_file, owner.__dict__)
            if owner
            else linecache.getlines(source_file)
        )

        def block_from(start: int) -> str:
            return "".join(inspect.getblock(lines[start:]))

        if inspect.isclass(obj):
            finder = _ClassFinder(obj.__qualname__)
            try:
                finder.visit(ast.parse("".join(lines)))
            except ClassFoundException as found:
                return block_from(found.args[0])
        elif inspect.isfunction(obj):
            obj = obj.__code__
            if hasattr(obj, "co_firstlineno"):
                header = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)")
                start = obj.co_firstlineno - 1
                # Move upwards until a def, lambda or decorator line is reached
                while start > 0:
                    try:
                        candidate = lines[start]
                    except IndexError:
                        raise OSError("lineno is out of bounds")
                    if header.match(candidate):
                        break
                    start -= 1
                return block_from(start)

    raise SQLMeshError(f"Cannot find source for {obj}")

Get the source of a function or class.

inspect.getsource doesn't find decorators in python < 3.9 https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

def parse_source(func: Callable) -> ast.Module:
def parse_source(func: t.Callable) -> ast.Module:
    """Fetch the source of a callable, dedent it and parse it into an ast module node."""
    source = getsource(func)
    return ast.parse(textwrap.dedent(source))

Parse a function and returns an ast node.

def decorator_vars(func: Callable, root_node: Optional[ast.Module] = None) -> List[str]:
def decorator_vars(func: t.Callable, root_node: t.Optional[ast.Module] = None) -> t.List[str]:
    """
    Lists the names of a callable's decorators together with the names of the objects
    referenced in their argument lists. Such objects may be transitive dependencies
    that have to be included in the serialized python environments.

    Args:
        func: The callable to inspect. Its source is only parsed if `root_node` is not given.
        root_node: An already parsed source of the callable.

    Returns:
        The collected names with duplicates removed.
    """
    if not root_node:
        root_node = parse_source(func)

    visitor = _DecoratorDependencyFinder()
    visitor.visit(root_node)
    return unique(visitor.dependencies)

Returns a list of all the decorators of a callable, as well as names of objects that are referenced in their argument list. These objects may be transitive dependencies that we need to include in the serialized python environments.

def normalize_source(obj: Any) -> str:
def normalize_source(obj: t.Any) -> str:
    """Rewrites an object's source with formatting and doc strings removed by using Python ast.

    For every function and class definition found in the parsed source, decorators whose
    name is in `IGNORE_DECORATORS` and a leading docstring are dropped. Return type
    annotations are additionally removed from function definitions.

    Args:
        obj: The object to fetch source from and convert to a string.

    Returns:
        A string representation of the normalized function.
    """
    root_node = parse_source(obj)

    for node in ast.walk(root_node):
        if not isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            continue

        # Build a new list rather than calling `remove` while iterating over the same list:
        # removing in place shifts the remaining items, which makes the loop skip the
        # decorator that follows each removed one.
        node.decorator_list = [
            decorator
            for decorator in node.decorator_list
            if _decorator_name(decorator) not in IGNORE_DECORATORS
        ]

        # remove docstrings (string constants are `ast.Constant` nodes; `ast.Str` is deprecated)
        body = node.body
        if (
            body
            and isinstance(body[0], ast.Expr)
            and isinstance(body[0].value, ast.Constant)
            and isinstance(body[0].value.value, str)
        ):
            node.body = body[1:]

        # remove function return type annotation
        if isinstance(node, ast.FunctionDef):
            node.returns = None

    return to_source(root_node).strip()

Rewrites an object's source with formatting and doc strings removed by using Python ast.

Arguments:
  • obj: The object to fetch source from and convert to a string.
Returns:

A string representation of the normalized function.

def build_env( obj: Any, *, env: Dict[str, Tuple[Any, Optional[bool]]], name: str, path: pathlib.Path, is_metadata_obj: bool = False) -> None:
def build_env(
    obj: t.Any,
    *,
    env: t.Dict[str, t.Tuple[t.Any, t.Optional[bool]]],
    name: str,
    path: Path,
    is_metadata_obj: bool = False,
) -> None:
    """Fills in env dictionary with all globals needed to execute the object.

    Recursively traverse classes and functions.

    Args:
        obj: Any python object.
        env: Dictionary to store the env. It is mutated in place; each entry maps a name
            to an `(object, is_metadata)` tuple.
        name: Name of the object in the env.
        path: The module path to serialize. Other modules will not be walked and treated as imports.
        is_metadata_obj: An optional flag that determines whether the input object is metadata-only.

    Raises:
        SQLMeshError: If a name is already associated with an object that does not match
            (per `_globals_match`) the one being stored.
    """
    # We don't rely on `env` to keep track of visited objects, because it's populated in post-order
    visited: t.Set[str] = set()

    def walk(obj: t.Any, name: str, is_metadata: bool = False) -> None:
        # Objects that belong to the `builtins` module are never added to the env
        obj_module = inspect.getmodule(obj)
        if obj_module and obj_module.__name__ == "builtins":
            return

        if name in visited:
            # A visited name that is not in `env` yet is still being walked further up the
            # call stack (env is filled in post-order), so there is nothing more to do here
            if name not in env or _globals_match(env[name][0], obj):
                return

            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

        visited.add(name)
        name_missing_from_env = name not in env

        if name_missing_from_env or (not is_metadata and env[name] == (obj, True)):
            if not name_missing_from_env:
                # The existing object in the env is "metadata only" but we're walking it again as a
                # non-"metadata only" dependency, so we update this flag to ensure all transitive
                # dependencies are also not marked as "metadata only"
                is_metadata = False

            if hasattr(obj, c.SQLMESH_MACRO):
                # We only need to add the undecorated code of @macro() functions in env, which
                # is accessible through the `__wrapped__` attribute added by functools.wraps
                obj = obj.__wrapped__
            elif callable(obj) and not isinstance(obj, SERIALIZABLE_CALLABLES):
                # From here on the wrapped callable (if any) is stored under its own name
                obj = getattr(obj, "__wrapped__", None)
                name = getattr(obj, "__name__", "")

                # Callable class instances shouldn't be serialized (e.g. tenacity.Retrying).
                # We still want to walk the callables they decorate, though
                if not isinstance(obj, SERIALIZABLE_CALLABLES) or name in env:
                    return

            # Objects without a module file, or whose module file is not under `path`, are
            # stored as-is and their dependencies are not walked
            if (
                not obj_module
                or not hasattr(obj_module, "__file__")
                or not _is_relative_to(obj_module.__file__, path)
            ):
                env[name] = (obj, is_metadata)
                return

            if inspect.isclass(obj):
                # Walk the decorator-related names that resolve to globals of the class' module
                for var in decorator_vars(obj):
                    if obj_module and var in obj_module.__dict__:
                        walk(obj_module.__dict__[var], var, is_metadata)

                for base in obj.__bases__:
                    walk(base, base.__qualname__, is_metadata)

                for k, v in obj.__dict__.items():
                    # skip dunder methods bar __init__ as it might contain user defined logic with cross class references
                    if k.startswith("__") and k != "__init__":
                        continue

                    # Traverse methods in a class to find global references
                    if isinstance(v, (classmethod, staticmethod)):
                        v = v.__func__

                    if callable(v):
                        # Walk the method if it's part of the object, else it's a global function and we just store it
                        if v.__qualname__.startswith(obj.__qualname__):
                            try:
                                # NOTE: `k` and `v` are rebound by this inner loop, shadowing the
                                # outer loop variables; they are not read again in this iteration
                                for k, v in func_globals(v).items():
                                    walk(v, k, is_metadata)
                            except (OSError, TypeError):
                                # __init__ may come from built-ins or wrapped callables
                                pass
                    else:
                        walk(v, k, is_metadata)
            elif callable(obj):
                for k, v in func_globals(obj).items():
                    walk(v, k, is_metadata)

            # We store the object in the environment after its dependencies, because otherwise we
            # could crash at environment hydration time, since dicts are ordered and the top-level
            # objects would be loaded before their dependencies.
            env[name] = (obj, is_metadata)
        elif not _globals_match(env[name][0], obj):
            raise SQLMeshError(
                f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
            )

    # The "metadata only" annotation of the object is transitive
    walk(obj, name, is_metadata_obj or getattr(obj, c.SQLMESH_METADATA, False))

Fills in env dictionary with all globals needed to execute the object.

Recursively traverse classes and functions.

Arguments:
  • obj: Any python object.
  • env: Dictionary to store the env.
  • name: Name of the object in the env.
  • path: The module path to serialize. Other modules will not be walked and treated as imports.
  • is_metadata_obj: An optional flag that determines whether the input object is metadata-only.
@dataclass
class SqlValue:
@dataclass
class SqlValue:
    """A SQL string representing a generated SQLGlot AST."""

    # The SQL text held by this value
    sql: str

A SQL string representing a generated SQLGlot AST.

SqlValue(sql: str)
sql: str
class ExecutableKind(builtins.str, enum.Enum):
class ExecutableKind(str, Enum):
    """The kind of executable. The order of the members is used when serializing the python model to text."""

    IMPORT = "import"
    VALUE = "value"
    DEFINITION = "definition"

    def __lt__(self, other: t.Any) -> bool:
        """Compares two kinds by declaration order (IMPORT, VALUE, DEFINITION).

        Only `<` is overridden here; the other rich comparisons are inherited from `str`.
        """
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        # Iterating the Enum yields exactly its members in definition order, whereas the
        # class `__dict__` also contains unrelated attributes (docstring, methods, ...).
        members = list(ExecutableKind)
        return members.index(self) < members.index(other)

    def __str__(self) -> str:
        """Returns the raw string value of the member."""
        return self.value

The kind of executable. The order of the members is used when serializing the python model to text.

IMPORT = <ExecutableKind.IMPORT: 'import'>
VALUE = <ExecutableKind.VALUE: 'value'>
DEFINITION = <ExecutableKind.DEFINITION: 'definition'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Executable(sqlmesh.utils.pydantic.PydanticModel):
class Executable(PydanticModel):
    """A serialized entry of a python environment, holding its payload and its kind."""

    payload: str
    kind: ExecutableKind = ExecutableKind.DEFINITION
    name: t.Optional[str] = None
    path: t.Optional[str] = None
    alias: t.Optional[str] = None
    is_metadata: t.Optional[bool] = None

    @property
    def is_definition(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.DEFINITION`."""
        return self.kind == ExecutableKind.DEFINITION

    @property
    def is_import(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.IMPORT`."""
        return self.kind == ExecutableKind.IMPORT

    @property
    def is_value(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.VALUE`."""
        return self.kind == ExecutableKind.VALUE

    @classmethod
    def value(
        cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
    ) -> Executable:
        """Builds a VALUE executable whose payload is the textual form of `v`.

        Args:
            v: The value to store.
            is_metadata: The metadata flag; any falsy value is stored as `None`.
            sort_root_dict: When set, the payload is produced by `_dict_sort` instead of `repr`.
        """
        if sort_root_dict:
            payload = _dict_sort(v)
        else:
            payload = repr(v)

        metadata_flag = is_metadata or None
        return Executable(payload=payload, kind=ExecutableKind.VALUE, is_metadata=metadata_flag)

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of the class variables defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The core schema of the model.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a [RootModel][pydantic.root_model.RootModel].
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_fields__: A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
  • __pydantic_computed_fields__: A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
  • __pydantic_extra__: A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
  • __pydantic_fields_set__: The names of fields explicitly set during instantiation.
  • __pydantic_private__: Values of private attributes set on the model instance.
payload: str
name: Optional[str]
path: Optional[str]
alias: Optional[str]
is_metadata: Optional[bool]
is_definition: bool
    @property
    def is_definition(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.DEFINITION`."""
        return self.kind == ExecutableKind.DEFINITION
is_import: bool
    @property
    def is_import(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.IMPORT`."""
        return self.kind == ExecutableKind.IMPORT
is_value: bool
    @property
    def is_value(self) -> bool:
        """Whether this executable's kind is `ExecutableKind.VALUE`."""
        return self.kind == ExecutableKind.VALUE
@classmethod
def value( cls, v: Any, is_metadata: Optional[bool] = None, sort_root_dict: bool = False) -> Executable:
436    @classmethod
437    def value(
438        cls, v: t.Any, is_metadata: t.Optional[bool] = None, sort_root_dict: bool = False
439    ) -> Executable:
440        payload = _dict_sort(v) if sort_root_dict else repr(v)
441        return Executable(
442            payload=payload,
443            kind=ExecutableKind.VALUE,
444            is_metadata=is_metadata or None,
445        )
model_config = {'json_encoders': {<class 'sqlglot.expressions.Expression'>: <function _expression_encoder>, <class 'sqlglot.expressions.DataType'>: <function _expression_encoder>, <class 'sqlglot.expressions.Tuple'>: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery]: <function _expression_encoder>, typing.Union[sqlglot.expressions.Query, sqlmesh.core.dialect.JinjaQuery, sqlmesh.core.dialect.MacroFunc]: <function _expression_encoder>, <class 'datetime.tzinfo'>: <function PydanticModel.<lambda>>}, 'arbitrary_types_allowed': True, 'extra': 'forbid', 'protected_namespaces': ()}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
pydantic.main.BaseModel
BaseModel
model_fields
model_computed_fields
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
fields_set
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
def serialize_env( env: Dict[str, Any], path: pathlib.Path) -> Dict[str, Executable]:
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
    """Serializes a python function into a self contained dictionary.

    Recursively walks a function's globals to store all other references inside of env.

    Args:
        env: Dictionary to store the env. Each value is unpacked as an
            `(object, is_metadata)` pair.
        path: The root path to serialize. Other modules will not be walked and treated as imports.

    Returns:
        A dictionary mapping each name in `env` to its `Executable` (value, import or definition).

    Raises:
        SQLMeshError: If a module located under `path` is referenced via `import <module>`,
            or if an object is neither a literal, a module nor a callable.
    """
    serialized: t.Dict[str, Executable] = {}

    for k, (v, is_metadata) in env.items():
        # We don't store `False` for `is_metadata` to reduce the pydantic model's payload size
        is_metadata = is_metadata or None

        if isinstance(v, LITERALS) or v is None:
            serialized[k] = Executable.value(v, is_metadata=is_metadata)
        elif inspect.ismodule(v):
            name = v.__name__
            if hasattr(v, "__file__") and _is_relative_to(v.__file__, path):
                raise SQLMeshError(
                    f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead."
                )
            postfix = "" if name == k else f" as {k}"
            serialized[k] = Executable(
                payload=f"import {name}{postfix}",
                kind=ExecutableKind.IMPORT,
                is_metadata=is_metadata,
            )
        elif callable(v):
            name = v.__name__
            name = k if name == "<lambda>" else name

            # getfile raises a `TypeError` for built-in modules, classes, or functions
            # https://docs.python.org/3/library/inspect.html#inspect.getfile
            try:
                file_path = Path(inspect.getfile(v))
                relative_obj_file_path = _is_relative_to(file_path, path)

                # A callable can be a "wrapper" that is defined in a third-party library [1], in which case the file
                # containing its definition won't be relative to the project's path. This can lead to serializing
                # it as a "relative import", such as `from models.some_python_model import foo`, because the `wraps`
                # decorator preserves the wrapped function's module [2]. Payloads like this are invalid, as they
                # can result in `ModuleNotFoundError`s when hydrating python environments, e.g. if a project's files
                # are not available during a scheduled cadence run.
                #
                # [1]: https://github.com/jd/tenacity/blob/0d40e76f7d06d631fb127e1ec58c8bd776e70d49/tenacity/__init__.py#L322-L346
                # [2]: https://github.com/python/cpython/blob/f502c8f6a6db4be27c97a0e5466383d117859b7f/Lib/functools.py#L33-L57
                if not relative_obj_file_path and (wrapped := getattr(v, "__wrapped__", None)):
                    v = wrapped
                    file_path = Path(inspect.getfile(wrapped))
                    relative_obj_file_path = _is_relative_to(file_path, path)
            except TypeError:
                file_path = None
                relative_obj_file_path = False

            if relative_obj_file_path:
                serialized[k] = Executable(
                    name=name,
                    payload=normalize_source(v),
                    kind=ExecutableKind.DEFINITION,
                    # Do `as_posix` to serialize windows path back to POSIX. Both sides are made
                    # absolute, matching the comparison performed by `_is_relative_to`
                    path=t.cast(Path, file_path).absolute().relative_to(path.absolute()).as_posix(),
                    alias=k if name != k else None,
                    is_metadata=is_metadata,
                )
            else:
                # Preserve the name the object is bound to in the env when it differs from its
                # `__name__` (e.g. `from a import b as c`), mirroring the module branch above.
                # Keys that are not valid identifiers (such as dotted qualified names) can't be
                # used as an alias, so the plain import is kept for them.
                postfix = f" as {k}" if name != k and k.isidentifier() else ""
                serialized[k] = Executable(
                    payload=f"from {v.__module__} import {name}{postfix}",
                    kind=ExecutableKind.IMPORT,
                    is_metadata=is_metadata,
                )
        else:
            raise SQLMeshError(
                f"Object '{v}' cannot be serialized. If it's defined in a library, import the corresponding "
                "module and reference the object using its fully-qualified name. For example, the datetime "
                "module's 'UTC' object should be accessed as 'datetime.UTC'."
            )

    return serialized

Serializes a python function into a self contained dictionary.

Recursively walks a function's globals to store all other references inside of env.

Arguments:
  • env: Dictionary to store the env.
  • path: The root path to serialize. Other modules will not be walked and treated as imports.
def prepare_env( python_env: Dict[str, Executable], env: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
def prepare_env(
    python_env: t.Dict[str, Executable],
    env: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Hydrates a serialized python environment by evaluating and executing its entries.

    The Python ENV is stored in a json serializable format.
    Functions and imports are stored as a special data class.

    Imports are processed before every other entry; the remaining entries keep their
    original relative order. Value payloads are evaluated and stored under their key,
    while all other payloads are executed with `env` as their globals.

    Args:
        python_env: The dictionary containing the serialized python environment.
        env: The dictionary to execute code in. A new one is created when omitted.

    Returns:
        The prepared environment with hydrated functions.
    """
    if env is None:
        env = {}

    # `False` sorts before `True`, so imports come first; the sort is stable otherwise
    for name, executable in sorted(python_env.items(), key=lambda item: not item[1].is_import):
        if not executable.is_value:
            exec(executable.payload, env)
            # Expose the definition under its alias as well, when it has one
            if executable.alias and executable.name:
                env[executable.alias] = env[executable.name]
            continue

        env[name] = eval(executable.payload)

    return env

Prepare a python env by hydrating and executing functions.

The Python ENV is stored in a json serializable format. Functions and imports are stored as a special data class.

Arguments:
  • python_env: The dictionary containing the serialized python environment.
  • env: The dictionary to execute code in.
Returns:

The prepared environment with hydrated functions.

def format_evaluated_code_exception( exception: Exception, python_env: Dict[str, Executable]) -> str:
def format_evaluated_code_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
) -> str:
    """Formats exceptions that occur from evaled code.

    Stack traces generated by evaled code lose code context and are difficult to debug.
    This intercepts the default stack trace and tries to make it debuggable.

    Frames reported as `File "<string>"` whose function name is a key of `python_env` are
    rewritten to point at the executable's path and are followed by the executable's
    payload, up to and including the reported line.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.

    Returns:
        The formatted stack trace, joined with `os.linesep`.
    """
    tb: t.List[str] = []
    indent = ""

    skip_patterns = re.compile(
        r"Traceback \(most recent call last\):|"
        r'File ".*?core/model/definition\.py|'
        r'File ".*?core/snapshot/definition\.py|'
        r'File ".*?core/macros\.py|'
        r'File ".*?inspect\.py'
    )
    # Compiled once, outside of the loop
    error_pattern = re.compile("^.*?Error: ")
    # The line number group only accepts digits, so that `int()` below can't fail
    eval_code_pattern = re.compile(r'File "<string>", line (\d+), in (.*)')

    for error_line in format_exception(exception):
        if skip_patterns.search(error_line):
            continue

        if error_pattern.search(error_line):
            tb.append(f"{indent * 2}  {error_line}")
            continue

        eval_code_match = eval_code_pattern.search(error_line)
        if not eval_code_match:
            tb.append(f"{indent}{error_line}")
            continue

        line_num = int(eval_code_match.group(1))
        func = eval_code_match.group(2)

        if func not in python_env:
            tb.append(error_line)
            continue

        executable = python_env[func]
        # Reuse the leading whitespace of the frame line for the lines that follow it
        indent = error_line[: eval_code_match.start()]

        error_line = (
            f"{indent}File '{executable.path}' (or imported file), line {line_num}, in {func}"
        )

        # Keep the payload's lines up to and including the reported (1-based) line
        formatted = executable.payload.splitlines()[:line_num]

        tb.extend(
            (
                error_line,
                textwrap.indent(
                    os.linesep.join(formatted),
                    indent + "  ",
                ),
            )
        )

    return os.linesep.join(tb)

Formats exceptions that occur from evaled code.

Stack traces generated by evaled code lose code context and are difficult to debug. This intercepts the default stack trace and tries to make it debuggable.

Arguments:
  • exception: The exception to print the stack trace for.
  • python_env: The environment containing stringified python code.
def import_python_file( path: pathlib.Path, relative_base: pathlib.Path = PosixPath('.')) -> module:
def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:
    """Imports the python file at `path` as a module.

    The module name is the dotted form of `path` relative to `relative_base`, without
    its suffix. Every module in that dotted hierarchy is removed from `sys.modules`
    before importing.

    Args:
        path: The python file to import.
        relative_base: The directory the module name is computed against.

    Returns:
        The imported module.
    """
    base_dir = relative_base.absolute()
    stem_path = path.absolute().relative_to(base_dir).with_suffix("")
    module_name = str(stem_path).replace(os.path.sep, ".")

    # remove the entire module hierarchy in case they were already loaded
    parts = module_name.split(".")
    for depth in range(1, len(parts) + 1):
        sys.modules.pop(".".join(parts[:depth]), None)

    return importlib.import_module(module_name)
673
674    return importlib.import_module(module_name)