Edit on GitHub

sqlmesh.utils.metaprogramming

  1from __future__ import annotations
  2
  3import ast
  4import dis
  5import importlib
  6import inspect
  7import linecache
  8import os
  9import re
 10import sys
 11import textwrap
 12import types
 13import typing as t
 14from enum import Enum
 15from pathlib import Path
 16
 17from astor import to_source
 18
 19from sqlmesh.core import constants as c
 20from sqlmesh.utils import format_exception, unique
 21from sqlmesh.utils.errors import SQLMeshError
 22from sqlmesh.utils.pydantic import PydanticModel
 23
 24IGNORE_DECORATORS = {"macro", "model"}
 25
 26
 27def _is_relative_to(path: t.Optional[Path | str], other: t.Optional[Path | str]) -> bool:
 28    """path.is_relative_to compatibility, was only supported >= 3.9"""
 29    if path is None or other is None:
 30        return False
 31
 32    if isinstance(path, str):
 33        path = Path(path)
 34    if isinstance(other, str):
 35        other = Path(other)
 36
 37    if "site-packages" in str(path):
 38        return False
 39
 40    try:
 41        path.absolute().relative_to(other.absolute())
 42        return True
 43    except ValueError:
 44        return False
 45
 46
 47def _code_globals(code: types.CodeType) -> t.Dict[str, None]:
 48    variables = {
 49        instruction.argval: None
 50        for instruction in dis.get_instructions(code)
 51        if instruction.opname == "LOAD_GLOBAL"
 52    }
 53
 54    for const in code.co_consts:
 55        if isinstance(const, types.CodeType):
 56            variables.update(_code_globals(const))
 57
 58    return variables
 59
 60
 61def func_globals(func: t.Callable) -> t.Dict[str, t.Any]:
 62    """Finds all global references and closures in a function and nested functions.
 63
 64    This function treats closures as global variables, which could cause problems in the future.
 65
 66    Args:
 67        func: The function to introspect
 68
 69    Returns:
 70        A dictionary of all global references.
 71    """
 72    variables = {}
 73
 74    if hasattr(func, "__code__"):
 75        code = func.__code__
 76
 77        for var in list(_code_globals(code)) + decorators(func):
 78            if var in func.__globals__:
 79                ref = func.__globals__[var]
 80                variables[var] = ref
 81
 82        if func.__closure__:
 83            for var, value in zip(code.co_freevars, func.__closure__):
 84                variables[var] = value.cell_contents
 85
 86    return variables
 87
 88
 89class ClassFoundException(Exception):
 90    pass
 91
 92
 93class _ClassFinder(ast.NodeVisitor):
 94    def __init__(self, qualname: str) -> None:
 95        self.stack: t.List[str] = []
 96        self.qualname = qualname
 97
 98    def visit_FunctionDef(self, node: ast.FunctionDef) -> None:
 99        self.stack.append(node.name)
100        self.stack.append("<locals>")
101        self.generic_visit(node)
102        self.stack.pop()
103        self.stack.pop()
104
105    visit_AsyncFunctionDef = visit_FunctionDef  # type: ignore
106
107    def visit_ClassDef(self, node: ast.ClassDef) -> None:
108        self.stack.append(node.name)
109        if self.qualname == ".".join(self.stack):
110            # Return the decorator for the class if present
111            if node.decorator_list:
112                line_number = node.decorator_list[0].lineno
113            else:
114                line_number = node.lineno
115
116            # decrement by one since lines starts with indexing by zero
117            line_number -= 1
118            raise ClassFoundException(line_number)
119        self.generic_visit(node)
120        self.stack.pop()
121
122
def getsource(obj: t.Any) -> str:
    """Get the source of a function or class.

    inspect.getsource doesn't find decorators in python < 3.9
    https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

    Args:
        obj: The class or function whose source should be returned.

    Returns:
        The source block of the object. For a decorated class the block starts at
        its first decorator.

    Raises:
        SQLMeshError: If no source file is found for the object, the object is neither
            a class nor a function, or the class is not found in its source file.
        OSError: If a line index is out of bounds while scanning for the start of a function.
    """
    path = inspect.getsourcefile(obj)
    if path:
        module = inspect.getmodule(obj, path)

        # Passing the module globals lets linecache also try the module's loader.
        if module:
            lines = linecache.getlines(path, module.__dict__)
        else:
            lines = linecache.getlines(path)

        def join_source(lnum: int) -> str:
            # Return the code block that starts at the 0-based line index lnum.
            return "".join(inspect.getblock(lines[lnum:]))

        if inspect.isclass(obj):
            qualname = obj.__qualname__
            source = "".join(lines)
            tree = ast.parse(source)
            class_finder = _ClassFinder(qualname)
            try:
                # The finder signals a match by raising; a normal return means "not found"
                # and control falls through to the SQLMeshError at the bottom.
                class_finder.visit(tree)
            except ClassFoundException as e:
                return join_source(e.args[0])
        elif inspect.isfunction(obj):
            # From here on obj is the code object of the function.
            obj = obj.__code__
            if hasattr(obj, "co_firstlineno"):
                lnum = obj.co_firstlineno - 1
                # Matches a line that begins a def / async def, contains a lambda,
                # or begins with a decorator ("@").
                pat = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)")
                # Walk backwards from the first line until a matching line (or line 0) is reached.
                while lnum > 0:
                    try:
                        line = lines[lnum]
                    except IndexError:
                        raise OSError("lineno is out of bounds")
                    if pat.match(line):
                        break
                    lnum = lnum - 1
                return join_source(lnum)
    raise SQLMeshError(f"Cannot find source for {obj}")
