sqlmesh.utils
from __future__ import annotations

import copy
import importlib
import logging
import os
import random
import re
import string
import sys
import time
import traceback
import types
import typing as t
import uuid
from dataclasses import dataclass
from collections import defaultdict
from contextlib import contextmanager
from copy import deepcopy
from enum import IntEnum, Enum
from functools import lru_cache, reduce, wraps
from pathlib import Path

import unicodedata
from sqlglot import exp
from sqlglot.dialects.dialect import Dialects

logger = logging.getLogger(__name__)

T = t.TypeVar("T")
KEY = t.TypeVar("KEY", bound=t.Hashable)
VALUE = t.TypeVar("VALUE")
ITEM = t.TypeVar("ITEM")
GROUP = t.TypeVar("GROUP")
DECORATOR_RETURN_TYPE = t.TypeVar("DECORATOR_RETURN_TYPE")

# Alphabet used by random_id(short=True): lowercase ASCII letters plus digits.
ALPHANUMERIC = string.ascii_lowercase + string.digits


def optional_import(name: str) -> t.Optional[types.ModuleType]:
    """Optionally import a module.

    Args:
        name: The name of the module to import.
    Returns:
        The module if it is installed, otherwise None (ImportError is swallowed).
    """
    try:
        module = importlib.import_module(name)
    except ImportError:
        return None
    return module


def major_minor(version: str) -> t.Tuple[int, int]:
    """Returns a tuple of just the major.minor for a version string (major.minor.patch)."""
    # Only the first two dot-separated parts are converted; a version with a single
    # part yields a 1-tuple despite the annotated return type.
    return t.cast(t.Tuple[int, int], tuple(int(part) for part in version.split(".")[0:2]))


def unique(iterable: t.Iterable[T], by: t.Callable[[T], t.Any] = lambda i: i) -> t.List[T]:
    """Deduplicates an iterable while keeping first-seen order.

    Args:
        iterable: The items to deduplicate.
        by: Computes the (hashable) key used to detect duplicates.
    Returns:
        The distinct keys in the order they were first produced.
    """
    # NOTE(review): the list is built from the dict's keys, i.e. the `by(i)` values.
    # With the default identity `by` those are the items themselves; with a custom
    # `by` the result holds keys rather than the original items - confirm callers expect that.
    return list({by(i): None for i in iterable})


def random_id(short: bool = False) -> str:
    """Generates a random identifier.

    Args:
        short: When true, return 8 characters drawn from ALPHANUMERIC instead of a UUID.
    Returns:
        An 8 character string if `short`, otherwise the hex form of a random UUID4.
    """
    if short:
        return "".join(random.choices(ALPHANUMERIC, k=8))

    return uuid.uuid4().hex


class UniqueKeyDict(t.Dict[KEY, VALUE]):
    """Dict that raises when a duplicate key is set."""

    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
        # `name` is only used to label the error message raised by __setitem__.
        self.name = name
        super().__init__(*args, **kwargs)

    def __setitem__(self, k: KEY, v: VALUE) -> None:
        if k in self:
            raise ValueError(
                f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
            )
        super().__setitem__(k, v)


class AttributeDict(dict, t.Mapping[KEY, VALUE]):
    """A dict whose entries can also be read through attribute access."""

    def __getattr__(self, key: t.Any) -> t.Optional[VALUE]:
        # Missing non-dunder keys read as None because of dict.get.
        # NOTE(review): __getattr__ only runs after regular lookup failed, so the
        # hasattr() call below appears to re-enter this method with the same key -
        # confirm it cannot recurse for missing dunder names.
        if key.startswith("__") and not hasattr(self, key):
            raise AttributeError
        return self.get(key)

    def set(self, field: str, value: t.Any) -> str:
        """Stores `value` under `field` and returns an empty string."""
        self[field] = value
        # Return an empty string, so that this method can be used within Jinja
        return ""

    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
        """Builds a new AttributeDict whose values are deep copies of this one's."""
        copy: AttributeDict = AttributeDict()
        # Register the copy before recursing so self-references resolve to it.
        memo[id(self)] = copy
        for k, v in self.items():
            copy[k] = deepcopy(v, memo)
        return copy

    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
        """Updates the dict with the given keyword arguments and returns an empty string."""
        self.update(**kwargs)
        # Return an empty string, so that this method can be used within Jinja
        return ""

    def __getstate__(self) -> t.Optional[t.Dict[t.Any, t.Any]]:
        """Always reports that there is no extra instance state."""
        return None


