sqlmesh.utils.process
# mypy: disable-error-code=no-untyped-def

from concurrent.futures import Future, ProcessPoolExecutor
import typing as t
import multiprocessing as mp
from sqlmesh.utils.windows import IS_WINDOWS


class SynchronousPoolExecutor:
    """A mock implementation of the ProcessPoolExecutor for synchronous use.

    This executor runs functions synchronously in the same process, avoiding the issues
    with forking in test environments or when forking isn't possible (non-posix).

    ``max_workers`` and ``mp_context`` are accepted only so the constructor can be
    called like ``ProcessPoolExecutor``; they are ignored.
    """

    def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
        """Run ``initializer(*initargs)`` once in the current process, if provided.

        Raises:
            RuntimeError: If the initializer raises. The original exception is
                attached as ``__cause__``.
        """
        if initializer is not None:
            try:
                initializer(*initargs)
            except BaseException as ex:
                # Chain explicitly so the traceback shows the initializer failure
                # as the direct cause of this error.
                raise RuntimeError(f"Exception in initializer: {ex}") from ex

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def shutdown(self, wait=True, cancel_futures=False):
        """No-op method to match ProcessPoolExecutor API.

        Since this executor runs synchronously, there are no background processes
        or resources to shut down and all futures will have completed already.
        """
        pass

    def submit(self, fn, *args, **kwargs):
        """Execute the function synchronously and return a Future with the result.

        An ``Exception`` raised by ``fn`` is stored on the returned future rather
        than propagated to the caller.
        """
        future = Future()
        try:
            result = fn(*args, **kwargs)
            future.set_result(result)
        except Exception as e:
            future.set_exception(e)
        return future

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Synchronous implementation of ProcessPoolExecutor.map.

        Every call is executed in the current process before this method returns,
        mirroring ``Executor.map``, which consumes its iterables and schedules all
        calls up front. An exception raised by ``fn`` is stored and re-raised only
        when the corresponding result is retrieved from the returned iterator.

        ``timeout`` and ``chunksize`` are accepted for API compatibility and ignored.
        """
        # A bare built-in ``map`` would be lazy: no work would run until (or unless)
        # the caller iterated the result, unlike a real executor.
        futures = [self.submit(fn, *call_args) for call_args in zip(*iterables)]
        return (future.result() for future in futures)


PoolExecutor = t.Union[SynchronousPoolExecutor, ProcessPoolExecutor]


def create_process_pool_executor(
    initializer: t.Callable, initargs: t.Tuple, max_workers: t.Optional[int]
) -> PoolExecutor:
    """Create an executor that runs ``initializer(*initargs)`` before doing work.

    Returns a ``SynchronousPoolExecutor`` when ``max_workers`` is 1 or when
    ``IS_WINDOWS`` is true; otherwise a ``ProcessPoolExecutor`` that uses the
    "fork" multiprocessing context.
    """
    if max_workers == 1 or IS_WINDOWS:
        return SynchronousPoolExecutor(
            initializer=initializer,
            initargs=initargs,
        )
    return ProcessPoolExecutor(
        mp_context=mp.get_context("fork"),
        initializer=initializer,
        initargs=initargs,
        max_workers=max_workers,
    )
class
SynchronousPoolExecutor:
class SynchronousPoolExecutor:
    """A mock implementation of the ProcessPoolExecutor for synchronous use.

    This executor runs functions synchronously in the same process, avoiding the issues
    with forking in test environments or when forking isn't possible (non-posix).

    ``max_workers`` and ``mp_context`` are accepted only so the constructor can be
    called like ``ProcessPoolExecutor``; they are ignored.
    """

    def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
        """Run ``initializer(*initargs)`` once in the current process, if provided.

        Raises:
            RuntimeError: If the initializer raises. The original exception is
                attached as ``__cause__``.
        """
        if initializer is not None:
            try:
                initializer(*initargs)
            except BaseException as ex:
                # Chain explicitly so the traceback shows the initializer failure
                # as the direct cause of this error.
                raise RuntimeError(f"Exception in initializer: {ex}") from ex

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.shutdown(wait=True)
        # Never suppress an exception raised inside the ``with`` body.
        return False

    def shutdown(self, wait=True, cancel_futures=False):
        """No-op method to match ProcessPoolExecutor API.

        Since this executor runs synchronously, there are no background processes
        or resources to shut down and all futures will have completed already.
        """
        pass

    def submit(self, fn, *args, **kwargs):
        """Execute the function synchronously and return a Future with the result.

        An ``Exception`` raised by ``fn`` is stored on the returned future rather
        than propagated to the caller.
        """
        future = Future()
        try:
            result = fn(*args, **kwargs)
            future.set_result(result)
        except Exception as e:
            future.set_exception(e)
        return future

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Synchronous implementation of ProcessPoolExecutor.map.

        Every call is executed in the current process before this method returns,
        mirroring ``Executor.map``, which consumes its iterables and schedules all
        calls up front. An exception raised by ``fn`` is stored and re-raised only
        when the corresponding result is retrieved from the returned iterator.

        ``timeout`` and ``chunksize`` are accepted for API compatibility and ignored.
        """
        # A bare built-in ``map`` would be lazy: no work would run until (or unless)
        # the caller iterated the result, unlike a real executor.
        futures = [self.submit(fn, *call_args) for call_args in zip(*iterables)]
        return (future.result() for future in futures)