165
166
def parse_source(func: t.Callable) -> ast.Module:
    """Fetch the source of ``func``, remove its common leading indentation and parse it into an AST module."""
    source = getsource(func)
    dedented = textwrap.dedent(source)
    return ast.parse(dedented)
170
171
172def _decorator_name(decorator: ast.expr) -> str:
173    if isinstance(decorator, ast.Call):
174        return decorator.func.id  # type: ignore
175    if isinstance(decorator, ast.Name):
176        return decorator.id
177    return ""
178
179
def decorators(func: t.Callable) -> t.List[str]:
    """List the decorator names found in the source of a callable.

    Every function and class definition in the parsed source is inspected. Names listed in
    ``IGNORE_DECORATORS`` are left out and the result is passed through ``unique``.
    """
    tree = parse_source(func)
    definitions = (
        node for node in ast.walk(tree) if isinstance(node, (ast.FunctionDef, ast.ClassDef))
    )
    names = [
        name
        for definition in definitions
        for name in map(_decorator_name, definition.decorator_list)
        if name not in IGNORE_DECORATORS
    ]
    return unique(names)
192
193
def normalize_source(obj: t.Any) -> str:
    """Rewrites an object's source with formatting and doc strings removed by using Python ast.

    Decorators listed in ``IGNORE_DECORATORS`` and function return type annotations are
    removed as well.

    Args:
        obj: The object to fetch source from and convert to a string.

    Returns:
        A string representation of the normalized function.
    """
    root_node = parse_source(obj)

    for node in ast.walk(root_node):
        if not isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            continue

        # Build a new list instead of calling remove() while iterating: removing in place
        # shifts the remaining items, so the decorator right after a removed one was skipped.
        node.decorator_list = [
            decorator
            for decorator in node.decorator_list
            if _decorator_name(decorator) not in IGNORE_DECORATORS
        ]

        # remove docstrings
        body = node.body
        if body and isinstance(body[0], ast.Expr):
            first = body[0].value
            # String literals are parsed as ast.Constant on Python 3.8+; ast.Str is a deprecated alias.
            if isinstance(first, ast.Constant) and isinstance(first.value, str):
                # A body must not be left empty, otherwise the generated source is invalid.
                node.body = body[1:] or [ast.Pass()]

        # remove function return type annotation
        if isinstance(node, ast.FunctionDef):
            node.returns = None

    return to_source(root_node).strip()
221
222
def build_env(
    obj: t.Any,
    *,
    env: t.Dict[str, t.Any],
    name: str,
    path: Path,
) -> None:
    """Fills in env dictionary with all globals needed to execute the object.

    Recursively traverse classes and functions.

    Args:
        obj: Any python object.
        env: Dictionary to store the env.
        name: Name of the object in the env.
        path: The module path to serialize. Other modules will not be walked and treated as imports.

    Raises:
        SQLMeshError: If `name` is already in `env` and the stored object compares unequal to `obj`.
    """

    obj_module = inspect.getmodule(obj)

    # Objects that belong to the builtins module are never stored.
    if obj_module and obj_module.__name__ == "builtins":
        return

    def walk(obj: t.Any) -> None:
        # Adds the dependencies of obj to env. Note that obj_module is the module of the
        # object passed to build_env, not necessarily of the object being walked.
        if inspect.isclass(obj):
            # Decorators that are defined in the module are dependencies of the class.
            for decorator in decorators(obj):
                if obj_module and decorator in obj_module.__dict__:
                    build_env(
                        obj_module.__dict__[decorator],
                        env=env,
                        name=decorator,
                        path=path,
                    )

            for base in obj.__bases__:
                build_env(base, env=env, name=base.__qualname__, path=path)

            for k, v in obj.__dict__.items():
                # skip dunder attributes
                if k.startswith("__"):
                    continue
                # traverse methods in a class to find global references
                if isinstance(v, (classmethod, staticmethod)):
                    v = v.__func__
                if callable(v):
                    # if the method is a part of the object, walk it
                    # else it is a global function and we just store it
                    if v.__qualname__.startswith(obj.__qualname__):
                        walk(v)
                    else:
                        build_env(v, env=env, name=v.__name__, path=path)
        elif callable(obj):
            # Every global or closure reference of the callable is stored under its own name.
            for k, v in func_globals(obj).items():
                build_env(v, env=env, name=k, path=path)

    if name not in env:
        # We only need to add the undecorated code of @macro() functions in env, which
        # is accessible through the `__wrapped__` attribute added by functools.wraps
        env[name] = obj.__wrapped__ if hasattr(obj, c.SQLMESH_MACRO) else obj

        # Only objects whose module file lives under `path` are walked for dependencies.
        if (
            obj_module
            and hasattr(obj_module, "__file__")
            and _is_relative_to(obj_module.__file__, path)
        ):
            walk(env[name])
    elif env[name] != obj:
        # NOTE(review): for macro objects env holds `__wrapped__`, so this compares the
        # unwrapped function with the wrapper - confirm that is the intended check.
        raise SQLMeshError(
            f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
        )
292
293
class ExecutableKind(str, Enum):
    """The kind of executable. The order of the members is used when serializing the python model to text.

    Members are ordered by their position in the class definition (IMPORT, VALUE, DEFINITION),
    not by their string values. Comparisons against anything that is not an ExecutableKind
    return NotImplemented, which leaves the decision to the other operand.
    """

    IMPORT = "import"
    VALUE = "value"
    DEFINITION = "definition"

    def _position(self) -> int:
        """Return the index of this member in definition order."""
        # Iterating an Enum class yields its members in definition order, which avoids
        # depending on the layout of the class __dict__.
        return list(ExecutableKind).index(self)

    def __lt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() < other._position()

    # str already implements the remaining rich comparisons lexicographically, so they are
    # overridden to make every operator (and min/max) agree with __lt__.
    def __le__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() <= other._position()

    def __gt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() > other._position()

    def __ge__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() >= other._position()

    def __str__(self) -> str:
        return self.value
309
310
class Executable(PydanticModel):
    # One serialized entry of a python environment. `payload` holds the text to execute or
    # evaluate and `kind` tells how to treat it; `name`, `path` and `alias` are optional.
    payload: str
    kind: ExecutableKind = ExecutableKind.DEFINITION
    name: t.Optional[str] = None
    path: t.Optional[str] = None
    alias: t.Optional[str] = None

    @classmethod
    def value(cls, v: t.Any) -> Executable:
        """Wrap ``repr(v)`` in an executable of kind VALUE."""
        literal = repr(v)
        return Executable(kind=ExecutableKind.VALUE, payload=literal)

    @property
    def is_import(self) -> bool:
        """True when the kind is IMPORT."""
        return self.kind == ExecutableKind.IMPORT

    @property
    def is_value(self) -> bool:
        """True when the kind is VALUE."""
        return self.kind == ExecutableKind.VALUE

    @property
    def is_definition(self) -> bool:
        """True when the kind is DEFINITION."""
        return self.kind == ExecutableKind.DEFINITION
333
334
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
    """Serializes a python environment into a self-contained dictionary of executables.

    Callables whose file is located under `path` are stored as normalized source definitions.
    Other callables and modules become import statements, and any other object is stored
    as the `repr` of its value.

    Args:
        env: The environment to serialize, mapping names to python objects.
        path: The root path to serialize. Other modules will not be walked and treated as imports.

    Returns:
        A dictionary mapping every name of `env` to its Executable.

    Raises:
        SQLMeshError: If a module located under `path` is referenced as a whole module,
            since it cannot be turned into a plain `import` statement.
    """
    serialized = {}

    for k, v in env.items():
        if callable(v):
            name = v.__name__
            # lambdas have no usable name of their own, so the env key is used instead
            name = k if name == "<lambda>" else name
            file_path = Path(inspect.getfile(v))

            if _is_relative_to(file_path, path):
                serialized[k] = Executable(
                    name=name,
                    payload=normalize_source(v),
                    kind=ExecutableKind.DEFINITION,
                    # Both sides are made absolute, like in `_is_relative_to`, so that a
                    # relative file path cannot make `relative_to` raise a ValueError.
                    # Do `as_posix` to serialize windows path back to POSIX
                    path=file_path.absolute().relative_to(path.absolute()).as_posix(),
                    alias=k if name != k else None,
                )
            else:
                serialized[k] = Executable(
                    payload=f"from {v.__module__} import {name}",
                    kind=ExecutableKind.IMPORT,
                )
        elif inspect.ismodule(v):
            name = v.__name__
            if hasattr(v, "__file__") and _is_relative_to(v.__file__, path):
                raise SQLMeshError(
                    f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead."
                )
            postfix = "" if name == k else f" as {k}"
            serialized[k] = Executable(
                payload=f"import {name}{postfix}",
                kind=ExecutableKind.IMPORT,
            )
        else:
            serialized[k] = Executable.value(v)

    return serialized
381
382
def prepare_env(
    python_env: t.Dict[str, Executable],
    env: t.Optional[t.Dict[str, t.Any]] = None,
) -> t.Dict[str, t.Any]:
    """Hydrate a serialized python environment into live objects.

    The serialized environment is json friendly: values are stored as literals, while
    imports and definitions are stored as code. Imports are processed before everything else.

    Args:
        python_env: The dictionary containing the serialized python environment.
        env: The dictionary to execute code in. A new one is created when omitted.

    Returns:
        The prepared environment with hydrated functions.
    """
    if env is None:
        env = {}

    def imports_first(item: t.Tuple[str, Executable]) -> int:
        return 0 if item[1].is_import else 1

    # sorted() is stable, so entries keep their relative order within each group.
    for name, executable in sorted(python_env.items(), key=imports_first):
        if executable.is_value:
            env[name] = ast.literal_eval(executable.payload)
            continue

        # NOTE: the payload is executed as python code inside `env`.
        exec(executable.payload, env)
        if executable.alias and executable.name:
            env[executable.alias] = env[executable.name]

    return env
411
412
def print_exception(
    exception: Exception,
    python_env: t.Dict[str, Executable],
    out: t.TextIO = sys.stderr,
) -> None:
    """Write a readable stack trace for an exception raised by evaled code.

    Frames of evaled code are reported as coming from "<string>", without any source
    context. For every such frame whose function name is a key of ``python_env``, the
    frame line is rewritten to show the executable's path and is followed by the payload
    lines up to and including the reported line. All other lines are written unchanged.

    Args:
        exception: The exception to print the stack trace for.
        python_env: The environment containing stringified python code.
        out: The output stream to write to.
    """
    frame_pattern = re.compile('File "<string>", line (.*), in (.*)')
    rendered: t.List[str] = []

    for error_line in format_exception(exception):
        found = frame_pattern.search(error_line)
        if found is None:
            rendered.append(error_line)
            continue

        line_num, func = int(found.group(1)), found.group(2)
        if func not in python_env:
            rendered.append(error_line)
            continue

        executable = python_env[func]
        # Reuse whatever precedes the match so the rewritten frame lines up with the others.
        indent = error_line[: found.start()]
        header = f"{indent}File '{executable.path}' (or imported file), line {line_num}, in {func}"

        # Keep the payload lines up to and including the failing one.
        snippet = [
            code_line
            for index, code_line in enumerate(executable.payload.splitlines())
            if index < line_num
        ]

        rendered += [
            header,
            textwrap.indent(os.linesep.join(snippet), indent + "  "),
            os.linesep,
        ]

    out.write(os.linesep.join(rendered))
474
475
def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:
    """Import the python file at ``path`` as a module named after its location under ``relative_base``.

    The dotted module name is the file path relative to ``relative_base`` without its suffix.
    Every module along that dotted name is dropped from ``sys.modules`` first, so the whole
    hierarchy is imported again even if it was already loaded.
    """
    target = path.absolute()
    relative_file = target.relative_to(relative_base.absolute())
    module_name = str(relative_file.with_suffix("")).replace(os.path.sep, ".")

    # Forget "a", then "a.b", then "a.b.c", ... in case they were already loaded.
    prefix: t.List[str] = []
    for part in module_name.split("."):
        prefix.append(part)
        sys.modules.pop(".".join(prefix), None)

    return importlib.import_module(module_name)
def func_globals(func: Callable) -> Dict[str, Any]:
62def func_globals(func: t.Callable) -> t.Dict[str, t.Any]:
63    """Finds all global references and closures in a function and nested functions.
64
65    This function treats closures as global variables, which could cause problems in the future.
66
67    Args:
68        func: The function to introspect
69
70    Returns:
71        A dictionary of all global references.
72    """
73    variables = {}
74
75    if hasattr(func, "__code__"):
76        code = func.__code__
77
78        for var in list(_code_globals(code)) + decorators(func):
79            if var in func.__globals__:
80                ref = func.__globals__[var]
81                variables[var] = ref
82
83        if func.__closure__:
84            for var, value in zip(code.co_freevars, func.__closure__):
85                variables[var] = value.cell_contents
86
87    return variables

Finds all global references and closures in a function and nested functions.

This function treats closures as global variables, which could cause problems in the future.

Arguments:
  • func: The function to introspect
Returns:

A dictionary of all global references.

class ClassFoundException(builtins.Exception):
90class ClassFoundException(Exception):
91    pass

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
def getsource(obj: Any) -> str:
124def getsource(obj: t.Any) -> str:
125    """Get the source of a function or class.
126
127    inspect.getsource doesn't find decorators in python < 3.9
128    https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade
129    """
130    path = inspect.getsourcefile(obj)
131    if path:
132        module = inspect.getmodule(obj, path)
133
134        if module:
135            lines = linecache.getlines(path, module.__dict__)
136        else:
137            lines = linecache.getlines(path)
138
139        def join_source(lnum: int) -> str:
140            return "".join(inspect.getblock(lines[lnum:]))
141
142        if inspect.isclass(obj):
143            qualname = obj.__qualname__
144            source = "".join(lines)
145            tree = ast.parse(source)
146            class_finder = _ClassFinder(qualname)
147            try:
148                class_finder.visit(tree)
149            except ClassFoundException as e:
150                return join_source(e.args[0])
151        elif inspect.isfunction(obj):
152            obj = obj.__code__
153            if hasattr(obj, "co_firstlineno"):
154                lnum = obj.co_firstlineno - 1
155                pat = re.compile(r"^(\s*def\s)|(\s*async\s+def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)")
156                while lnum > 0:
157                    try:
158                        line = lines[lnum]
159                    except IndexError:
160                        raise OSError("lineno is out of bounds")
161                    if pat.match(line):
162                        break
163                    lnum = lnum - 1
164                return join_source(lnum)
165    raise SQLMeshError(f"Cannot find source for {obj}")

Get the source of a function or class.

inspect.getsource doesn't find decorators in python < 3.9 https://github.com/python/cpython/commit/696136b993e11b37c4f34d729a0375e5ad544ade

def parse_source(func: Callable) -> _ast.Module:
168def parse_source(func: t.Callable) -> ast.Module:
169    """Parse a function and returns an ast node."""
170    return ast.parse(textwrap.dedent(getsource(func)))

Parses a function and returns an AST node.

def decorators(func: Callable) -> List[str]:
181def decorators(func: t.Callable) -> t.List[str]:
182    """Finds a list of all the decorators of a callable."""
183    root_node = parse_source(func)
184    decorators = []
185
186    for node in ast.walk(root_node):
187        if isinstance(node, (ast.FunctionDef, ast.ClassDef)):
188            for decorator in node.decorator_list:
189                name = _decorator_name(decorator)
190                if name not in IGNORE_DECORATORS:
191                    decorators.append(name)
192    return unique(decorators)

Finds a list of all the decorators of a callable.

def normalize_source(obj: Any) -> str:
def normalize_source(obj: t.Any) -> str:
    """Rewrites an object's source with formatting and doc strings removed by using Python ast.

    Decorators listed in ``IGNORE_DECORATORS`` and function return type annotations are
    removed as well.

    Args:
        obj: The object to fetch source from and convert to a string.

    Returns:
        A string representation of the normalized function.
    """
    root_node = parse_source(obj)

    for node in ast.walk(root_node):
        if not isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            continue

        # Build a new list instead of calling remove() while iterating: removing in place
        # shifts the remaining items, so the decorator right after a removed one was skipped.
        node.decorator_list = [
            decorator
            for decorator in node.decorator_list
            if _decorator_name(decorator) not in IGNORE_DECORATORS
        ]

        # remove docstrings
        body = node.body
        if body and isinstance(body[0], ast.Expr):
            first = body[0].value
            # String literals are parsed as ast.Constant on Python 3.8+; ast.Str is a deprecated alias.
            if isinstance(first, ast.Constant) and isinstance(first.value, str):
                # A body must not be left empty, otherwise the generated source is invalid.
                node.body = body[1:] or [ast.Pass()]

        # remove function return type annotation
        if isinstance(node, ast.FunctionDef):
            node.returns = None

    return to_source(root_node).strip()

Rewrites an object's source with formatting and doc strings removed by using Python ast.

Arguments:
  • obj: The object to fetch source from and convert to a string.
Returns:

A string representation of the normalized function.

def build_env(obj: Any, *, env: Dict[str, Any], name: str, path: pathlib.Path) -> None:
224def build_env(
225    obj: t.Any,
226    *,
227    env: t.Dict[str, t.Any],
228    name: str,
229    path: Path,
230) -> None:
231    """Fills in env dictionary with all globals needed to execute the object.
232
233    Recursively traverse classes and functions.
234
235    Args:
236        obj: Any python object.
237        env: Dictionary to store the env.
238        name: Name of the object in the env.
239        path: The module path to serialize. Other modules will not be walked and treated as imports.
240    """
241
242    obj_module = inspect.getmodule(obj)
243
244    if obj_module and obj_module.__name__ == "builtins":
245        return
246
247    def walk(obj: t.Any) -> None:
248        if inspect.isclass(obj):
249            for decorator in decorators(obj):
250                if obj_module and decorator in obj_module.__dict__:
251                    build_env(
252                        obj_module.__dict__[decorator],
253                        env=env,
254                        name=decorator,
255                        path=path,
256                    )
257
258            for base in obj.__bases__:
259                build_env(base, env=env, name=base.__qualname__, path=path)
260
261            for k, v in obj.__dict__.items():
262                if k.startswith("__"):
263                    continue
264                # traverse methods in a class to find global references
265                if isinstance(v, (classmethod, staticmethod)):
266                    v = v.__func__
267                if callable(v):
268                    # if the method is a part of the object, walk it
269                    # else it is a global function and we just store it
270                    if v.__qualname__.startswith(obj.__qualname__):
271                        walk(v)
272                    else:
273                        build_env(v, env=env, name=v.__name__, path=path)
274        elif callable(obj):
275            for k, v in func_globals(obj).items():
276                build_env(v, env=env, name=k, path=path)
277
278    if name not in env:
279        # We only need to add the undecorated code of @macro() functions in env, which
280        # is accessible through the `__wrapped__` attribute added by functools.wraps
281        env[name] = obj.__wrapped__ if hasattr(obj, c.SQLMESH_MACRO) else obj
282
283        if (
284            obj_module
285            and hasattr(obj_module, "__file__")
286            and _is_relative_to(obj_module.__file__, path)
287        ):
288            walk(env[name])
289    elif env[name] != obj:
290        raise SQLMeshError(
291            f"Cannot store {obj} in environment, duplicate definitions found for '{name}'"
292        )

Fills in env dictionary with all globals needed to execute the object.

Recursively traverse classes and functions.

