BenchmarkPipeline#
- class openstef_beam.benchmarking.BenchmarkPipeline(backtest_config: BacktestConfig, evaluation_config: EvaluationConfig, analysis_config: AnalysisConfig, target_provider: TargetProvider[TypeVar, TypeVar], storage: BenchmarkStorage | None = None, callbacks: list[BenchmarkCallback] | None = None) → None[source]#
Bases: Generic

Orchestrates forecasting model benchmarks across multiple targets.
Provides a standardized framework for systematic evaluation of forecasting models. Coordinates the entire benchmark workflow from data preparation through analysis, ensuring consistent evaluation methodology and result comparability.
Core workflow:

1. Target retrieval: Gets targets from configurable providers with optional filtering
2. Model creation: Uses factory pattern to create target-specific forecasters
3. Backtesting: Generates predictions using historical data with proper validation
4. Evaluation: Computes performance metrics against ground truth
5. Analysis: Creates visualizations and comparative reports
6. Storage: Persists results for future analysis and comparison
Key features:

- Parallel execution support for efficient processing of large target sets
- Pluggable storage backends (local filesystem, cloud storage, in-memory)
- Extensible callback system for monitoring and custom processing
- Automatic handling of data dependencies and validation
- Consistent error handling and recovery mechanisms
Example
Basic benchmark setup and execution:
>>> from openstef_beam.benchmarking import BenchmarkPipeline
>>> from openstef_beam.backtesting import BacktestConfig
>>> from openstef_beam.evaluation import EvaluationConfig
>>> from openstef_beam.analysis import AnalysisConfig
>>> from openstef_beam.evaluation.metric_providers import RMAEProvider, RCRPSProvider
>>> from openstef_beam.analysis.visualizations import SummaryTableVisualization
>>> from openstef_beam.benchmarking.storage.local_storage import LocalBenchmarkStorage
>>> from datetime import timedelta
>>> from pathlib import Path
>>>
>>> # Configure components
>>> storage = LocalBenchmarkStorage(base_path=Path("./results"))
>>> backtest_config = BacktestConfig(
...     horizon=timedelta(hours=24),
...     window_step=timedelta(days=1)
... )
>>> evaluation_config = EvaluationConfig()
>>> analysis_config = AnalysisConfig(
...     visualization_providers=[SummaryTableVisualization(name="summary")]
... )
>>> # Create benchmark pipeline with target provider
>>> pipeline = BenchmarkPipeline(
...     backtest_config=backtest_config,
...     evaluation_config=evaluation_config,
...     analysis_config=analysis_config,
...     storage=storage,
...     target_provider=...  # Your custom provider
... )
>>>
>>> # Define forecaster factory for target-specific models
>>> def create_forecaster(context, target):
...     # Customize model configuration per target
...     return MyForecaster(config=target.get_model_config())
>>>
>>> # Execute complete benchmark with parallel processing
>>> # pipeline.run(
>>> #     forecaster_factory=create_forecaster,
>>> #     run_name="baseline_comparison",
>>> #     n_processes=4
>>> # )
- Parameters:
  - backtest_config (BacktestConfig)
  - evaluation_config (EvaluationConfig)
  - analysis_config (AnalysisConfig)
  - target_provider (TargetProvider[TypeVar, TypeVar])
  - storage (BenchmarkStorage | None)
  - callbacks (list[BenchmarkCallback] | None)
- __init__(backtest_config: BacktestConfig, evaluation_config: EvaluationConfig, analysis_config: AnalysisConfig, target_provider: TargetProvider[TypeVar, TypeVar], storage: BenchmarkStorage | None = None, callbacks: list[BenchmarkCallback] | None = None) → None[source]#
Initializes the benchmark pipeline and sets up logging and configuration.
- Parameters:
  - backtest_config (BacktestConfig) – Configuration for the backtesting pipeline.
  - evaluation_config (EvaluationConfig) – Configuration for the evaluation pipeline.
  - analysis_config (AnalysisConfig) – Configuration for the analysis pipeline.
  - target_provider (TargetProvider[TypeVar, TypeVar]) – Provider that supplies benchmark targets and their data.
  - storage (BenchmarkStorage | None) – Storage backend for saving benchmark results. Defaults to in-memory storage.
  - callbacks (list[BenchmarkCallback] | None) – Optional list of callbacks to manage benchmark events.
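For illustration, a minimal construction sketch that reuses the configuration objects from the example above. This is an assumption-laden sketch, not part of the documented example: the target provider is still your own and is shown as a placeholder. When storage and callbacks are omitted, results go to the default in-memory storage and no event callbacks are attached:

>>> # Sketch: reuse backtest_config, evaluation_config, analysis_config from above
>>> pipeline = BenchmarkPipeline(
...     backtest_config=backtest_config,
...     evaluation_config=evaluation_config,
...     analysis_config=analysis_config,
...     target_provider=...  # your custom TargetProvider
... )
>>> # storage defaults to an in-memory backend; callbacks default to no event hooks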
- run(forecaster_factory: ForecasterFactory, run_name: str = 'default', filter_args: F | None = None, n_processes: int | None = None) → None[source]#
Runs the benchmark for all targets, optionally filtered and in parallel.
This is the main entry point for executing a benchmark. It:

1. Gets all available targets from the target provider
2. Optionally filters them based on provided criteria
3. Processes each target sequentially or in parallel
4. For each target, creates a forecaster and runs backtest and evaluation
- Parameters:
  - forecaster_factory (GenericAlias[TypeVar(T, bound=BenchmarkTarget)]) – Factory function that creates a forecaster for a target. This allows customizing the model for each target.
  - run_name (str) – Name of the benchmark run, used for logging and result storage.
  - filter_args (Optional[TypeVar(F)]) – Optional filter criteria for targets. If provided, only targets matching these criteria will be processed.
  - n_processes (int | None) – Number of processes to use for parallel execution. If None or 1, targets are processed sequentially.
- Return type:
None
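For illustration (not part of the documented example), sketches of sequential and parallel invocations. The concrete filter type F depends on your target provider, so my_filter below is a hypothetical placeholder; the calls are shown commented out, following the convention of the example above:

>>> # Sequential run over all targets (n_processes omitted or set to 1)
>>> # pipeline.run(forecaster_factory=create_forecaster, run_name="sequential_run")
>>>
>>> # Parallel run over a filtered subset; my_filter is hypothetical and must match
>>> # the filter type F expected by your target provider
>>> # pipeline.run(
>>> #     forecaster_factory=create_forecaster,
>>> #     run_name="filtered_run",
>>> #     filter_args=my_filter,
>>> #     n_processes=4
>>> # )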
- run_backtest_for_target(target: T, forecaster: BacktestForecasterMixin)[source]#
Runs the backtest for a single target and stores predictions.
- Parameters:
  - target (TypeVar(T, bound=BenchmarkTarget))
  - forecaster (BacktestForecasterMixin)
- run_evaluation_for_target(target: T, quantiles: list[Quantile], predictions: TimeSeriesDataset) → None[source]#
Runs evaluation for a single target and stores results.
- Parameters:
  - target (TypeVar(T, bound=BenchmarkTarget))
  - quantiles (list[Quantile])
  - predictions (TimeSeriesDataset)
- Return type:
None
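These two methods can also be driven individually, for example to re-evaluate previously stored predictions. The sketch below is hedged: it assumes a target from your provider, a forecaster implementing BacktestForecasterMixin, a list of Quantile values, and a TimeSeriesDataset of predictions already exist; none of them are constructed here, so the calls are shown commented out:

>>> # Backtest a single target and store its predictions
>>> # pipeline.run_backtest_for_target(target=target, forecaster=forecaster)
>>>
>>> # Evaluate previously generated predictions for the same target
>>> # pipeline.run_evaluation_for_target(
>>> #     target=target,
>>> #     quantiles=quantiles,      # list[Quantile] to evaluate
>>> #     predictions=predictions,  # TimeSeriesDataset of backtest output
>>> # )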
- run_analysis_for_target(context: BenchmarkContext, target: T, report: EvaluationReport)[source]#
Runs the analysis pipeline for a single target’s evaluation results.
- Parameters:
  - context (BenchmarkContext) – Benchmark execution context containing run metadata.
  - target (TypeVar(T, bound=BenchmarkTarget)) – Target that was evaluated.
  - report (EvaluationReport) – Evaluation report containing computed metrics.
- run_benchmark_analysis(context: BenchmarkContext, targets: Sequence)[source]#
Runs benchmark analysis for multiple targets.
- Parameters:
  - context (BenchmarkContext)
  - targets (Sequence[TypeVar(T, bound=BenchmarkTarget)])
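A final hedged sketch of the analysis entry points: run_analysis_for_target consumes a single target’s EvaluationReport, while run_benchmark_analysis aggregates a sequence of targets. The context, target, report, and targets objects are assumed to come from an earlier run and are not constructed here, so the calls are shown commented out:

>>> # Per-target analysis from an existing evaluation report
>>> # pipeline.run_analysis_for_target(context=context, target=target, report=report)
>>>
>>> # Cross-target analysis over all benchmarked targets
>>> # pipeline.run_benchmark_analysis(context=context, targets=targets)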