Source code for openstef_beam.benchmarking.benchmark_comparison_pipeline

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

"""Multi-run benchmark comparison and analysis pipeline.

Provides tools for comparing results across multiple benchmark runs, enabling
systematic evaluation of model improvements, parameter tuning effects, and
cross-validation analysis. Supports aggregated analysis at global, group,
and individual target levels.

The comparison pipeline operates on existing benchmark results, allowing
retrospective analysis without re-running expensive computations.
"""

import logging
from collections import defaultdict

from openstef_beam.analysis import AnalysisConfig, AnalysisPipeline, AnalysisScope
from openstef_beam.analysis.models import AnalysisAggregation, GroupName, RunName, TargetMetadata, TargetName
from openstef_beam.benchmarking.benchmark_pipeline import read_evaluation_reports
from openstef_beam.benchmarking.models import BenchmarkTarget
from openstef_beam.benchmarking.storage import BenchmarkStorage
from openstef_beam.benchmarking.target_provider import TargetProvider
from openstef_beam.evaluation import EvaluationReport

_logger = logging.getLogger(__name__)


class BenchmarkComparisonPipeline[T: BenchmarkTarget, F]:
    """Pipeline for comparing results across multiple benchmark runs.

    Enables systematic comparison of forecasting models by analyzing results from
    multiple benchmark runs side-by-side. Provides aggregated analysis at different
    levels (global, group, target) to identify performance patterns and improvements.

    Use cases:

    - Compare model variants (different hyperparameters, algorithms)
    - Evaluate performance before/after model updates
    - Cross-validation analysis across different time periods
    - A/B testing of forecasting approaches

    The pipeline operates on existing benchmark results, making it efficient for
    retrospective analysis without re-running expensive computations.

    Multi-level analysis:
        The pipeline automatically generates analysis at three aggregation levels:

        - Global: Overall performance across all runs and targets
        - Group: Performance comparison within target groups
        - Target: Individual target performance across runs

        This hierarchical approach helps identify whether improvements are consistent
        across the entire portfolio or specific to certain target types.

    Example:
        Comparing three model versions across all targets:

        >>> from openstef_beam.benchmarking import BenchmarkComparisonPipeline
        >>> from openstef_beam.analysis import AnalysisConfig
        >>> from openstef_beam.benchmarking.storage import LocalBenchmarkStorage
        >>> from openstef_beam.analysis.visualizations import SummaryTableVisualization
        >>> from pathlib import Path
        >>>
        >>> from openstef_beam.analysis.visualizations import (
        ...     GroupedTargetMetricVisualization,
        ...     TimeSeriesVisualization
        ... )
        >>>
        >>> # Configure analysis
        >>> analysis_config = AnalysisConfig(
        ...     visualization_providers=[
        ...         GroupedTargetMetricVisualization(name="model_comparison", metric="rCRPS"),
        ...         SummaryTableVisualization(name="performance_summary"),
        ...         TimeSeriesVisualization(name="prediction_quality")
        ...     ]
        ... )
        >>>
        >>> # Set up comparison pipeline
        >>> comparison = BenchmarkComparisonPipeline(
        ...     analysis_config=analysis_config,
        ...     target_provider=...,
        ...     storage=...
        ... )
        >>>
        >>> # Compare multiple model versions across all targets
        >>> run_data = {
        ...     "baseline_v1": LocalBenchmarkStorage("results/baseline"),
        ...     "improved_v2": LocalBenchmarkStorage("results/improved"),
        ...     "experimental_v3": LocalBenchmarkStorage("results/experimental")
        ... }
        >>>
        >>> # Generate comparison analysis
        >>> # comparison.run(
        >>> #     run_data=run_data,
        >>> # )
    """
    def __init__(
        self,
        analysis_config: AnalysisConfig,
        target_provider: TargetProvider[T, F],
        storage: BenchmarkStorage,
    ):
        """Initialize the comparison pipeline.

        Args:
            analysis_config: Configuration for analysis and visualization generation.
            target_provider: Provider that supplies targets for comparison.
            storage: Storage backend for saving comparison results.
        """
        super().__init__()
        self.analysis_config = analysis_config
        self.target_provider = target_provider
        self.storage = storage

        self.pipeline = AnalysisPipeline(
            config=self.analysis_config,
        )
    def run(
        self,
        run_data: dict[RunName, BenchmarkStorage],
        filter_args: F | None = None,
    ):
        """Execute comparison analysis across multiple benchmark runs.

        Orchestrates the complete comparison workflow: loads evaluation reports from
        all specified runs, then generates comparative analysis at global, group,
        and target levels.

        Args:
            run_data: Mapping from run names to their corresponding storage backends.
                Each storage backend should contain evaluation results for the run.
            filter_args: Optional criteria for filtering targets. Only targets matching
                these criteria will be included in the comparison.
        """
        targets = self.target_provider.get_targets(filter_args)

        # Read evaluation reports for each run
        reports: list[tuple[TargetMetadata, EvaluationReport]] = []
        for run_name, run_storage in run_data.items():
            run_reports = read_evaluation_reports(
                targets=targets,
                storage=run_storage,
                run_name=run_name,
                strict=True,
            )
            reports.extend(run_reports)

        self.run_global(reports)
        self.run_for_groups(reports)
        self.run_for_targets(reports)
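    # Minimal invocation sketch for `run`. The run names, result paths, and the
    # filter value below are illustrative only; the concrete type accepted by
    # `filter_args` depends on the TargetProvider in use.
    #
    #   comparison.run(
    #       run_data={
    #           "baseline_v1": LocalBenchmarkStorage("results/baseline"),
    #           "improved_v2": LocalBenchmarkStorage("results/improved"),
    #       },
    #       filter_args=None,  # or provider-specific filter criteria
    #   )
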
    def run_global(self, reports: list[tuple[TargetMetadata, EvaluationReport]]):
        """Generate global comparison analysis across all runs and targets.

        Creates aggregate visualizations comparing performance across all runs and
        target groups, providing a high-level overview of model improvements.

        Args:
            reports: List of target metadata and evaluation report pairs from all runs.
        """
        scope = AnalysisScope(
            aggregation=AnalysisAggregation.RUN_AND_GROUP,
        )
        if self.storage.has_analysis_output(scope=scope):
            _logger.info("Skipping global analysis, already exists")
            return

        _logger.info("Running analysis comparison for runs across groups")
        analysis = self.pipeline.run_for_reports(
            reports=reports,
            scope=scope,
        )
        self.storage.save_analysis_output(output=analysis)
    def run_for_groups(
        self,
        reports: list[tuple[TargetMetadata, EvaluationReport]],
    ):
        """Generate group-level comparison analysis for each target group.

        Creates comparative visualizations within each target group, showing how
        different runs perform for similar types of targets.

        Args:
            reports: List of target metadata and evaluation report pairs from all runs.
        """
        grouped: dict[GroupName, list[tuple[TargetMetadata, EvaluationReport]]] = defaultdict(list)
        for metadata, report in reports:
            grouped[metadata.group_name].append((metadata, report))

        for group_name, report_subset in grouped.items():
            scope = AnalysisScope(
                aggregation=AnalysisAggregation.RUN_AND_TARGET,
                group_name=group_name,
            )
            if self.storage.has_analysis_output(scope=scope):
                _logger.info(
                    "Skipping analysis for group %s, already exists",
                    group_name,
                )
                continue

            _logger.info("Running analysis for group comparison")
            run_analysis = self.pipeline.run_for_reports(
                reports=report_subset,
                scope=scope,
            )
            self.storage.save_analysis_output(output=run_analysis)
    def run_for_targets(
        self,
        reports: list[tuple[TargetMetadata, EvaluationReport]],
    ):
        """Generate target-level comparison analysis for individual targets.

        Creates detailed comparative visualizations for each individual target,
        showing how different runs perform on the same forecasting challenge.

        Args:
            reports: List of target metadata and evaluation report pairs from all runs.
        """
        grouped: dict[tuple[GroupName, TargetName], list[tuple[TargetMetadata, EvaluationReport]]] = defaultdict(list)
        for metadata, report in reports:
            grouped[metadata.group_name, metadata.name].append((metadata, report))

        for (group_name, target_name), report_subset in grouped.items():
            scope = AnalysisScope(
                aggregation=AnalysisAggregation.RUN_AND_NONE,
                target_name=target_name,
                group_name=group_name,
            )
            if self.storage.has_analysis_output(scope=scope):
                _logger.info(
                    "Skipping analysis for target %s in group %s, already exists",
                    target_name,
                    group_name,
                )
                continue

            _logger.info("Running analysis for target comparison")
            run_analysis = self.pipeline.run_for_reports(
                reports=report_subset,
                scope=scope,
            )
            self.storage.save_analysis_output(output=run_analysis)
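
# Sketch of the scope hierarchy produced by the three analysis levels above
# (a summary of the calls in this module, not additional behaviour):
#
#   global : AnalysisScope(aggregation=AnalysisAggregation.RUN_AND_GROUP)
#   group  : AnalysisScope(aggregation=AnalysisAggregation.RUN_AND_TARGET, group_name=...)
#   target : AnalysisScope(aggregation=AnalysisAggregation.RUN_AND_NONE,
#                          group_name=..., target_name=...)
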
__all__ = ["BenchmarkComparisonPipeline"]