Arguments:
  • obj: Any python object.
  • env: Dictionary to store the env.
  • name: Name of the object in the env.
  • path: The module path to serialize. Other modules will not be walked and treated as imports.
class ExecutableKind(builtins.str, enum.Enum):
class ExecutableKind(str, Enum):
    """The kind of executable. The order of the members is used when serializing the python model to text.

    Members are ordered by their position in the class definition (IMPORT, VALUE, DEFINITION),
    not by their string values. Comparisons against anything that is not an ExecutableKind
    return NotImplemented, which leaves the decision to the other operand.
    """

    IMPORT = "import"
    VALUE = "value"
    DEFINITION = "definition"

    def _position(self) -> int:
        """Return the index of this member in definition order."""
        # Iterating an Enum class yields its members in definition order, which avoids
        # depending on the layout of the class __dict__.
        return list(ExecutableKind).index(self)

    def __lt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() < other._position()

    # str already implements the remaining rich comparisons lexicographically, so they are
    # overridden to make every operator (and min/max) agree with __lt__.
    def __le__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() <= other._position()

    def __gt__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() > other._position()

    def __ge__(self, other: t.Any) -> bool:
        if not isinstance(other, ExecutableKind):
            return NotImplemented
        return self._position() >= other._position()

    def __str__(self) -> str:
        return self.value

The kind of executable. The order of the members is used when serializing the python model to text.

IMPORT = <ExecutableKind.IMPORT: 'import'>
VALUE = <ExecutableKind.VALUE: 'value'>
DEFINITION = <ExecutableKind.DEFINITION: 'definition'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Executable(sqlmesh.utils.pydantic.PydanticModel):
312class Executable(PydanticModel):
313    payload: str
314    kind: ExecutableKind = ExecutableKind.DEFINITION
315    name: t.Optional[str] = None
316    path: t.Optional[str] = None
317    alias: t.Optional[str] = None
318
319    @property
320    def is_definition(self) -> bool:
321        return self.kind == ExecutableKind.DEFINITION
322
323    @property
324    def is_import(self) -> bool:
325        return self.kind == ExecutableKind.IMPORT
326
327    @property
328    def is_value(self) -> bool:
329        return self.kind == ExecutableKind.VALUE
330
331    @classmethod
332    def value(cls, v: t.Any) -> Executable:
333        return Executable(payload=repr(v), kind=ExecutableKind.VALUE)

Usage docs: https://docs.pydantic.dev/2.7/concepts/models/

A base class for creating Pydantic models.

Attributes:
  • __class_vars__: The names of classvars defined on the model.
  • __private_attributes__: Metadata about the private attributes of the model.
  • __signature__: The signature for instantiating the model.
  • __pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
  • __pydantic_core_schema__: The pydantic-core schema used to build the SchemaValidator and SchemaSerializer.
  • __pydantic_custom_init__: Whether the model has a custom __init__ function.
  • __pydantic_decorators__: Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
  • __pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
  • __pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
  • __pydantic_post_init__: The name of the post-init method for the model, if defined.
  • __pydantic_root_model__: Whether the model is a RootModel.
  • __pydantic_serializer__: The pydantic-core SchemaSerializer used to dump instances of the model.
  • __pydantic_validator__: The pydantic-core SchemaValidator used to validate instances of the model.
  • __pydantic_extra__: An instance attribute with the values of extra fields from validation when model_config['extra'] == 'allow'.
  • __pydantic_fields_set__: An instance attribute with the names of fields explicitly set.
  • __pydantic_private__: Instance attribute with the values of private attributes set on the model instance.
@classmethod
def value(cls, v: Any) -> sqlmesh.utils.metaprogramming.Executable:
331    @classmethod
332    def value(cls, v: t.Any) -> Executable:
333        return Executable(payload=repr(v), kind=ExecutableKind.VALUE)
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_rebuild
model_validate
model_validate_json
model_validate_strings
parse_file
from_orm
construct
schema
schema_json
validate
update_forward_refs
sqlmesh.utils.pydantic.PydanticModel
dict
json
copy
parse_obj
parse_raw
missing_required_fields
extra_fields
all_fields
all_field_infos
required_fields
model_post_init
def serialize_env( env: Dict[str, Any], path: pathlib.Path) -> Dict[str, sqlmesh.utils.metaprogramming.Executable]:
def serialize_env(env: t.Dict[str, t.Any], path: Path) -> t.Dict[str, Executable]:
    """Serializes a python environment into a self-contained dictionary of executables.

    Callables whose file is located under `path` are stored as normalized source definitions.
    Other callables and modules become import statements, and any other object is stored
    as the `repr` of its value.

    Args:
        env: The environment to serialize, mapping names to python objects.
        path: The root path to serialize. Other modules will not be walked and treated as imports.

    Returns:
        A dictionary mapping every name of `env` to its Executable.

    Raises:
        SQLMeshError: If a module located under `path` is referenced as a whole module,
            since it cannot be turned into a plain `import` statement.
    """
    serialized = {}

    for k, v in env.items():
        if callable(v):
            name = v.__name__
            # lambdas have no usable name of their own, so the env key is used instead
            name = k if name == "<lambda>" else name
            file_path = Path(inspect.getfile(v))

            if _is_relative_to(file_path, path):
                serialized[k] = Executable(
                    name=name,
                    payload=normalize_source(v),
                    kind=ExecutableKind.DEFINITION,
                    # Both sides are made absolute, like in `_is_relative_to`, so that a
                    # relative file path cannot make `relative_to` raise a ValueError.
                    # Do `as_posix` to serialize windows path back to POSIX
                    path=file_path.absolute().relative_to(path.absolute()).as_posix(),
                    alias=k if name != k else None,
                )
            else:
                serialized[k] = Executable(
                    payload=f"from {v.__module__} import {name}",
                    kind=ExecutableKind.IMPORT,
                )
        elif inspect.ismodule(v):
            name = v.__name__
            if hasattr(v, "__file__") and _is_relative_to(v.__file__, path):
                raise SQLMeshError(
                    f"Cannot serialize 'import {name}'. Use 'from {name} import ...' instead."
                )
            postfix = "" if name == k else f" as {k}"
            serialized[k] = Executable(
                payload=f"import {name}{postfix}",
                kind=ExecutableKind.IMPORT,
            )
        else:
            serialized[k] = Executable.value(v)

    return serialized

Serializes a python function into a self contained dictionary.

Recursively walks a function's globals to store all other references inside of env.

Arguments:
  • env: Dictionary to store the env.
  • path: The root path to serialize. Other modules are not walked and are treated as imports.
def prepare_env( python_env: Dict[str, sqlmesh.utils.metaprogramming.Executable], env: Union[Dict[str, Any], NoneType] = None) -> Dict[str, Any]:
384def prepare_env(
385    python_env: t.Dict[str, Executable],
386    env: t.Optional[t.Dict[str, t.Any]] = None,
387) -> t.Dict[str, t.Any]:
388    """Prepare a python env by hydrating and executing functions.
389
390    The Python ENV is stored in a json serializable format.
391    Functions and imports are stored as a special data class.
392
393    Args:
394        python_env: The dictionary containing the serialized python environment.
395        env: The dictionary to execute code in.
396
397    Returns:
398        The prepared environment with hydrated functions.
399    """
400    env = {} if env is None else env
401
402    for name, executable in sorted(
403        python_env.items(), key=lambda item: 0 if item[1].is_import else 1
404    ):
405        if executable.is_value:
406            env[name] = ast.literal_eval(executable.payload)
407        else:
408            exec(executable.payload, env)
409            if executable.alias and executable.name:
410                env[executable.alias] = env[executable.name]
411    return env

Prepare a python env by hydrating and executing functions.

The Python ENV is stored in a json serializable format. Functions and imports are stored as a special data class.

Arguments:
  • python_env: The dictionary containing the serialized python environment.
  • env: The dictionary to execute code in.
Returns:

The prepared environment with hydrated functions.

def import_python_file( path: pathlib.Path, relative_base: pathlib.Path = PosixPath('.')) -> module:
477def import_python_file(path: Path, relative_base: Path = Path()) -> types.ModuleType:
478    relative_path = path.absolute().relative_to(relative_base.absolute())
479    module_name = str(relative_path.with_suffix("")).replace(os.path.sep, ".")
480
481    # remove the entire module hierarchy in case they were already loaded
482    parts = module_name.split(".")
483    for i in range(len(parts)):
484        sys.modules.pop(".".join(parts[0 : i + 1]), None)
485
486    return importlib.import_module(module_name)