Edit on GitHub

sqlmesh.utils

  1from __future__ import annotations
  2
  3import copy
  4import importlib
  5import logging
  6import os
  7import random
  8import re
  9import string
 10import sys
 11import time
 12import traceback
 13import types
 14import typing as t
 15import uuid
 16from dataclasses import dataclass
 17from collections import defaultdict
 18from contextlib import contextmanager
 19from copy import deepcopy
 20from enum import IntEnum, Enum
 21from functools import lru_cache, reduce, wraps
 22from pathlib import Path
 23
 24import unicodedata
 25from sqlglot import exp
 26from sqlglot.dialects.dialect import Dialects
 27
 28logger = logging.getLogger(__name__)
 29
 30T = t.TypeVar("T")
 31KEY = t.TypeVar("KEY", bound=t.Hashable)
 32VALUE = t.TypeVar("VALUE")
 33ITEM = t.TypeVar("ITEM")
 34GROUP = t.TypeVar("GROUP")
 35DECORATOR_RETURN_TYPE = t.TypeVar("DECORATOR_RETURN_TYPE")
 36
 37ALPHANUMERIC = string.ascii_lowercase + string.digits
 38
 39
 40def optional_import(name: str) -> t.Optional[types.ModuleType]:
 41    """Optionally import a module.
 42
 43    Args:
 44        name: The name of the module to import.
 45    Returns:
 46        The module if it is installed.
 47    """
 48    try:
 49        module = importlib.import_module(name)
 50    except ImportError:
 51        return None
 52    return module
 53
 54
 55def major_minor(version: str) -> t.Tuple[int, int]:
 56    """Returns a tuple of just the major.minor for a version string (major.minor.patch)."""
 57    return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2]))
 58
 59
 60def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]:
 61    return list({by(i): None for i in iterable})
 62
 63
 64def random_id(short: bool = False) -> str:
 65    if short:
 66        return "".join(random.choices(ALPHANUMERIC, k=8))
 67
 68    return uuid.uuid4().hex
 69
 70
 71class UniqueKeyDict(t.Dict[KEY, VALUE]):
 72    """Dict that raises when a duplicate key is set."""
 73
 74    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
 75        self.name = name
 76        super().__init__(*args, **kwargs)
 77
 78    def __setitem__(self, k: KEY, v: VALUE) -> None:
 79        if k in self:
 80            raise ValueError(
 81                f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
 82            )
 83        super().__setitem__(k, v)
 84
 85
 86class AttributeDict(dict, t.Mapping[KEY, VALUE]):
 87    def __getattr__(self, key: t.Any) -> t.Optional[VALUE]:
 88        if key.startswith("__") and not hasattr(self, key):
 89            raise AttributeError
 90        return self.get(key)
 91
 92    def set(self, field: str, value: t.Any) -> str:
 93        self[field] = value
 94        # Return an empty string, so that this method can be used within Jinja
 95        return ""
 96
 97    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
 98        copy: AttributeDict = AttributeDict()
 99        memo[id(self)] = copy
100        for k, v in self.items():
101            copy[k] = deepcopy(v, memo)
102        return copy
103
104    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
105        self.update(**kwargs)
106        # Return an empty string, so that this method can be used within Jinja
107        return ""
108
109    def __getstate__(self) -> t.Optional[t.Dict[t.Any, t.Any]]:
110        return None
111
112
class registry_decorator:
    """A decorator that registers itself.

    Applying an instance to a function stores that instance in the class-level registry,
    keyed by the lowercased explicit name or, when no name was given, by the lowercased
    function name.
    """

    registry_name = ""
    _registry: t.Optional[UniqueKeyDict] = None

    @classmethod
    def registry(cls) -> UniqueKeyDict:
        """Returns the live registry, creating it on first use."""
        if cls._registry is None:
            cls._registry = UniqueKeyDict(cls.registry_name)
        return cls._registry

    def __init__(self, name: str = "") -> None:
        self.name = name

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        """Registers this decorator instance for `func` and returns a wrapper around `func`.

        Raises:
            ValueError: If the name is already registered for a function with different bytecode.
        """
        self.func = func

        registry = self.registry()
        func_name = (self.name or func.__name__).lower()

        try:
            registry[func_name] = self
        except ValueError:
            # No need to raise due to duplicate key if the functions are identical
            if func.__code__.co_code != registry[func_name].func.__code__.co_code:
                raise ValueError(f"Duplicate name: '{func_name}'.")

        @wraps(func)
        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
            return func(*args, **kwargs)

        return wrapper

    @classmethod
    def get_registry(cls) -> UniqueKeyDict:
        """Get a copy of the registry"""
        # Pass the entries positionally: expanding them as keyword arguments would collide
        # with the `name` parameter of UniqueKeyDict for an entry registered as "name",
        # and would only work for string keys.
        return UniqueKeyDict(cls.registry_name, cls._registry or {})

    @classmethod
    def set_registry(cls, registry: UniqueKeyDict) -> None:
        """Set the registry."""
        cls._registry = registry
158
159
@contextmanager
def sys_path(*paths: Path) -> t.Iterator[None]:
    """A context manager that temporarily puts the given paths at the front of 'sys.path'.

    Paths whose absolute form is already in 'sys.path' are left alone; only the entries
    added here are removed again on exit.
    """
    added = set()

    for candidate in paths:
        absolute = str(candidate.absolute())
        if absolute in sys.path:
            continue
        sys.path.insert(0, absolute)
        added.add(absolute)

    try:
        yield
    finally:
        for absolute in added:
            sys.path.remove(absolute)
177
178
def format_exception(exception: BaseException) -> t.List[str]:
    """Formats an exception together with its traceback via `traceback.format_exception`."""
    if sys.version_info >= (3, 10):
        # The single-argument call form is only used from Python 3.10 onwards.
        return traceback.format_exception(exception)  # type: ignore
    return traceback.format_exception(type(exception), exception, exception.__traceback__)  # type: ignore
183
184
def word_characters_only(s: str, replacement_char: str = "_") -> str:
    """
    Substitute the replacement character for every non-word character in the string.
    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

    >>> word_characters_only("Hello, world!")
    'Hello__world_'
    >>> word_characters_only("Hello, world! 123", '')
    'Helloworld123'
    """
    non_word = re.compile(r"\W")
    return non_word.sub(replacement_char, s)
196
197
def str_to_bool(s: t.Optional[str]) -> bool:
    """
    Convert a string to a boolean. distutils is being deprecated and it is recommended to implement your own version:
    https://peps.python.org/pep-0632/

    Unlike distutils, this actually returns a bool and never raises. Any value that is not recognized as true,
    including None and the empty string, yields False.
    """
    if s:
        return s.lower() in ("true", "1", "t", "y", "yes", "on")
    return False
209
210
# Module-level flag that enable_debug_mode() switches on.
_debug_mode_enabled: bool = False


def enable_debug_mode() -> None:
    """Switches the module-level debug flag on."""
    global _debug_mode_enabled
    _debug_mode_enabled = True


def debug_mode_enabled() -> bool:
    """Returns whether debug mode is on, via enable_debug_mode() or a true SQLMESH_DEBUG environment variable."""
    if _debug_mode_enabled:
        return True
    return str_to_bool(os.environ.get("SQLMESH_DEBUG"))
221
222
def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
    """Caches a function, keyed by its arguments plus the current epoch / ttl seconds.

    TTL is not exact, it is used as a salt: once the current epoch / ttl value changes,
    earlier results no longer match the cache key and the function is evaluated again.
    With the default of 60 this happens at every minute mark. This is done for simplicity.

    Args:
        ttl: The number of seconds to hold the cache for.
        maxsize: The maximum size of the cache.
    """

    def decorator(func: t.Callable) -> t.Any:
        @lru_cache(maxsize=maxsize)
        def cached_call(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
            # `tick` only takes part in the cache key; it is not passed on to `func`.
            return func(*args, **kwargs)

        @wraps(func)
        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
            current_tick = int(time.time() / ttl)
            return cached_call(current_tick, *args, **kwargs)

        return wrap

    return decorator
246
247
class classproperty(property):
    """
    A property whose getter is called with the owning class rather than an instance
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # Bind the getter to the owner the way a classmethod is bound, then call it.
        bound_getter = classmethod(self.fget).__get__(None, owner)  # type: ignore
        return bound_getter()
255
256
@contextmanager
def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
    """A context manager that applies the given environment variables and restores the previous environment on exit."""
    saved = dict(os.environ)
    os.environ.update(environ)

    try:
        yield
    finally:
        # Clear before restoring so that variables added inside the context are dropped too.
        os.environ.clear()
        os.environ.update(saved)
268
269
def merge_dicts(*args: t.Dict) -> t.Dict:
    """
    Merges dicts into a new dict, later arguments replacing earlier ones on key collisions.
    When both sides hold a dict under the same key, those dicts are merged recursively instead
    """

    def merge(a: t.Dict, b: t.Dict) -> t.Dict:
        for key, incoming in b.items():
            existing = a.get(key)
            incoming_is_dict = isinstance(incoming, dict)
            if incoming_is_dict and isinstance(existing, dict):
                merge(existing, incoming)
            else:
                # Nested dicts are deep-copied on insertion, so later merges write into
                # the copy rather than into the dict that was passed in.
                a[key] = copy.deepcopy(incoming) if incoming_is_dict else incoming
        return a

    merged: t.Dict = {}
    for arg in args:
        merge(merged, arg)
    return merged
287
288
def sqlglot_dialects() -> str:
    """Returns the values of the members of sqlglot's Dialects, each in single quotes and separated by ', '."""
    joined = "', '".join(Dialects.__members__.values())
    return f"'{joined}'"
291
292
# Matches every character other than an ASCII letter, a digit or an underscore.
NON_ALNUM = re.compile(r"[^a-zA-Z0-9_]")

# Matches every non-word character, with Unicode matching rules.
NON_ALUM_INCLUDE_UNICODE = re.compile(r"\W", flags=re.UNICODE)


def sanitize_name(name: str, *, include_unicode: bool = False) -> str:
    """Replaces every disallowed character of a name with an underscore.

    Args:
        name: The name to sanitize.
        include_unicode: When true, the name is NFC-normalized first and Unicode word
            characters are kept. Otherwise only ASCII letters, digits and underscores are kept.
    Returns:
        The sanitized name.
    """
    if not include_unicode:
        return NON_ALNUM.sub("_", name)

    normalized = unicodedata.normalize("NFC", name)
    return NON_ALUM_INCLUDE_UNICODE.sub("_", normalized)
304
305
def groupby(
    items: t.Iterable[ITEM],
    func: t.Callable[[ITEM], GROUP],
) -> t.DefaultDict[GROUP, t.List[ITEM]]:
    """Collects items into lists keyed by the value `func` returns for each item.

    Args:
        items: The items to group.
        func: Computes the group key of an item.
    Returns:
        A defaultdict mapping each group key to its items, in iteration order.
    """
    buckets: t.DefaultDict[GROUP, t.List[ITEM]] = defaultdict(list)
    for entry in items:
        group_key = func(entry)
        buckets[group_key].append(entry)
    return buckets
314
315
def columns_to_types_to_struct(
    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]],
) -> exp.DataType:
    """
    Builds a nested STRUCT data type with one column definition per entry of a column-name-to-type dict.
    """
    fields = []
    for column_name, column_type in columns_to_types.items():
        fields.append(exp.ColumnDef(this=exp.to_identifier(column_name), kind=column_type))

    return exp.DataType(this=exp.DataType.Type.STRUCT, expressions=fields, nested=True)
329
330
def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
    """Checks that a given column type is known and not NULL.

    A column definition is judged by its kind and is unknown when it has none. A data type
    parameter counts as known. A type with nested expressions is known only if every
    nested expression is.
    """
    if isinstance(d_type, exp.ColumnDef):
        column_kind = d_type.kind
        if not column_kind:
            return False
        d_type = column_kind

    if isinstance(d_type, exp.DataTypeParam):
        return True
    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
        return False

    nested = d_type.expressions
    if not nested:
        return True
    return all(type_is_known(inner) for inner in nested)
344
345
def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
    """Checks that the type of every column is known and not NULL."""
    for column_type in columns_to_types.values():
        if not type_is_known(column_type):
            return False
    return True
349
350
class Verbosity(IntEnum):
    """Verbosity levels for SQLMesh output."""

    DEFAULT = 0
    VERBOSE = 1
    VERY_VERBOSE = 2

    # Each property below is true for exactly one level; none of them means "at least".

    @property
    def is_default(self) -> bool:
        return self is Verbosity.DEFAULT

    @property
    def is_verbose(self) -> bool:
        return self is Verbosity.VERBOSE

    @property
    def is_very_verbose(self) -> bool:
        return self is Verbosity.VERY_VERBOSE
369
370
class CompletionStatus(Enum):
    SUCCESS = "success"
    FAILURE = "failure"
    NOTHING_TO_DO = "nothing_to_do"

    # One boolean convenience property per member.

    @property
    def is_success(self) -> bool:
        return self is CompletionStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        return self is CompletionStatus.FAILURE

    @property
    def is_nothing_to_do(self) -> bool:
        return self is CompletionStatus.NOTHING_TO_DO
387
388
def to_snake_case(name: str) -> str:
    """Lowercases a name, inserting an underscore before every uppercase character that is not at the start."""
    pieces = []
    for position, char in enumerate(name):
        if position != 0 and char.isupper():
            pieces.append("_")
        pieces.append(char.lower())
    return "".join(pieces)
393
394
class JobType(Enum):
    # The kinds of job that a CorrelationId can identify.
    PLAN = "SQLMESH_PLAN"
    RUN = "SQLMESH_RUN"


@dataclass(frozen=True)
class CorrelationId:
    """ID that is added to each query in order to identify the job that created it."""

    job_type: JobType
    job_id: str

    def __str__(self) -> str:
        return f"{self.job_type.value}: {self.job_id}"

    @classmethod
    def from_plan_id(cls, plan_id: str) -> CorrelationId:
        """Creates the correlation ID of a plan job from the plan's ID."""
        # Construct through `cls` so that calling this on a subclass returns that subclass.
        return cls(JobType.PLAN, plan_id)
413
414
def get_source_columns_to_types(
    columns_to_types: t.Dict[str, exp.DataType],
    source_columns: t.Optional[t.List[str]],
) -> t.Dict[str, exp.DataType]:
    """Restricts a column-to-type mapping to the given source columns.

    Args:
        columns_to_types: The mapping of column names to types.
        source_columns: The column names to keep. When None or empty, every column is kept.
    Returns:
        A new dict with the retained columns, in their original order.
    """
    if not source_columns:
        return dict(columns_to_types)

    wanted = set(source_columns)
    return {name: dtype for name, dtype in columns_to_types.items() if name in wanted}
logger = <Logger sqlmesh.utils (WARNING)>
ALPHANUMERIC = 'abcdefghijklmnopqrstuvwxyz0123456789'
def optional_import(name: str) -> Optional[module]:
41def optional_import(name: str) -> t.Optional[types.ModuleType]:
42    """Optionally import a module.
43
44    Args:
45        name: The name of the module to import.
46    Returns:
47        The module if it is installed.
48    """
49    try:
50        module = importlib.import_module(name)
51    except ImportError:
52        return None
53    return module

Optionally import a module.

Arguments:
  • name: The name of the module to import.
Returns:

The module if it is installed.

def major_minor(version: str) -> Tuple[int, int]:
56def major_minor(version: str) -> t.Tuple[int, int]:
57    """Returns a tuple of just the major.minor for a version string (major.minor.patch)."""
58    return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2]))

Returns a tuple of just the major.minor for a version string (major.minor.patch).

def unique( iterable: Iterable[~T], by: Callable[[~T], Any] = <function <lambda>>) -> List[~T]:
61def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]:
62    return list({by(i): None for i in iterable})
def random_id(short: bool = False) -> str:
65def random_id(short: bool = False) -> str:
66    if short:
67        return "".join(random.choices(ALPHANUMERIC, k=8))
68
69    return uuid.uuid4().hex
class UniqueKeyDict(typing.Dict[~KEY, ~VALUE]):
72class UniqueKeyDict(t.Dict[KEY, VALUE]):
73    """Dict that raises when a duplicate key is set."""
74
75    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
76        self.name = name
77        super().__init__(*args, **kwargs)
78
79    def __setitem__(self, k: KEY, v: VALUE) -> None:
80        if k in self:
81            raise ValueError(
82                f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
83            )
84        super().__setitem__(k, v)

Dict that raises when a duplicate key is set.

name
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class AttributeDict(builtins.dict, typing.Mapping[~KEY, ~VALUE]):
 87class AttributeDict(dict, t.Mapping[KEY, VALUE]):
 88    def __getattr__(self, key: t.Any) -> t.Optional[VALUE]:
 89        if key.startswith("__") and not hasattr(self, key):
 90            raise AttributeError
 91        return self.get(key)
 92
 93    def set(self, field: str, value: t.Any) -> str:
 94        self[field] = value
 95        # Return an empty string, so that this method can be used within Jinja
 96        return ""
 97
 98    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
 99        copy: AttributeDict = AttributeDict()
100        memo[id(self)] = copy
101        for k, v in self.items():
102            copy[k] = deepcopy(v, memo)
103        return copy
104
105    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
106        self.update(**kwargs)
107        # Return an empty string, so that this method can be used within Jinja
108        return ""
109
110    def __getstate__(self) -> t.Optional[t.Dict[t.Any, t.Any]]:
111        return None
def set(self, field: str, value: Any) -> str:
93    def set(self, field: str, value: t.Any) -> str:
94        self[field] = value
95        # Return an empty string, so that this method can be used within Jinja
96        return ""
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class registry_decorator:
114class registry_decorator:
115    """A decorator that registers itself."""
116
117    registry_name = ""
118    _registry: t.Optional[UniqueKeyDict] = None
119
120    @classmethod
121    def registry(cls) -> UniqueKeyDict:
122        if cls._registry is None:
123            cls._registry = UniqueKeyDict(cls.registry_name)
124        return cls._registry
125
126    def __init__(self, name: str = "") -> None:
127        self.name = name
128
129    def __call__(
130        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
131    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
132        self.func = func
133
134        registry = self.registry()
135        func_name = (self.name or func.__name__).lower()
136
137        try:
138            registry[func_name] = self
139        except ValueError:
140            # No need to raise due to duplicate key if the functions are identical
141            if func.__code__.co_code != registry[func_name].func.__code__.co_code:
142                raise ValueError(f"Duplicate name: '{func_name}'.")
143
144        @wraps(func)
145        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
146            return func(*args, **kwargs)
147
148        return wrapper
149
150    @classmethod
151    def get_registry(cls) -> UniqueKeyDict:
152        """Get a copy of the registry"""
153        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))
154
155    @classmethod
156    def set_registry(cls, registry: UniqueKeyDict) -> None:
157        """Set the registry."""
158        cls._registry = registry

A decorator that registers itself.

registry_decorator(name: str = '')
126    def __init__(self, name: str = "") -> None:
127        self.name = name
registry_name = ''
@classmethod
def registry(cls) -> UniqueKeyDict:
120    @classmethod
121    def registry(cls) -> UniqueKeyDict:
122        if cls._registry is None:
123            cls._registry = UniqueKeyDict(cls.registry_name)
124        return cls._registry
name
@classmethod
def get_registry(cls) -> UniqueKeyDict:
150    @classmethod
151    def get_registry(cls) -> UniqueKeyDict:
152        """Get a copy of the registry"""
153        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))

Get a copy of the registry

@classmethod
def set_registry(cls, registry: UniqueKeyDict) -> None:
155    @classmethod
156    def set_registry(cls, registry: UniqueKeyDict) -> None:
157        """Set the registry."""
158        cls._registry = registry

Set the registry.

@contextmanager
def sys_path(*paths: pathlib.Path) -> Iterator[NoneType]:
161@contextmanager
162def sys_path(*paths: Path) -> t.Iterator[None]:
163    """A context manager to temporarily add a path to 'sys.path'."""
164    inserted = set()
165
166    for path in paths:
167        path_str = str(path.absolute())
168
169        if path_str not in sys.path:
170            sys.path.insert(0, path_str)
171            inserted.add(path_str)
172
173    try:
174        yield
175    finally:
176        for path_str in inserted:
177            sys.path.remove(path_str)

A context manager to temporarily add a path to 'sys.path'.

def format_exception(exception: BaseException) -> List[str]:
180def format_exception(exception: BaseException) -> t.List[str]:
181    if sys.version_info < (3, 10):
182        return traceback.format_exception(type(exception), exception, exception.__traceback__)  # type: ignore
183    return traceback.format_exception(exception)  # type: ignore
def word_characters_only(s: str, replacement_char: str = '_') -> str:
186def word_characters_only(s: str, replacement_char: str = "_") -> str:
187    """
188    Replace all non-word characters in string with the replacement character.
189    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018
190
191    >>> word_characters_only("Hello, world!")
192    'Hello__world_'
193    >>> word_characters_only("Hello, world! 123", '')
194    'Helloworld123'
195    """
196    return re.sub(r"\W", replacement_char, s)

Replace all non-word characters in string with the replacement character. Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

>>> word_characters_only("Hello, world!")
'Hello__world_'
>>> word_characters_only("Hello, world! 123", '')
'Helloworld123'
def str_to_bool(s: Optional[str]) -> bool:
199def str_to_bool(s: t.Optional[str]) -> bool:
200    """
201    Convert a string to a boolean. disutils is being deprecated and it is recommended to implement your own version:
202    https://peps.python.org/pep-0632/
203
204    Unlike disutils, this actually returns a bool and never raises. If a value cannot be determined to be true
205    then false is returned.
206    """
207    if not s:
208        return False
209    return s.lower() in ("true", "1", "t", "y", "yes", "on")

Convert a string to a boolean. distutils is being deprecated and it is recommended to implement your own version: https://peps.python.org/pep-0632/

Unlike distutils, this actually returns a bool and never raises. If a value cannot be determined to be true then false is returned.

def enable_debug_mode() -> None:
215def enable_debug_mode() -> None:
216    global _debug_mode_enabled
217    _debug_mode_enabled = True
def debug_mode_enabled() -> bool:
220def debug_mode_enabled() -> bool:
221    return _debug_mode_enabled or str_to_bool(os.environ.get("SQLMESH_DEBUG"))
def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> Callable:
224def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
225    """Caches a function that clears whenever the current epoch / ttl seconds changes.
226
227    TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared.
228    This is done for simplicity.
229
230    Args:
231        ttl: The number of seconds to hold the cache for.
232        maxsize: The maximum size of the cache.
233    """
234
235    def decorator(func: t.Callable) -> t.Any:
236        @lru_cache(maxsize=maxsize)
237        def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
238            return func(*args, **kwargs)
239
240        @wraps(func)
241        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
242            return cache(int(time.time() / ttl), *args, **kwargs)
243
244        return wrap
245
246    return decorator

Caches a function that clears whenever the current epoch / ttl seconds changes.

TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared. This is done for simplicity.

Arguments:
  • ttl: The number of seconds to hold the cache for.
  • maxsize: The maximum size of the cache.
class classproperty(builtins.property):
249class classproperty(property):
250    """
251    Similar to a normal property but works for class methods
252    """
253
254    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
255        return classmethod(self.fget).__get__(None, owner)()  # type: ignore

Similar to a normal property but works for class methods

Inherited Members
builtins.property
property
getter
setter
deleter
fget
fset
fdel
@contextmanager
def env_vars(environ: dict[str, str]) -> Iterator[NoneType]:
258@contextmanager
259def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
260    """A context manager to temporarily modify environment variables."""
261    old_environ = os.environ.copy()
262    os.environ.update(environ)
263
264    try:
265        yield
266    finally:
267        os.environ.clear()
268        os.environ.update(old_environ)

A context manager to temporarily modify environment variables.

def merge_dicts(*args: Dict) -> Dict:
271def merge_dicts(*args: t.Dict) -> t.Dict:
272    """
273    Merges dicts. Just does key collision replacement
274    """
275
276    def merge(a: t.Dict, b: t.Dict) -> t.Dict:
277        for b_key, b_value in b.items():
278            a_value = a.get(b_key)
279            if isinstance(a_value, dict) and isinstance(b_value, dict):
280                merge(a_value, b_value)
281            elif isinstance(b_value, dict):
282                a[b_key] = copy.deepcopy(b_value)
283            else:
284                a[b_key] = b_value
285        return a
286
287    return reduce(merge, args, {})

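Merges dicts into a new dict. On a key collision the later value replaces the earlier one, except that when both values are dicts they are merged recursively.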

def sqlglot_dialects() -> str:
290def sqlglot_dialects() -> str:
291    return "'" + "', '".join(Dialects.__members__.values()) + "'"
NON_ALNUM = re.compile('[^a-zA-Z0-9_]')
NON_ALUM_INCLUDE_UNICODE = re.compile('\\W')
def sanitize_name(name: str, *, include_unicode: bool = False) -> str:
299def sanitize_name(name: str, *, include_unicode: bool = False) -> str:
300    if include_unicode:
301        s = unicodedata.normalize("NFC", name)
302        s = NON_ALUM_INCLUDE_UNICODE.sub("_", s)
303        return s
304    return NON_ALNUM.sub("_", name)
def groupby( items: Iterable[~ITEM], func: Callable[[~ITEM], ~GROUP]) -> DefaultDict[~GROUP, List[~ITEM]]:
307def groupby(
308    items: t.Iterable[ITEM],
309    func: t.Callable[[ITEM], GROUP],
310) -> t.DefaultDict[GROUP, t.List[ITEM]]:
311    grouped = defaultdict(list)
312    for item in items:
313        grouped[func(item)].append(item)
314    return grouped
def columns_to_types_to_struct( columns_to_types: Union[Dict[str, sqlglot.expressions.datatypes.DataType], Dict[str, str]]) -> sqlglot.expressions.datatypes.DataType:
317def columns_to_types_to_struct(
318    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]],
319) -> exp.DataType:
320    """
321    Converts a dict of column names to types to a struct.
322    """
323    return exp.DataType(
324        this=exp.DataType.Type.STRUCT,
325        expressions=[
326            exp.ColumnDef(this=exp.to_identifier(k), kind=v) for k, v in columns_to_types.items()
327        ],
328        nested=True,
329    )

Converts a dict of column names to types to a struct.

def type_is_known( d_type: Union[sqlglot.expressions.datatypes.DataType, sqlglot.expressions.query.ColumnDef]) -> bool:
332def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
333    """Checks that a given column type is known and not NULL."""
334    if isinstance(d_type, exp.ColumnDef):
335        if not d_type.kind:
336            return False
337        d_type = d_type.kind
338    if isinstance(d_type, exp.DataTypeParam):
339        return True
340    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
341        return False
342    if d_type.expressions:
343        return all(type_is_known(expression) for expression in d_type.expressions)
344    return True

Checks that a given column type is known and not NULL.

def columns_to_types_all_known( columns_to_types: Dict[str, sqlglot.expressions.datatypes.DataType]) -> bool:
347def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
348    """Checks that all column types are known and not NULL."""
349    return all(type_is_known(expression) for expression in columns_to_types.values())

Checks that all column types are known and not NULL.

class Verbosity(enum.IntEnum):
352class Verbosity(IntEnum):
353    """Verbosity levels for SQLMesh output."""
354
355    DEFAULT = 0
356    VERBOSE = 1
357    VERY_VERBOSE = 2
358
359    @property
360    def is_default(self) -> bool:
361        return self == Verbosity.DEFAULT
362
363    @property
364    def is_verbose(self) -> bool:
365        return self == Verbosity.VERBOSE
366
367    @property
368    def is_very_verbose(self) -> bool:
369        return self == Verbosity.VERY_VERBOSE

Verbosity levels for SQLMesh output.

DEFAULT = <Verbosity.DEFAULT: 0>
VERBOSE = <Verbosity.VERBOSE: 1>
VERY_VERBOSE = <Verbosity.VERY_VERBOSE: 2>
is_default: bool
359    @property
360    def is_default(self) -> bool:
361        return self == Verbosity.DEFAULT
is_verbose: bool
363    @property
364    def is_verbose(self) -> bool:
365        return self == Verbosity.VERBOSE
is_very_verbose: bool
367    @property
368    def is_very_verbose(self) -> bool:
369        return self == Verbosity.VERY_VERBOSE
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
real
imag
numerator
denominator
class CompletionStatus(enum.Enum):
372class CompletionStatus(Enum):
373    SUCCESS = "success"
374    FAILURE = "failure"
375    NOTHING_TO_DO = "nothing_to_do"
376
377    @property
378    def is_success(self) -> bool:
379        return self == CompletionStatus.SUCCESS
380
381    @property
382    def is_failure(self) -> bool:
383        return self == CompletionStatus.FAILURE
384
385    @property
386    def is_nothing_to_do(self) -> bool:
387        return self == CompletionStatus.NOTHING_TO_DO

An enumeration.

SUCCESS = <CompletionStatus.SUCCESS: 'success'>
FAILURE = <CompletionStatus.FAILURE: 'failure'>
NOTHING_TO_DO = <CompletionStatus.NOTHING_TO_DO: 'nothing_to_do'>
is_success: bool
377    @property
378    def is_success(self) -> bool:
379        return self == CompletionStatus.SUCCESS
is_failure: bool
381    @property
382    def is_failure(self) -> bool:
383        return self == CompletionStatus.FAILURE
is_nothing_to_do: bool
385    @property
386    def is_nothing_to_do(self) -> bool:
387        return self == CompletionStatus.NOTHING_TO_DO
Inherited Members
enum.Enum
name
value
def to_snake_case(name: str) -> str:
390def to_snake_case(name: str) -> str:
391    return "".join(
392        f"_{c.lower()}" if c.isupper() and idx != 0 else c.lower() for idx, c in enumerate(name)
393    )
class JobType(enum.Enum):
396class JobType(Enum):
397    PLAN = "SQLMESH_PLAN"
398    RUN = "SQLMESH_RUN"

An enumeration.

PLAN = <JobType.PLAN: 'SQLMESH_PLAN'>
RUN = <JobType.RUN: 'SQLMESH_RUN'>
Inherited Members
enum.Enum
name
value
@dataclass(frozen=True)
class CorrelationId:
401@dataclass(frozen=True)
402class CorrelationId:
403    """ID that is added to each query in order to identify the job that created it."""
404
405    job_type: JobType
406    job_id: str
407
408    def __str__(self) -> str:
409        return f"{self.job_type.value}: {self.job_id}"
410
411    @classmethod
412    def from_plan_id(cls, plan_id: str) -> CorrelationId:
413        return CorrelationId(JobType.PLAN, plan_id)

ID that is added to each query in order to identify the job that created it.

CorrelationId(job_type: JobType, job_id: str)
job_type: JobType
job_id: str
@classmethod
def from_plan_id(cls, plan_id: str) -> CorrelationId:
411    @classmethod
412    def from_plan_id(cls, plan_id: str) -> CorrelationId:
413        return CorrelationId(JobType.PLAN, plan_id)
def get_source_columns_to_types( columns_to_types: Dict[str, sqlglot.expressions.datatypes.DataType], source_columns: Optional[List[str]]) -> Dict[str, sqlglot.expressions.datatypes.DataType]:
416def get_source_columns_to_types(
417    columns_to_types: t.Dict[str, exp.DataType],
418    source_columns: t.Optional[t.List[str]],
419) -> t.Dict[str, exp.DataType]:
420    source_column_lookup = set(source_columns) if source_columns else None
421    return {
422        k: v
423        for k, v in columns_to_types.items()
424        if not source_column_lookup or k in source_column_lookup
425    }