Edit on GitHub

sqlmesh.utils.process

 1# mypy: disable-error-code=no-untyped-def
 2
 3from concurrent.futures import Future, ProcessPoolExecutor
 4import typing as t
 5import multiprocessing as mp
 6from sqlmesh.utils.windows import IS_WINDOWS
 7
 8
 9class SynchronousPoolExecutor:
10    """A mock implementation of the ProcessPoolExecutor for synchronous use.
11
12    This executor runs functions synchronously in the same process, avoiding the issues
13    with forking in test environments or when forking isn't possible (non-posix).
14    """
15
16    def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
17        if initializer is not None:
18            try:
19                initializer(*initargs)
20            except BaseException as ex:
21                raise RuntimeError(f"Exception in initializer: {ex}")
22
23    def __enter__(self):
24        return self
25
26    def __exit__(self, *args):
27        self.shutdown(wait=True)
28        return False
29
30    def shutdown(self, wait=True, cancel_futures=False):
31        """No-op method to match ProcessPoolExecutor API.
32
33        Since this executor runs synchronously, there are no background processes
34        or resources to shut down and all futures will have completed already.
35        """
36        pass
37
38    def submit(self, fn, *args, **kwargs):
39        """Execute the function synchronously and return a Future with the result."""
40        future = Future()
41        try:
42            result = fn(*args, **kwargs)
43            future.set_result(result)
44        except Exception as e:
45            future.set_exception(e)
46        return future
47
48    def map(self, fn, *iterables, timeout=None, chunksize=1):
49        """Synchronous implementation of ProcessPoolExecutor.map.
50
51        This executes the function for each set of inputs from the iterables in the
52        current process using Python's built-in map, rather than distributing work.
53        """
54        return map(fn, *iterables)
55
56
57PoolExecutor = t.Union[SynchronousPoolExecutor, ProcessPoolExecutor]
58
59
def create_process_pool_executor(
    initializer: t.Callable, initargs: t.Tuple, max_workers: t.Optional[int]
) -> PoolExecutor:
    """Build the executor used to run work, in-process or in forked workers.

    Args:
        initializer: Callable handed to the executor as its initializer.
        initargs: Positional arguments for ``initializer``.
        max_workers: Requested worker count; ``None`` is passed through unchanged.

    Returns:
        A ``SynchronousPoolExecutor`` when ``max_workers`` is 1 or ``IS_WINDOWS``
        is truthy, otherwise a ``ProcessPoolExecutor`` using the "fork" start method.
    """
    run_in_process = max_workers == 1 or IS_WINDOWS
    if not run_in_process:
        fork_context = mp.get_context("fork")
        return ProcessPoolExecutor(
            max_workers=max_workers,
            mp_context=fork_context,
            initializer=initializer,
            initargs=initargs,
        )
    return SynchronousPoolExecutor(initializer=initializer, initargs=initargs)
class SynchronousPoolExecutor:
class SynchronousPoolExecutor:
    """A mock implementation of the ProcessPoolExecutor for synchronous use.

    This executor runs functions synchronously in the same process, avoiding the issues
    with forking in test environments or when forking isn't possible (non-posix).
    """

    def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
        """Run the optional initializer once, in the current process.

        Args:
            max_workers: Accepted to match the ProcessPoolExecutor signature; unused.
            mp_context: Accepted to match the ProcessPoolExecutor signature; unused.
            initializer: Optional callable invoked immediately with ``initargs``.
            initargs: Positional arguments passed to ``initializer``.

        Raises:
            RuntimeError: If the initializer raises. The original exception is
                attached as ``__cause__``.
        """
        if initializer is not None:
            try:
                initializer(*initargs)
            except BaseException as ex:
                # Chain explicitly so the initializer's own traceback is kept as the cause.
                raise RuntimeError(f"Exception in initializer: {ex}") from ex

    def __enter__(self):
        """Return the executor itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, *args):
        """Call ``shutdown`` and return False so exceptions from the block propagate."""
        self.shutdown(wait=True)
        return False

    def shutdown(self, wait=True, cancel_futures=False):
        """No-op method to match ProcessPoolExecutor API.

        Since this executor runs synchronously, there are no background processes
        or resources to shut down and all futures will have completed already.
        """
        pass

    def submit(self, fn, *args, **kwargs):
        """Execute the function synchronously and return a Future with the result.

        An ``Exception`` raised by ``fn`` is stored on the returned Future rather
        than raised here; it surfaces when the caller asks for the result.
        """
        future = Future()
        try:
            result = fn(*args, **kwargs)
            future.set_result(result)
        except Exception as e:
            future.set_exception(e)
        return future

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Synchronous implementation of ProcessPoolExecutor.map.

        Every call is executed in the current process as soon as ``map`` is
        invoked, so the work happens even if the caller never iterates over the
        results. An exception raised by ``fn`` is re-raised when the matching
        result is retrieved from the returned iterator.

        Args:
            fn: Callable applied to each tuple of arguments.
            *iterables: Argument sources, consumed in lockstep until the shortest ends.
            timeout: Accepted for API compatibility; unused.
            chunksize: Accepted for API compatibility; unused.

        Returns:
            An iterator over the results, in input order.
        """
        # The list is built before the generator is returned, so every call has
        # already run by the time the caller receives the iterator.
        futures = [self.submit(fn, *call_args) for call_args in zip(*iterables)]
        return (future.result() for future in futures)

