# Source code for openstef_core.utils.multiprocessing

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

"""Multiprocessing utilities for parallel execution of tasks.

Provides simplified parallel processing functions with automatic platform-specific
optimizations for compute-intensive operations like model training and evaluation
across multiple forecasting scenarios.
"""

import multiprocessing
from collections.abc import Callable, Iterable
from typing import Literal


[docs] def run_parallel[T, R]( process_fn: Callable[[T], R], items: Iterable[T], n_processes: int | None = None, mode: Literal["loky", "spawn", "fork"] = "loky", ) -> list[R]: """Execute a function in parallel across multiple processes. On macOS, explicitly uses fork context to avoid issues with the default spawn context that became the default in Python 3.8+. Fork context preserves the parent process memory, making it more efficient for sharing large objects like trained models or data structures. Args: process_fn: Function to apply to each item. Must be picklable for multiprocessing. Lambda functions won't work - use def functions. items: Iterable of items to process. n_processes: Number of processes to use. If None or <= 1, runs sequentially. Typically set to number of CPU cores or logical cores. mode: Multiprocessing start method. 'loky' is recommeneded for robust ml use-cases. 'fork' is more efficient on macOS, while 'spawn' is default on Windows/Linux. Xgboost seems to have bugs when used with 'fork'. Returns: List of results from applying process_fn to each item, in the same order as the input items. Example: >>> def square(x: int) -> int: ... 
return x * x >>> # Sequential execution (n_processes <= 1) - always works >>> run_parallel(square, [1, 2, 3, 4], n_processes=1) [1, 4, 9, 16] >>> # For parallel execution, use module-level functions: >>> # run_parallel(math.sqrt, [1, 4, 9, 16], n_processes=2) >>> # [1.0, 2.0, 3.0, 4.0] >>> # Empty input handling >>> run_parallel(square, [], n_processes=1) [] """ if n_processes is None or n_processes <= 1: # If only one process is requested, run the function sequentially return [process_fn(item) for item in items] if mode == "loky": from joblib import Parallel, delayed # pyright: ignore[reportUnknownVariableType] # noqa: PLC0415 # Use joblib with loky backend for robust process management return Parallel(n_jobs=n_processes, backend="loky")( # pyright: ignore[reportUnknownVariableType] delayed(process_fn)(item) for item in items ) # type: ignore # Auto-configure for macOS context = multiprocessing.get_context(method=mode) with context.Pool(processes=n_processes) as pool: return pool.map(process_fn, items)