BenchmarkPipeline#

class openstef_beam.benchmarking.BenchmarkPipeline(backtest_config: BacktestConfig, evaluation_config: EvaluationConfig, analysis_config: AnalysisConfig, target_provider: TargetProvider[TypeVar, TypeVar], storage: BenchmarkStorage | None = None, callbacks: list[BenchmarkCallback] | None = None) None[source]#

Bases: Generic

Orchestrates forecasting model benchmarks across multiple targets.

Provides a standardized framework for systematic evaluation of forecasting models. Coordinates the entire benchmark workflow from data preparation through analysis, ensuring consistent evaluation methodology and result comparability.

Core workflow: 1. Target retrieval: Gets targets from configurable providers with optional filtering 2. Model creation: Uses factory pattern to create target-specific forecasters 3. Backtesting: Generates predictions using historical data with proper validation 4. Evaluation: Computes performance metrics against ground truth 5. Analysis: Creates visualizations and comparative reports 6. Storage: Persists results for future analysis and comparison

Key features: - Parallel execution support for efficient processing of large target sets - Pluggable storage backends (local filesystem, cloud storage, in-memory) - Extensible callback system for monitoring and custom processing - Automatic handling of data dependencies and validation - Consistent error handling and recovery mechanisms

Example

Basic benchmark setup and execution:

>>> from openstef_beam.benchmarking import BenchmarkPipeline
>>> from openstef_beam.backtesting import BacktestConfig
>>> from openstef_beam.evaluation import EvaluationConfig
>>> from openstef_beam.analysis import AnalysisConfig
>>> from openstef_beam.evaluation.metric_providers import RMAEProvider, RCRPSProvider
>>> from openstef_beam.analysis.visualizations import SummaryTableVisualization
>>> from openstef_beam.benchmarking.storage.local_storage import LocalBenchmarkStorage
>>> from datetime import timedelta
>>> from pathlib import Path
>>>
>>> # Configure components
>>> storage = LocalBenchmarkStorage(base_path=Path("./results"))
>>> backtest_config = BacktestConfig(
...     horizon=timedelta(hours=24),
...     window_step=timedelta(days=1)
... )
>>> evaluation_config = EvaluationConfig()
>>> analysis_config = AnalysisConfig(
...     visualization_providers=[SummaryTableVisualization(name="summary")]
... )
>>> # Create benchmark pipeline with target provider
>>> pipeline = BenchmarkPipeline(
...     backtest_config=backtest_config,
...     evaluation_config=evaluation_config,
...     analysis_config=analysis_config,
...     storage=storage,
...     target_provider=...  # Your custom provider
... )
>>>
>>> # Define forecaster factory for target-specific models
>>> def create_forecaster(context, target):
...     # Customize model configuration per target
...     return MyForecaster(config=target.get_model_config())
>>>
>>> # Execute complete benchmark with parallel processing
>>> #pipeline.run(
>>> #    forecaster_factory=create_forecaster,
>>> #    run_name="baseline_comparison",
>>> #    n_processes=4
>>> #)
Parameters:
__init__(backtest_config: BacktestConfig, evaluation_config: EvaluationConfig, analysis_config: AnalysisConfig, target_provider: TargetProvider[TypeVar, TypeVar], storage: BenchmarkStorage | None = None, callbacks: list[BenchmarkCallback] | None = None) None[source]#

Initializes the benchmark pipeline and sets up logging and configuration.

Parameters:
  • backtest_config (BacktestConfig) – Configuration for the backtesting pipeline.

  • evaluation_config (EvaluationConfig) – Configuration for the evaluation pipeline.

  • analysis_config (AnalysisConfig) – Configuration for the analysis pipeline.

  • target_provider (TargetProvider[TypeVar, TypeVar]) – Provider that supplies benchmark targets and their data.

  • storage (BenchmarkStorage | None) – Storage backend for saving benchmark results. Defaults to in-memory storage.

  • callbacks (list[BenchmarkCallback] | None) – Optional list of callbacks to manage benchmark events.

  • backtest_config

  • evaluation_config

  • analysis_config

  • target_provider

  • storage

  • callbacks

run(forecaster_factory: ForecasterFactory, run_name: str = 'default', filter_args: F | None = None, n_processes: int | None = None) None[source]#

Runs the benchmark for all targets, optionally filtered and in parallel.

This is the main entry point for executing a benchmark. It: 1. Gets all available targets from the target provider 2. Optionally filters them based on provided criteria 3. Processes each target sequentially or in parallel 4. For each target, creates a forecastser and runs backtest and evaluation

Parameters:
  • forecaster_factory (GenericAlias[TypeVar(T, bound= BenchmarkTarget)]) – Factory function that creates a forecaster for a target. This allows customizing the model for each target.

  • run_name (str) – Name of the benchmark run, used for logging and result storage.

  • filter_args (Optional[TypeVar(F)]) – Optional filter criteria for targets. If provided, only targets matching these criteria will be processed.

  • n_processes (int | None) – Number of processes to use for parallel execution. If None or 1, targets are processed sequentially.

  • forecaster_factory

  • run_name

  • filter_args

  • n_processes

Return type:

None

run_backtest_for_target(target: T, forecaster: BacktestForecasterMixin)[source]#

Runs the backtest for a single target and stores predictions.

Parameters:
  • target (TypeVar(T, bound= BenchmarkTarget))

  • forecaster (BacktestForecasterMixin)

run_evaluation_for_target(target: T, quantiles: list[Quantile], predictions: TimeSeriesDataset) None[source]#

Runs evaluation for a single target and stores results.

Parameters:
Return type:

None

run_analysis_for_target(context: BenchmarkContext, target: T, report: EvaluationReport)[source]#

Run analysis pipeline for a single target’s evaluation results.

Parameters:
  • context (BenchmarkContext) – Benchmark execution context containing run metadata.

  • target (TypeVar(T, bound= BenchmarkTarget)) – Target that was evaluated.

  • report (EvaluationReport) – Evaluation report containing computed metrics.

  • context

  • target

  • report

run_benchmark_analysis(context: BenchmarkContext, targets: Sequence)[source]#

Runs benchmark analysis for multiple targets.

Parameters: