sqlmesh.utils
1from __future__ import annotations 2 3import copy 4import importlib 5import logging 6import os 7import random 8import re 9import string 10import sys 11import time 12import traceback 13import types 14import typing as t 15import uuid 16from collections import defaultdict 17from contextlib import contextmanager 18from copy import deepcopy 19from functools import lru_cache, reduce, wraps 20from pathlib import Path 21 22from sqlglot import exp 23from sqlglot.dialects.dialect import Dialects 24 25logger = logging.getLogger(__name__) 26 27T = t.TypeVar("T") 28KEY = t.TypeVar("KEY", bound=t.Hashable) 29VALUE = t.TypeVar("VALUE") 30ITEM = t.TypeVar("ITEM") 31GROUP = t.TypeVar("GROUP") 32DECORATOR_RETURN_TYPE = t.TypeVar("DECORATOR_RETURN_TYPE") 33 34ALPHANUMERIC = string.ascii_lowercase + string.digits 35 36 37def optional_import(name: str) -> t.Optional[types.ModuleType]: 38 """Optionally import a module. 39 40 Args: 41 name: The name of the module to import. 42 Returns: 43 The module if it is installed. 
44 """ 45 try: 46 module = importlib.import_module(name) 47 except ImportError: 48 return None 49 return module 50 51 52def major_minor(version: str) -> t.Tuple[int, int]: 53 """Returns a tuple of just the major.minor for a version string (major.minor.patch).""" 54 return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2])) 55 56 57def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]: 58 return list({by(i): None for i in iterable}) 59 60 61def random_id(short: bool = False) -> str: 62 if short: 63 return "".join(random.choices(ALPHANUMERIC, k=8)) 64 65 return uuid.uuid4().hex 66 67 68class UniqueKeyDict(dict, t.Mapping[KEY, VALUE]): 69 """Dict that raises when a duplicate key is set.""" 70 71 def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None: 72 self.name = name 73 super().__init__(*args, **kwargs) 74 75 def __setitem__(self, k: KEY, v: VALUE) -> None: 76 if k in self: 77 raise ValueError( 78 f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional." 
79 ) 80 super().__setitem__(k, v) 81 82 83class AttributeDict(dict, t.Mapping[KEY, VALUE]): 84 __getattr__ = dict.get 85 86 def set(self, field: str, value: t.Any) -> str: 87 self[field] = value 88 # Return an empty string, so that this method can be used within Jinja 89 return "" 90 91 def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict: 92 copy: AttributeDict = AttributeDict() 93 memo[id(self)] = copy 94 for k, v in self.items(): 95 copy[k] = deepcopy(v, memo) 96 return copy 97 98 def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str: 99 self.update(**kwargs) 100 # Return an empty string, so that this method can be used within Jinja 101 return "" 102 103 104class registry_decorator: 105 """A decorator that registers itself.""" 106 107 registry_name = "" 108 _registry: t.Optional[UniqueKeyDict] = None 109 110 @classmethod 111 def registry(cls) -> UniqueKeyDict: 112 if cls._registry is None: 113 cls._registry = UniqueKeyDict(cls.registry_name) 114 return cls._registry 115 116 def __init__(self, name: str = "") -> None: 117 self.name = name 118 119 def __call__( 120 self, func: t.Callable[..., DECORATOR_RETURN_TYPE] 121 ) -> t.Callable[..., DECORATOR_RETURN_TYPE]: 122 self.func = func 123 124 registry = self.registry() 125 func_name = (self.name or func.__name__).lower() 126 127 try: 128 registry[func_name] = self 129 except ValueError: 130 # No need to raise due to duplicate key if the functions are identical 131 if func.__code__.co_code != registry[func_name].func.__code__.co_code: 132 raise 133 134 @wraps(func) 135 def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE: 136 return func(*args, **kwargs) 137 138 return wrapper 139 140 @classmethod 141 def get_registry(cls) -> UniqueKeyDict: 142 """Get a copy of the registry""" 143 return UniqueKeyDict(cls.registry_name, **(cls._registry or {})) 144 145 @classmethod 146 def set_registry(cls, registry: UniqueKeyDict) -> None: 147 """Set the registry.""" 148 cls._registry = 
registry 149 150 151@contextmanager 152def sys_path(*paths: Path) -> t.Iterator[None]: 153 """A context manager to temporarily add a path to 'sys.path'.""" 154 inserted = set() 155 156 for path in paths: 157 path_str = str(path.absolute()) 158 159 if path_str not in sys.path: 160 sys.path.insert(0, path_str) 161 inserted.add(path_str) 162 163 try: 164 yield 165 finally: 166 for path_str in inserted: 167 sys.path.remove(path_str) 168 169 170def format_exception(exception: BaseException) -> t.List[str]: 171 if sys.version_info < (3, 10): 172 return traceback.format_exception( 173 type(exception), exception, exception.__traceback__ 174 ) # type: ignore 175 else: 176 return traceback.format_exception(exception) # type: ignore 177 178 179def word_characters_only(s: str, replacement_char: str = "_") -> str: 180 """ 181 Replace all non-word characters in string with the replacement character. 182 Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018 183 184 >>> word_characters_only("Hello, world!") 185 'Hello__world_' 186 >>> word_characters_only("Hello, world! 123", '') 187 'Helloworld123' 188 """ 189 return re.sub(r"\W", replacement_char, s) 190 191 192def str_to_bool(s: t.Optional[str]) -> bool: 193 """ 194 Convert a string to a boolean. disutils is being deprecated and it is recommended to implement your own version: 195 https://peps.python.org/pep-0632/ 196 197 Unlike disutils, this actually returns a bool and never raises. If a value cannot be determined to be true 198 then false is returned. 
199 """ 200 if not s: 201 return False 202 return s.lower() in ("true", "1", "t", "y", "yes", "on") 203 204 205_debug_mode_enabled: bool = False 206 207 208def enable_debug_mode() -> None: 209 global _debug_mode_enabled 210 _debug_mode_enabled = True 211 212 213def debug_mode_enabled() -> bool: 214 return _debug_mode_enabled or str_to_bool(os.environ.get("SQLMESH_DEBUG")) 215 216 217def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable: 218 """Caches a function that clears whenever the current epoch / ttl seconds changes. 219 220 TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared. 221 This is done for simplicity. 222 223 Args: 224 ttl: The number of seconds to hold the cache for. 225 maxsize: The maximum size of the cache. 226 """ 227 228 def decorator(func: t.Callable) -> t.Any: 229 @lru_cache(maxsize=maxsize) 230 def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any: 231 return func(*args, **kwargs) 232 233 @wraps(func) 234 def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any: 235 return cache(int(time.time() / ttl), *args, **kwargs) 236 237 return wrap 238 239 return decorator 240 241 242class classproperty(property): 243 """ 244 Similar to a normal property but works for class methods 245 """ 246 247 def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any: 248 return classmethod(self.fget).__get__(None, owner)() # type: ignore 249 250 251@contextmanager 252def env_vars(environ: dict[str, str]) -> t.Iterator[None]: 253 """A context manager to temporarily modify environment variables.""" 254 old_environ = os.environ.copy() 255 os.environ.update(environ) 256 257 try: 258 yield 259 finally: 260 os.environ.clear() 261 os.environ.update(old_environ) 262 263 264def merge_dicts(*args: t.Dict) -> t.Dict: 265 """ 266 Merges dicts. 
Just does key collision replacement 267 """ 268 269 def merge(a: t.Dict, b: t.Dict) -> t.Dict: 270 for b_key, b_value in b.items(): 271 a_value = a.get(b_key) 272 if isinstance(a_value, dict) and isinstance(b_value, dict): 273 merge(a_value, b_value) 274 elif isinstance(b_value, dict): 275 a[b_key] = copy.deepcopy(b_value) 276 else: 277 a[b_key] = b_value 278 return a 279 280 return reduce(merge, args, {}) 281 282 283def sqlglot_dialects() -> str: 284 return "'" + "', '".join(Dialects.__members__.values()) + "'" 285 286 287NON_ALNUM = re.compile(r"[^a-zA-Z0-9_]") 288 289 290def sanitize_name(name: str) -> str: 291 return NON_ALNUM.sub("_", name) 292 293 294def groupby( 295 items: t.Iterable[ITEM], 296 func: t.Callable[[ITEM], GROUP], 297) -> t.DefaultDict[GROUP, t.List[ITEM]]: 298 grouped = defaultdict(list) 299 for item in items: 300 grouped[func(item)].append(item) 301 return grouped 302 303 304def columns_to_types_to_struct( 305 columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]] 306) -> exp.DataType: 307 """ 308 Converts a dict of column names to types to a struct. 
309 """ 310 return exp.DataType( 311 this=exp.DataType.Type.STRUCT, 312 expressions=[ 313 exp.ColumnDef(this=exp.to_identifier(k), kind=v) for k, v in columns_to_types.items() 314 ], 315 nested=True, 316 ) 317 318 319def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool: 320 """Checks that a given column type is known and not NULL.""" 321 if isinstance(d_type, exp.ColumnDef): 322 if not d_type.kind: 323 return False 324 d_type = d_type.kind 325 if isinstance(d_type, exp.DataTypeParam): 326 return True 327 if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL): 328 return False 329 if d_type.expressions: 330 return all(type_is_known(expression) for expression in d_type.expressions) 331 return True 332 333 334def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool: 335 """Checks that all column types are known and not NULL.""" 336 return all(type_is_known(expression) for expression in columns_to_types.values())
def optional_import(name: str) -> t.Optional[types.ModuleType]:
    """Import a module by name, tolerating its absence.

    Args:
        name: Dotted name of the module to import.

    Returns:
        The imported module, or None when importing it raises ImportError.
    """
    try:
        return importlib.import_module(name)
    except ImportError:
        return None
Optionally import a module.
Arguments:
- name: The name of the module to import.
Returns:
The module if it is installed.
def major_minor(version: str) -> t.Tuple[int, int]:
    """Return the (major, minor) integers of a ``major.minor.patch`` version string."""
    leading_parts = version.split(".")[:2]
    return t.cast(t.Tuple[int, int], tuple(map(int, leading_parts)))
Returns a tuple of just the major.minor for a version string (major.minor.patch).
class UniqueKeyDict(dict, t.Mapping[KEY, VALUE]):
    """A dict that refuses to overwrite an existing key through item assignment.

    Assigning to a key that is already present raises ValueError. The error message
    points at ``dict.update(...)`` as the way to overwrite intentionally.

    Args:
        name: Label included in the duplicate-key error message.
        *args: Positional arguments forwarded to ``dict``.
        **kwargs: Keyword arguments forwarded to ``dict``.
    """

    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def __setitem__(self, k: KEY, v: VALUE) -> None:
        if k not in self:
            super().__setitem__(k, v)
            return
        raise ValueError(
            f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
        )
Dict that raises when a duplicate key is set.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class AttributeDict(dict, t.Mapping[KEY, VALUE]):
    """A dict whose keys can also be read as attributes.

    Attribute access is backed by ``dict.get``, so reading a missing key as an
    attribute returns None instead of raising AttributeError.
    """

    __getattr__ = dict.get

    def set(self, field: str, value: t.Any) -> str:
        """Set ``field`` to ``value``.

        Returns:
            An empty string, so that this method can be used within Jinja.
        """
        self[field] = value
        return ""

    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
        # Register the new object in `memo` before copying values so that
        # self-references resolve to the copy instead of recursing.
        # (Named `result` so it does not shadow the `copy` module.)
        result: AttributeDict = AttributeDict()
        memo[id(self)] = result
        for k, v in self.items():
            result[k] = deepcopy(v, memo)
        return result

    def __call__(self, **kwargs: t.Any) -> str:
        """Update the dict in place with ``kwargs``.

        Returns:
            An empty string, so that this method can be used within Jinja.
        """
        self.update(**kwargs)
        return ""
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class registry_decorator:
    """A decorator that registers itself.

    Decorating a function stores this decorator instance in a class-level
    ``UniqueKeyDict`` keyed by the lower-cased registration name (the ``name``
    given to the constructor, or else the function's ``__name__``).
    """

    registry_name = ""
    _registry: t.Optional[UniqueKeyDict] = None

    @classmethod
    def registry(cls) -> UniqueKeyDict:
        """Return the class-level registry, creating it on first access."""
        if cls._registry is None:
            cls._registry = UniqueKeyDict(cls.registry_name)
        return cls._registry

    def __init__(self, name: str = "") -> None:
        self.name = name

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        self.func = func

        registry = self.registry()
        func_name = (self.name or func.__name__).lower()

        try:
            registry[func_name] = self
        except ValueError:
            # Re-registering an identical function is tolerated; only raise when the
            # implementations differ. co_code alone is not enough: functions that differ
            # only in constants or referenced names (e.g. `return 1` vs `return 2`)
            # can compile to identical bytecode.
            new_code = func.__code__
            old_code = registry[func_name].func.__code__
            if (new_code.co_code, new_code.co_consts, new_code.co_names) != (
                old_code.co_code,
                old_code.co_consts,
                old_code.co_names,
            ):
                raise

        @wraps(func)
        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
            return func(*args, **kwargs)

        return wrapper

    @classmethod
    def get_registry(cls) -> UniqueKeyDict:
        """Get a copy of the registry"""
        # Pass the mapping positionally: splatting it as **kwargs raises TypeError
        # for non-string keys (e.g. in a registry installed via set_registry).
        return UniqueKeyDict(cls.registry_name, cls._registry or {})

    @classmethod
    def set_registry(cls, registry: UniqueKeyDict) -> None:
        """Set the registry."""
        cls._registry = registry
A decorator that registers itself.
@classmethod
def get_registry(cls) -> UniqueKeyDict:
    """Get a shallow copy of the registry.

    Returns:
        A new UniqueKeyDict with the same name and entries as the current registry,
        or an empty one when no registry exists yet.
    """
    # Pass the mapping positionally: splatting it as **kwargs raises TypeError
    # for non-string keys (e.g. in a registry installed via set_registry).
    return UniqueKeyDict(cls.registry_name, cls._registry or {})
Get a copy of the registry
@contextmanager
def sys_path(*paths: Path) -> t.Iterator[None]:
    """A context manager to temporarily add paths to 'sys.path'.

    Each path is made absolute and prepended to ``sys.path`` unless already present.
    Only the entries added here are removed on exit.

    Args:
        *paths: Paths to make importable for the duration of the context.
    """
    inserted = set()

    for path in paths:
        path_str = str(path.absolute())

        if path_str not in sys.path:
            sys.path.insert(0, path_str)
            inserted.add(path_str)

    try:
        yield
    finally:
        for path_str in inserted:
            # The body may already have removed the entry; an unconditional remove()
            # would raise ValueError here and mask any exception raised by the body.
            if path_str in sys.path:
                sys.path.remove(path_str)
A context manager to temporarily add a path to 'sys.path'.
def word_characters_only(s: str, replacement_char: str = "_") -> str:
    """
    Substitute every non-word character of ``s`` with ``replacement_char``.
    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

    >>> word_characters_only("Hello, world!")
    'Hello__world_'
    >>> word_characters_only("Hello, world! 123", '')
    'Helloworld123'
    """
    non_word = re.compile(r"\W")
    return non_word.sub(replacement_char, s)
Replace all non-word characters in string with the replacement character. Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018
>>> word_characters_only("Hello, world!")
'Hello__world_'
>>> word_characters_only("Hello, world! 123", '')
'Helloworld123'
def str_to_bool(s: t.Optional[str]) -> bool:
    """
    Convert a string to a boolean. distutils (and its ``strtobool``) is deprecated, see:
    https://peps.python.org/pep-0632/

    Unlike distutils, this always returns a bool and never raises. Matching against
    "true", "1", "t", "y", "yes" and "on" is case-insensitive. Any other value,
    including None and the empty string, yields False.
    """
    truthy_values = ("true", "1", "t", "y", "yes", "on")
    if s:
        return s.lower() in truthy_values
    return False
Convert a string to a boolean. distutils is deprecated, and it is recommended to implement your own version: https://peps.python.org/pep-0632/
Unlike distutils, this actually returns a bool and never raises. If a value cannot be determined to be true, then false is returned.
def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
    """Caches a function; the cache is cleared whenever int(epoch seconds / ttl) changes.

    TTL is not exact; it is used as a salt. So by default, at every minute mark, the cache will be cleared.
    This is done for simplicity.

    Args:
        ttl: The number of seconds to hold the cache for.
        maxsize: The maximum size of the cache.
    """

    def decorator(func: t.Callable) -> t.Any:
        @lru_cache(maxsize=maxsize)
        def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
            return func(*args, **kwargs)

        last_tick: t.Optional[int] = None

        @wraps(func)
        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
            nonlocal last_tick
            tick = int(time.time() / ttl)
            if tick != last_tick:
                # Entries keyed by an older tick can never be hit again. Drop them now
                # rather than letting up to `maxsize` dead results linger in the LRU.
                cache.cache_clear()
                last_tick = tick
            return cache(tick, *args, **kwargs)

        return wrap

    return decorator
Caches a function that clears whenever the current epoch / ttl seconds changes.
TTL is not exact; it is used as a salt. So, by default, the cache is cleared at every minute mark. This is done for simplicity.
Arguments:
- ttl: The number of seconds to hold the cache for.
- maxsize: The maximum size of the cache.
class classproperty(property):
    """
    A read-only property whose getter receives the owning class, so it can be read on the class itself
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # Bind the getter to `owner` exactly as a classmethod would; the instance is ignored.
        bound_getter = classmethod(self.fget).__get__(None, owner)  # type: ignore
        return bound_getter()
Similar to a normal property, but the getter receives the owning class, so it works like a class method and can be read on the class itself
Inherited Members
- builtins.property
- property
- getter
- setter
- deleter
@contextmanager
def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
    """Temporarily apply ``environ`` on top of ``os.environ``.

    On exit the environment is reset to exactly the snapshot taken on entry.
    This also discards any changes made inside the block.
    """
    snapshot = dict(os.environ)
    for key, value in environ.items():
        os.environ[key] = value

    try:
        yield
    finally:
        os.environ.clear()
        os.environ.update(snapshot)
A context manager to temporarily modify environment variables.
def merge_dicts(*args: t.Dict) -> t.Dict:
    """
    Merge dicts from left to right into a new dict.

    On a key collision the later value wins, except when both values are dicts, in which
    case they are merged recursively. Dict values are deep-copied into the result; other
    values are stored by reference.
    """

    def _merge_into(target: t.Dict, source: t.Dict) -> t.Dict:
        for key, incoming in source.items():
            existing = target.get(key)
            if isinstance(existing, dict) and isinstance(incoming, dict):
                _merge_into(existing, incoming)
            else:
                target[key] = copy.deepcopy(incoming) if isinstance(incoming, dict) else incoming
        return target

    merged: t.Dict = {}
    for source in args:
        _merge_into(merged, source)
    return merged
Merges dicts from left to right. On key collisions the later value replaces the earlier one, except that nested dicts are merged recursively.
def columns_to_types_to_struct(
    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]]
) -> exp.DataType:
    """
    Build a nested STRUCT data type with one field per (column name, type) entry.

    Values are passed through unchanged as each field's ``kind``. NOTE(review): str values
    are not parsed into DataType here; confirm callers expect that.
    """
    fields = []
    for column_name, column_type in columns_to_types.items():
        fields.append(exp.ColumnDef(this=exp.to_identifier(column_name), kind=column_type))
    return exp.DataType(this=exp.DataType.Type.STRUCT, expressions=fields, nested=True)
Converts a dict of column names to types to a struct.
def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
    """Return True when the type (and, recursively, every nested type) is neither UNKNOWN nor NULL.

    A ColumnDef without a kind counts as unknown; a DataTypeParam counts as known.
    """
    target = d_type
    if isinstance(target, exp.ColumnDef):
        if not target.kind:
            return False
        target = target.kind
    if isinstance(target, exp.DataTypeParam):
        return True
    if target.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
        return False
    nested = target.expressions
    if not nested:
        return True
    return all(type_is_known(child) for child in nested)
Checks that a given column type is known and not NULL.
def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
    """Return True only if every column's type passes ``type_is_known``."""
    for data_type in columns_to_types.values():
        if not type_is_known(data_type):
            return False
    return True
Checks that all column types are known and not NULL.