A mock implementation of the ProcessPoolExecutor for synchronous use.
This executor runs functions synchronously in the same process, avoiding the issues with forking in test environments or when forking isn't possible (non-posix).
def
shutdown(self, wait=True, cancel_futures=False):
def shutdown(self, wait=True, cancel_futures=False):
    """Do nothing; present only to match the ProcessPoolExecutor API.

    Work submitted to this executor runs synchronously, so by the time this is
    called there are no background processes or resources left to release and
    every future has already completed. Both arguments are ignored.
    """
    return None
No-op method to match ProcessPoolExecutor API.
Since this executor runs synchronously, there are no background processes or resources to shut down and all futures will have completed already.
def
submit(self, fn, *args, **kwargs):
def submit(self, fn, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` immediately and wrap the outcome in a Future.

    The returned future is already completed: it holds either the return value
    or the ``Exception`` that ``fn`` raised.
    """
    outcome = Future()
    try:
        value = fn(*args, **kwargs)
    except Exception as err:
        # Store the failure on the future instead of raising to the caller.
        outcome.set_exception(err)
    else:
        outcome.set_result(value)
    return outcome
Execute the function synchronously and return a Future with the result.
def
map(self, fn, *iterables, timeout=None, chunksize=1):
49 def map(self, fn, *iterables, timeout=None, chunksize=1): 50 """Synchronous implementation of ProcessPoolExecutor.map. 51 52 This executes the function for each set of inputs from the iterables in the 53 current process using Python's built-in map, rather than distributing work. 54 """ 55 return map(fn, *iterables)
Synchronous implementation of ProcessPoolExecutor.map.
This executes the function for each set of inputs from the iterables in the current process using Python's built-in map, rather than distributing work.
PoolExecutor =
typing.Union[SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]
def
create_process_pool_executor( initializer: Callable, initargs: Tuple, max_workers: Optional[int]) -> Union[SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]:
def create_process_pool_executor(
    initializer: t.Callable, initargs: t.Tuple, max_workers: t.Optional[int]
) -> PoolExecutor:
    """Build the executor to use for the given worker count and platform.

    A ``SynchronousPoolExecutor`` is returned when ``max_workers`` is 1 or when
    ``IS_WINDOWS`` is true; in every other case a ``ProcessPoolExecutor`` using
    the "fork" multiprocessing context is returned. ``initializer`` and
    ``initargs`` are forwarded to whichever executor is created.
    """
    run_in_process = max_workers == 1 or IS_WINDOWS
    if not run_in_process:
        return ProcessPoolExecutor(
            max_workers=max_workers,
            mp_context=mp.get_context("fork"),
            initializer=initializer,
            initargs=initargs,
        )
    return SynchronousPoolExecutor(initializer=initializer, initargs=initargs)