class registry_decorator:
    """A decorator that registers itself."""

    # Label passed to the UniqueKeyDict that backs the registry.
    registry_name = ""
    # Created lazily by registry(); replaced wholesale by set_registry().
    _registry: t.Optional[UniqueKeyDict] = None

    @classmethod
    def registry(cls) -> UniqueKeyDict:
        """Returns the live registry, creating it on first use."""
        if cls._registry is None:
            cls._registry = UniqueKeyDict(cls.registry_name)
        return cls._registry

    def __init__(self, name: str = "") -> None:
        # An empty name means "use the decorated function's __name__".
        self.name = name

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        """Registers this decorator instance under the lowercased name and wraps `func`.

        Raises:
            ValueError: If the name is already registered for a function whose
                bytecode differs from `func`'s.
        """
        self.func = func

        registry = self.registry()
        func_name = (self.name or func.__name__).lower()

        try:
            registry[func_name] = self
        except ValueError:
            # No need to raise due to duplicate key if the functions are identical
            if func.__code__.co_code != registry[func_name].func.__code__.co_code:
                raise ValueError(f"Duplicate name: '{func_name}'.")

        @wraps(func)
        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
            return func(*args, **kwargs)

        return wrapper

    @classmethod
    def get_registry(cls) -> UniqueKeyDict:
        """Get a copy of the registry"""
        return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))

    @classmethod
    def set_registry(cls, registry: UniqueKeyDict) -> None:
        """Set the registry."""
        cls._registry = registry


@contextmanager
def sys_path(*paths: Path) -> t.Iterator[None]:
    """A context manager to temporarily add a path to 'sys.path'."""
    # Only paths this call actually inserted are tracked, so entries that were
    # already present are left untouched on exit.
    inserted = set()

    for path in paths:
        path_str = str(path.absolute())

        if path_str not in sys.path:
            sys.path.insert(0, path_str)
            inserted.add(path_str)

    try:
        yield
    finally:
        for path_str in inserted:
            sys.path.remove(path_str)


def format_exception(exception: BaseException) -> t.List[str]:
    """Formats an exception and its traceback into a list of strings."""
    # Before Python 3.10 traceback.format_exception is called with the explicit
    # (type, value, traceback) triple; from 3.10 on the exception alone is passed.
    if sys.version_info < (3, 10):
        return traceback.format_exception(type(exception), exception, exception.__traceback__)  # type: ignore
    return traceback.format_exception(exception)  # type: ignore


def word_characters_only(s: str, replacement_char: str = "_") -> str:
    """
    Replace all non-word characters in string with the replacement character.
    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

    >>> word_characters_only("Hello, world!")
    'Hello__world_'
    >>> word_characters_only("Hello, world! 123", '')
    'Helloworld123'
    """
    return re.sub(r"\W", replacement_char, s)


def str_to_bool(s: t.Optional[str]) -> bool:
    """
    Convert a string to a boolean. distutils is being deprecated and it is recommended to implement your own version:
    https://peps.python.org/pep-0632/

    Unlike distutils, this actually returns a bool and never raises. If a value cannot be determined to be true
    then false is returned.
    """
    if not s:
        return False
    return s.lower() in ("true", "1", "t", "y", "yes", "on")


# Process-wide flag flipped by enable_debug_mode().
_debug_mode_enabled: bool = False


def enable_debug_mode() -> None:
    """Turns debug mode on for the rest of the process."""
    global _debug_mode_enabled
    _debug_mode_enabled = True


def debug_mode_enabled() -> bool:
    """Returns True if debug mode was enabled in code or via the SQLMESH_DEBUG environment variable."""
    return _debug_mode_enabled or str_to_bool(os.environ.get("SQLMESH_DEBUG"))


def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
    """Caches a function that clears whenever the current epoch / ttl seconds changes.

    TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared.
    This is done for simplicity.

    Args:
        ttl: The number of seconds to hold the cache for.
        maxsize: The maximum size of the cache.
    """

    def decorator(func: t.Callable) -> t.Any:
        # `tick` is never read; it only takes part in the lru_cache key so that
        # a new time window misses the entries of the previous one.
        @lru_cache(maxsize=maxsize)
        def cache(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
            return func(*args, **kwargs)

        @wraps(func)
        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
            return cache(int(time.time() / ttl), *args, **kwargs)

        return wrap

    return decorator


class classproperty(property):
    """
    Similar to a normal property but works for class methods
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # The getter is always bound to the owner class, never to the instance.
        return classmethod(self.fget).__get__(None, owner)()  # type: ignore


@contextmanager
def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
    """A context manager to temporarily modify environment variables."""
    old_environ = os.environ.copy()
    os.environ.update(environ)

    try:
        yield
    finally:
        # Restore the full snapshot, which also discards variables added inside the block.
        os.environ.clear()
        os.environ.update(old_environ)


def merge_dicts(*args: t.Dict) -> t.Dict:
    """
    Merges dicts into a new dict, later arguments taking precedence.

    When both sides hold a dict under the same key the two are merged recursively;
    otherwise the later value replaces the earlier one.
    """

    def merge(a: t.Dict, b: t.Dict) -> t.Dict:
        for b_key, b_value in b.items():
            a_value = a.get(b_key)
            if isinstance(a_value, dict) and isinstance(b_value, dict):
                merge(a_value, b_value)
            elif isinstance(b_value, dict):
                # Deep copy so later merges into the result never mutate the caller's nested dict.
                a[b_key] = copy.deepcopy(b_value)
            else:
                a[b_key] = b_value
        return a

    return reduce(merge, args, {})


def sqlglot_dialects() -> str:
    """Returns the members of sqlglot's Dialects enum as a quoted, comma separated string."""
    # NOTE(review): str.join requires the enum members to be str instances - confirm
    # against the installed sqlglot version.
    return "'" + "', '".join(Dialects.__members__.values()) + "'"


# Matches anything that is not an ASCII letter, digit or underscore.
NON_ALNUM = re.compile(r"[^a-zA-Z0-9_]")

# Matches anything that is not a Unicode word character.
NON_ALUM_INCLUDE_UNICODE = re.compile(r"\W", flags=re.UNICODE)


def sanitize_name(name: str, *, include_unicode: bool = False) -> str:
    """Replaces every disallowed character in `name` with an underscore.

    Args:
        name: The name to sanitize.
        include_unicode: When true, the name is NFC-normalized and all Unicode word
            characters are kept; otherwise only ASCII letters, digits and underscores are kept.
    """
    if include_unicode:
        s = unicodedata.normalize("NFC", name)
        s = NON_ALUM_INCLUDE_UNICODE.sub("_", s)
        return s
    return NON_ALNUM.sub("_", name)


def groupby(
    items: t.Iterable[ITEM],
    func: t.Callable[[ITEM], GROUP],
) -> t.DefaultDict[GROUP, t.List[ITEM]]:
    """Groups items into lists keyed by `func(item)`, keeping encounter order within each list."""
    grouped = defaultdict(list)
    for item in items:
        grouped[func(item)].append(item)
    return grouped


def columns_to_types_to_struct(
    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]],
) -> exp.DataType:
    """
    Converts a dict of column names to types to a struct.
    """
    return exp.DataType(
        this=exp.DataType.Type.STRUCT,
        expressions=[
            exp.ColumnDef(this=exp.to_identifier(k), kind=v) for k, v in columns_to_types.items()
        ],
        nested=True,
    )


def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
    """Checks that a given column type is known and not NULL."""
    if isinstance(d_type, exp.ColumnDef):
        # A column definition without a kind carries no type information at all.
        if not d_type.kind:
            return False
        d_type = d_type.kind
    if isinstance(d_type, exp.DataTypeParam):
        return True
    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
        return False
    if d_type.expressions:
        # Nested expressions must all be known as well.
        return all(type_is_known(expression) for expression in d_type.expressions)
    return True


def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
    """Checks that all column types are known and not NULL."""
    return all(type_is_known(expression) for expression in columns_to_types.values())


class Verbosity(IntEnum):
    """Verbosity levels for SQLMesh output."""

    DEFAULT = 0
    VERBOSE = 1
    VERY_VERBOSE = 2

    @property
    def is_default(self) -> bool:
        return self == Verbosity.DEFAULT

    @property
    def is_verbose(self) -> bool:
        return self == Verbosity.VERBOSE

    @property
    def is_very_verbose(self) -> bool:
        return self == Verbosity.VERY_VERBOSE


class CompletionStatus(Enum):
    """Possible outcomes: success, failure, or nothing to do."""

    SUCCESS = "success"
    FAILURE = "failure"
    NOTHING_TO_DO = "nothing_to_do"

    @property
    def is_success(self) -> bool:
        return self == CompletionStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        return self == CompletionStatus.FAILURE

    @property
    def is_nothing_to_do(self) -> bool:
        return self == CompletionStatus.NOTHING_TO_DO


def to_snake_case(name: str) -> str:
    """Lowercases `name`, prefixing every uppercase character except the first with an underscore."""
    return "".join(
        f"_{c.lower()}" if c.isupper() and idx != 0 else c.lower() for idx, c in enumerate(name)
    )
class JobType(Enum):
    """Job categories that a CorrelationId can refer to."""

    PLAN = "SQLMESH_PLAN"
    RUN = "SQLMESH_RUN"


@dataclass(frozen=True)
class CorrelationId:
    """ID that is added to each query in order to identify the job that created it."""

    job_type: JobType
    job_id: str

    def __str__(self) -> str:
        # Rendered as "<job type value>: <job id>".
        label = self.job_type.value
        return f"{label}: {self.job_id}"

    @classmethod
    def from_plan_id(cls, plan_id: str) -> CorrelationId:
        """Builds the correlation ID for a plan job with the given plan ID."""
        return CorrelationId(job_type=JobType.PLAN, job_id=plan_id)


def get_source_columns_to_types(
    columns_to_types: t.Dict[str, exp.DataType],
    source_columns: t.Optional[t.List[str]],
) -> t.Dict[str, exp.DataType]:
    """Restricts a column-to-type mapping to the given source columns.

    Args:
        columns_to_types: Mapping of column name to type.
        source_columns: Names to keep. None or an empty list keeps every column.
    Returns:
        A new dict with the retained columns in their original order.
    """
    if not source_columns:
        return dict(columns_to_types.items())

    wanted = set(source_columns)
    selected = {}
    for column, column_type in columns_to_types.items():
        if column in wanted:
            selected[column] = column_type
    return selected
def optional_import(name: str) -> t.Optional[types.ModuleType]:
    """Import a module if it is available.

    Args:
        name: The name of the module to import.
    Returns:
        The imported module, or None when importing it raises ImportError.
    """
    try:
        return importlib.import_module(name)
    except ImportError:
        return None
Optionally import a module.
Arguments:
- name: The name of the module to import.
Returns:
The module if it is installed.
def major_minor(version: str) -> t.Tuple[int, int]:
    """Extract (major, minor) as integers from a "major.minor.patch" version string."""
    leading_parts = version.split(".")[:2]
    return t.cast(t.Tuple[int, int], tuple(map(int, leading_parts)))
Returns a tuple of just the major.minor for a version string (major.minor.patch).
class UniqueKeyDict(t.Dict[KEY, VALUE]):
    """Dict that refuses item assignment to a key it already contains."""

    def __init__(self, name: str, *args: t.Dict[KEY, VALUE], **kwargs: VALUE) -> None:
        # The name only labels the error raised on duplicate assignment.
        self.name = name
        super().__init__(*args, **kwargs)

    def __setitem__(self, k: KEY, v: VALUE) -> None:
        """Store `v` under `k`.

        Raises:
            ValueError: If `k` is already present.
        """
        if k not in self:
            super().__setitem__(k, v)
            return

        raise ValueError(
            f"Duplicate key '{k}' found in UniqueKeyDict<{self.name}>. Call dict.update(...) if this is intentional."
        )
Dict that raises when a duplicate key is set.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class AttributeDict(dict, t.Mapping[KEY, VALUE]):
    """A dict whose entries can also be read through attribute access.

    Reading a missing (non-dunder) attribute yields None rather than raising.
    """

    def __getattr__(self, key: t.Any) -> t.Optional[VALUE]:
        """Look up `key` in the dict when regular attribute lookup has failed.

        Raises:
            AttributeError: For any name starting with a double underscore.
        """
        # __getattr__ is only invoked after normal attribute lookup has already
        # failed, so a dunder name that reaches this point is genuinely absent.
        # Probing it again with hasattr(self, key) would re-enter this method with
        # the same name and recurse until RecursionError, so raise directly.
        if key.startswith("__"):
            raise AttributeError(key)
        return self.get(key)

    def set(self, field: str, value: t.Any) -> str:
        """Store `value` under `field` and return an empty string."""
        self[field] = value
        # Return an empty string, so that this method can be used within Jinja
        return ""

    def __deepcopy__(self, memo: t.Dict[t.Any, AttributeDict]) -> AttributeDict:
        """Return a new AttributeDict whose values are deep copies of this one's."""
        duplicate: AttributeDict = AttributeDict()
        # Register the copy before recursing so self-references resolve to it.
        memo[id(self)] = duplicate
        for k, v in self.items():
            duplicate[k] = deepcopy(v, memo)
        return duplicate

    def __call__(self, **kwargs: t.Dict[str, t.Any]) -> str:
        """Update the dict with the given keyword arguments and return an empty string."""
        self.update(**kwargs)
        # Return an empty string, so that this method can be used within Jinja
        return ""

    def __getstate__(self) -> t.Optional[t.Dict[t.Any, t.Any]]:
        """Always report that there is no extra instance state."""
        return None
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy
class registry_decorator:
    """A decorator that registers itself."""

    # Label handed to the UniqueKeyDict that backs the registry.
    registry_name = ""
    # Created on first access through registry(); replaceable via set_registry().
    _registry: t.Optional[UniqueKeyDict] = None

    @classmethod
    def registry(cls) -> UniqueKeyDict:
        """Return the live registry, creating it on first use."""
        current = cls._registry
        if current is None:
            current = cls._registry = UniqueKeyDict(cls.registry_name)
        return current

    def __init__(self, name: str = "") -> None:
        # An empty name means the decorated function's own name is used.
        self.name = name

    def __call__(
        self, func: t.Callable[..., DECORATOR_RETURN_TYPE]
    ) -> t.Callable[..., DECORATOR_RETURN_TYPE]:
        """Register this instance under the lowercased name and return a wrapper around `func`.

        Raises:
            ValueError: If the name is taken by a function whose bytecode differs from `func`'s.
        """
        self.func = func

        entries = self.registry()
        func_name = (self.name or func.__name__).lower()

        try:
            entries[func_name] = self
        except ValueError:
            # Registering the very same code twice is tolerated; anything else is a clash.
            registered_code = entries[func_name].func.__code__.co_code
            if func.__code__.co_code != registered_code:
                raise ValueError(f"Duplicate name: '{func_name}'.")

        @wraps(func)
        def wrapper(*args: t.Any, **kwargs: t.Any) -> DECORATOR_RETURN_TYPE:
            return func(*args, **kwargs)

        return wrapper

    @classmethod
    def get_registry(cls) -> UniqueKeyDict:
        """Get a copy of the registry"""
        snapshot = cls._registry or {}
        return UniqueKeyDict(cls.registry_name, **snapshot)

    @classmethod
    def set_registry(cls, registry: UniqueKeyDict) -> None:
        """Set the registry."""
        cls._registry = registry
A decorator that registers itself.
150 @classmethod 151 def get_registry(cls) -> UniqueKeyDict: 152 """Get a copy of the registry""" 153 return UniqueKeyDict(cls.registry_name, **(cls._registry or {}))
Get a copy of the registry
@contextmanager
def sys_path(*paths: Path) -> t.Iterator[None]:
    """Temporarily put the given paths at the front of 'sys.path'.

    Paths already present in 'sys.path' are left alone, both on entry and on exit.
    """
    added: t.List[str] = []

    for entry in paths:
        absolute = str(entry.absolute())
        if absolute in sys.path:
            continue
        sys.path.insert(0, absolute)
        added.append(absolute)

    try:
        yield
    finally:
        # Undo only what this context manager inserted.
        for absolute in added:
            sys.path.remove(absolute)
A context manager to temporarily add a path to 'sys.path'.
def word_characters_only(s: str, replacement_char: str = "_") -> str:
    """
    Substitute every non-word character (regex ``\\W``) in the string with the replacement character.
    Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018

    >>> word_characters_only("Hello, world!")
    'Hello__world_'
    >>> word_characters_only("Hello, world! 123", '')
    'Helloworld123'
    """
    non_word = re.compile(r"\W")
    return non_word.sub(replacement_char, s)
Replace all non-word characters in string with the replacement character. Reference SO: https://stackoverflow.com/questions/1276764/stripping-everything-but-alphanumeric-chars-from-a-string-in-python/70310018#70310018
>>> word_characters_only("Hello, world!")
'Hello__world_'
>>> word_characters_only("Hello, world! 123", '')
'Helloworld123'
def str_to_bool(s: t.Optional[str]) -> bool:
    """
    Interpret a string as a boolean. distutils is deprecated (https://peps.python.org/pep-0632/)
    and the recommendation is to implement your own version.

    Unlike distutils, this always returns a real bool and never raises: None, the empty string and
    any value not recognized as true (case-insensitively) all yield False.
    """
    if s:
        return s.lower() in ("true", "1", "t", "y", "yes", "on")
    return False
Convert a string to a boolean. distutils is being deprecated and it is recommended to implement your own version: https://peps.python.org/pep-0632/
Unlike distutils, this actually returns a bool and never raises. If a value cannot be determined to be true then false is returned.
def ttl_cache(ttl: int = 60, maxsize: int = 128000) -> t.Callable:
    """Cache a function's results, keyed additionally by the current time window.

    The window index is int(time.time() / ttl), so TTL is not exact: it acts as a salt.
    With the default of 60, results computed before a minute mark are not reused after it.

    Args:
        ttl: The number of seconds to hold the cache for.
        maxsize: The maximum size of the cache.
    """

    def decorator(func: t.Callable) -> t.Any:
        def call_ignoring_tick(tick: int, *args: t.Any, **kwargs: t.Any) -> t.Any:
            # `tick` only takes part in the cache key; the wrapped function never sees it.
            return func(*args, **kwargs)

        cached = lru_cache(maxsize=maxsize)(call_ignoring_tick)

        @wraps(func)
        def wrap(*args: t.Any, **kwargs: t.Any) -> t.Any:
            current_tick = int(time.time() / ttl)
            return cached(current_tick, *args, **kwargs)

        return wrap

    return decorator
Caches a function that clears whenever the current epoch / ttl seconds changes.
TTL is not exact, it is used as a salt. So by default, at every minute mark, the cache will be cleared. This is done for simplicity.
Arguments:
- ttl: The number of seconds to hold the cache for.
- maxsize: The maximum size of the cache.
class classproperty(property):
    """
    A property whose getter receives the owning class rather than an instance
    """

    def __get__(self, obj: t.Any, owner: t.Any = None) -> t.Any:
        # Bind the getter to the owner class (the instance is ignored) and call it.
        bound_getter = classmethod(self.fget).__get__(None, owner)  # type: ignore
        return bound_getter()
Similar to a normal property but works for class methods
Inherited Members
- builtins.property
- property
- getter
- setter
- deleter
- fget
- fset
- fdel
@contextmanager
def env_vars(environ: dict[str, str]) -> t.Iterator[None]:
    """Apply the given environment variables for the duration of the block.

    On exit the environment is reset to the snapshot taken on entry, which also
    discards any variables that were added or changed inside the block.
    """
    snapshot = os.environ.copy()
    os.environ.update(environ)

    try:
        yield
    finally:
        os.environ.clear()
        os.environ.update(snapshot)
A context manager to temporarily modify environment variables.
def merge_dicts(*args: t.Dict) -> t.Dict:
    """
    Combines dicts into a new dict, later arguments winning on key collisions.

    When both sides hold a dict under the same key, those dicts are merged recursively.
    """

    def merge_into(target: t.Dict, source: t.Dict) -> None:
        for key, incoming in source.items():
            if not isinstance(incoming, dict):
                target[key] = incoming
                continue

            existing = target.get(key)
            if isinstance(existing, dict):
                merge_into(existing, incoming)
            else:
                # Copy so that later merges into the result never touch the caller's nested dict.
                target[key] = copy.deepcopy(incoming)

    merged: t.Dict = {}
    for mapping in args:
        merge_into(merged, mapping)
    return merged