A mock implementation of the ProcessPoolExecutor for synchronous use.

This executor runs functions synchronously in the same process, avoiding the issues with forking in test environments or when forking isn't possible (non-posix).

SynchronousPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
17    def __init__(self, max_workers=None, mp_context=None, initializer=None, initargs=()):
18        if initializer is not None:
19            try:
20                initializer(*initargs)
21            except BaseException as ex:
22                raise RuntimeError(f"Exception in initializer: {ex}")
def shutdown(self, wait=True, cancel_futures=False):
31    def shutdown(self, wait=True, cancel_futures=False):
32        """No-op method to match ProcessPoolExecutor API.
33
34        Since this executor runs synchronously, there are no background processes
35        or resources to shut down and all futures will have completed already.
36        """
37        pass

No-op method to match ProcessPoolExecutor API.

Since this executor runs synchronously, there are no background processes or resources to shut down and all futures will have completed already.

def submit(self, fn, *args, **kwargs):
39    def submit(self, fn, *args, **kwargs):
40        """Execute the function synchronously and return a Future with the result."""
41        future = Future()
42        try:
43            result = fn(*args, **kwargs)
44            future.set_result(result)
45        except Exception as e:
46            future.set_exception(e)
47        return future

Execute the function synchronously and return a Future with the result.

def map(self, fn, *iterables, timeout=None, chunksize=1):
49    def map(self, fn, *iterables, timeout=None, chunksize=1):
50        """Synchronous implementation of ProcessPoolExecutor.map.
51
52        This executes the function for each set of inputs from the iterables in the
53        current process using Python's built-in map, rather than distributing work.
54        """
55        return map(fn, *iterables)

Synchronous implementation of ProcessPoolExecutor.map.

This executes the function for each set of inputs from the iterables in the current process using Python's built-in map, rather than distributing work.

PoolExecutor = typing.Union[SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]
def create_process_pool_executor( initializer: Callable, initargs: Tuple, max_workers: Optional[int]) -> Union[SynchronousPoolExecutor, concurrent.futures.process.ProcessPoolExecutor]:
def create_process_pool_executor(
    initializer: t.Callable, initargs: t.Tuple, max_workers: t.Optional[int]
) -> PoolExecutor:
    """Build the executor used to run work, in-process or in forked workers.

    Args:
        initializer: Callable handed to the executor as its initializer.
        initargs: Positional arguments for ``initializer``.
        max_workers: Requested worker count; ``None`` is passed through unchanged.

    Returns:
        A ``SynchronousPoolExecutor`` when ``max_workers`` is 1 or ``IS_WINDOWS``
        is truthy, otherwise a ``ProcessPoolExecutor`` using the "fork" start method.
    """
    run_in_process = max_workers == 1 or IS_WINDOWS
    if not run_in_process:
        fork_context = mp.get_context("fork")
        return ProcessPoolExecutor(
            max_workers=max_workers,
            mp_context=fork_context,
            initializer=initializer,
            initargs=initargs,
        )
    return SynchronousPoolExecutor(initializer=initializer, initargs=initargs)