run_parallel#

openstef_core.utils.run_parallel(process_fn: Callable[[T], R], items: Iterable[T], n_processes: int | None = None, mode: Literal['loky', 'spawn', 'fork'] = 'loky') → list[R][source]#

Execute a function in parallel across multiple processes.

On macOS, explicitly uses the fork context to avoid issues with spawn, which became the default start method on that platform in Python 3.8. The fork context preserves the parent process's memory, making it more efficient for sharing large objects such as trained models or data structures.
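
As a minimal, self-contained illustration of this trade-off (standard library only, not openstef_core internals; the worker function and platform check are illustrative assumptions):

import multiprocessing
import sys

def worker(_: int) -> str:
    # Report which process handled the item.
    return multiprocessing.current_process().name

if __name__ == "__main__":
    # 'spawn' became the macOS default in Python 3.8; 'fork' inherits the
    # parent's memory instead of re-importing the module, which is cheaper
    # when large objects (e.g. trained models) must be shared.
    start_method = "fork" if sys.platform == "darwin" else None
    ctx = multiprocessing.get_context(start_method)
    with ctx.Pool(processes=2) as pool:
        print(pool.map(worker, range(4)))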

Parameters:
  • process_fn (Callable[[T], R]) – Function to apply to each item. Must be picklable for multiprocessing; lambdas won’t work, so use module-level functions defined with def (see the pickling check after the Example below).

  • items (Iterable[T]) – Iterable of items to process.

  • n_processes (int | None) – Number of processes to use. If None or <= 1, runs sequentially. Typically set to the number of physical or logical CPU cores.

  • mode (Literal['loky', 'spawn', 'fork']) – Multiprocessing start method. ‘loky’ is recommended for robust ML use cases. ‘fork’ is more efficient on macOS, though ‘spawn’ has been the default there since Python 3.8; ‘spawn’ is also the default on Windows, while Linux defaults to ‘fork’. XGBoost appears to have bugs when used with ‘fork’. See the dispatch sketch after the return type below.

Returns:

List of results from applying process_fn to each item, in the same order as the input items.

Return type:

list[R]
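
For orientation, here is a hedged sketch of how a function with this signature might dispatch on mode. This is a sketch under stated assumptions, not the openstef_core implementation; it assumes joblib is installed and provides the ‘loky’ backend:

import multiprocessing
from typing import Callable, Iterable, Literal, TypeVar

from joblib import Parallel, delayed  # the 'loky' backend ships with joblib

T = TypeVar("T")
R = TypeVar("R")

def run_parallel_sketch(
    process_fn: Callable[[T], R],
    items: Iterable[T],
    n_processes: int | None = None,
    mode: Literal["loky", "spawn", "fork"] = "loky",
) -> list[R]:
    materialized = list(items)  # preserve input order
    if n_processes is None or n_processes <= 1:
        # Sequential fallback: no pickling constraints apply.
        return [process_fn(item) for item in materialized]
    if mode == "loky":
        # joblib's loky backend: robust worker reuse for ML workloads.
        return Parallel(n_jobs=n_processes, backend="loky")(
            delayed(process_fn)(item) for item in materialized
        )
    # 'spawn' or 'fork': plain stdlib multiprocessing context.
    ctx = multiprocessing.get_context(mode)
    with ctx.Pool(processes=n_processes) as pool:
        return pool.map(process_fn, materialized)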

Example

>>> def square(x: int) -> int:
...     return x * x
>>> # Sequential execution (n_processes <= 1) - always works
>>> run_parallel(square, [1, 2, 3, 4], n_processes=1)
[1, 4, 9, 16]
>>> # For parallel execution, use module-level functions:
>>> # run_parallel(math.sqrt, [1, 4, 9, 16], n_processes=2)
>>> # [1.0, 2.0, 3.0, 4.0]
>>> # Empty input handling
>>> run_parallel(square, [], n_processes=1)
[]
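
The picklability requirement on process_fn can be checked directly with the stdlib pickle module. This reflects stdlib pickling only; the ‘loky’ backend typically serializes callables with cloudpickle and is more permissive, but relying on that is fragile:

>>> import pickle
>>> def square(x: int) -> int:
...     return x * x
>>> _ = pickle.dumps(square)  # module-level function: picklable
>>> try:
...     pickle.dumps(lambda x: x * x)  # lambdas have no importable name
... except (pickle.PicklingError, AttributeError):
...     print("not picklable")
not picklable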