Merges dicts. Just does key collision replacement
def columns_to_types_to_struct(
    columns_to_types: t.Union[t.Dict[str, exp.DataType], t.Dict[str, str]],
) -> exp.DataType:
    """
    Builds a nested STRUCT data type with one column definition per entry of the mapping.
    """
    fields = []
    for column_name, column_type in columns_to_types.items():
        fields.append(exp.ColumnDef(this=exp.to_identifier(column_name), kind=column_type))

    return exp.DataType(this=exp.DataType.Type.STRUCT, expressions=fields, nested=True)
Converts a dict of column names to types to a struct.
def type_is_known(d_type: t.Union[exp.DataType, exp.ColumnDef]) -> bool:
    """Tells whether a column type, including any nested types, is neither UNKNOWN nor NULL."""
    if isinstance(d_type, exp.ColumnDef):
        # A column definition without a kind carries no type information.
        if not d_type.kind:
            return False
        d_type = d_type.kind

    if isinstance(d_type, exp.DataTypeParam):
        return True

    if d_type.is_type(exp.DataType.Type.UNKNOWN, exp.DataType.Type.NULL):
        return False

    if not d_type.expressions:
        return True

    # Every nested expression has to be known as well.
    for nested in d_type.expressions:
        if not type_is_known(nested):
            return False
    return True
Checks that a given column type is known and not NULL.
def columns_to_types_all_known(columns_to_types: t.Dict[str, exp.DataType]) -> bool:
    """Tells whether every column type in the mapping passes type_is_known."""
    for column_type in columns_to_types.values():
        if not type_is_known(column_type):
            return False
    return True
Checks that all column types are known and not NULL.
class Verbosity(IntEnum):
    """Verbosity levels for SQLMesh output."""

    DEFAULT = 0
    VERBOSE = 1
    VERY_VERBOSE = 2

    @property
    def is_default(self) -> bool:
        """True only for the DEFAULT level."""
        return self is Verbosity.DEFAULT

    @property
    def is_verbose(self) -> bool:
        """True only for the VERBOSE level (not for VERY_VERBOSE)."""
        return self is Verbosity.VERBOSE

    @property
    def is_very_verbose(self) -> bool:
        """True only for the VERY_VERBOSE level."""
        return self is Verbosity.VERY_VERBOSE
Verbosity levels for SQLMesh output.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- real
- imag
- numerator
- denominator
class CompletionStatus(Enum):
    """Possible outcomes: success, failure, or nothing to do."""

    SUCCESS = "success"
    FAILURE = "failure"
    NOTHING_TO_DO = "nothing_to_do"

    @property
    def is_success(self) -> bool:
        """True only for SUCCESS."""
        return self is CompletionStatus.SUCCESS

    @property
    def is_failure(self) -> bool:
        """True only for FAILURE."""
        return self is CompletionStatus.FAILURE

    @property
    def is_nothing_to_do(self) -> bool:
        """True only for NOTHING_TO_DO."""
        return self is CompletionStatus.NOTHING_TO_DO
An enumeration.
Inherited Members
- enum.Enum
- name
- value
An enumeration.
Inherited Members
- enum.Enum
- name
- value
@dataclass(frozen=True)
class CorrelationId:
    """ID that is added to each query in order to identify the job that created it."""

    job_type: JobType
    job_id: str

    def __str__(self) -> str:
        # Rendered as "<job type value>: <job id>".
        label = self.job_type.value
        return f"{label}: {self.job_id}"

    @classmethod
    def from_plan_id(cls, plan_id: str) -> CorrelationId:
        """Builds the correlation ID for a plan job with the given plan ID."""
        return CorrelationId(job_type=JobType.PLAN, job_id=plan_id)
ID that is added to each query in order to identify the job that created it.
def get_source_columns_to_types(
    columns_to_types: t.Dict[str, exp.DataType],
    source_columns: t.Optional[t.List[str]],
) -> t.Dict[str, exp.DataType]:
    """Restricts a column-to-type mapping to the given source columns.

    Args:
        columns_to_types: Mapping of column name to type.
        source_columns: Names to keep. None or an empty list keeps every column.
    Returns:
        A new dict with the retained columns in their original order.
    """
    if not source_columns:
        return dict(columns_to_types.items())

    wanted = set(source_columns)
    selected = {}
    for column, column_type in columns_to_types.items():
        if column in wanted:
            selected[column] = column_type
    return selected