Source code for openstef_beam.benchmarking.storage.local_storage
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0
"""Local file system storage implementation for benchmark results.
Provides file system-based storage for benchmark artifacts including predictions,
evaluations, and analysis visualizations. Organizes results in a structured
directory hierarchy that supports efficient retrieval and conditional processing.
"""

from pathlib import Path
from typing import override

from openstef_beam.analysis import AnalysisOutput, AnalysisScope
from openstef_beam.analysis.models import AnalysisAggregation
from openstef_beam.benchmarking.models import BenchmarkTarget
from openstef_beam.benchmarking.storage.base import BenchmarkStorage
from openstef_beam.evaluation import EvaluationReport
from openstef_core.datasets import TimeSeriesDataset


class LocalBenchmarkStorage(BenchmarkStorage):
"""File system-based storage implementation for benchmark results.
Stores benchmark artifacts (predictions, evaluations, and visualizations) in a
structured directory hierarchy on the local file system. Supports conditional
skipping of existing files to avoid redundant processing.
Directory structure:
base_path/
├── backtest/
│ └── group_name/
│ └── target_name/
│ └── predictions.parquet
├── evaluation/
│ └── group_name/
│ └── target_name/
└── analysis/
├── group_name/
│ ├── target_name/ # Target-specific visualizations
│ └── global/ # Group-level aggregated visualizations
└── global/ # Global aggregated visualizations
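
    Example:
        A minimal sketch; ``target`` and ``predictions`` are hypothetical
        stand-ins for a real BenchmarkTarget and TimeSeriesDataset::

            storage = LocalBenchmarkStorage(Path("/tmp/benchmark_results"))
            if not storage.has_backtest_output(target):
                storage.save_backtest_output(target, predictions)
            loaded = storage.load_backtest_output(target)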
"""

    def __init__(
self,
base_path: Path,
*,
skip_when_existing: bool = True,
predictions_filename: str = "predictions.parquet",
backtest_dirname: str = "backtest",
evaluations_dirname: str = "evaluation",
analysis_dirname: str = "analysis",
):
"""Initialize local file system storage.
Args:
base_path: Root directory where all benchmark artifacts will be stored.
skip_when_existing: When True, has_* methods consider existing files as
valid and skip reprocessing. When False, always indicates missing data.
predictions_filename: Name of the parquet file for storing backtest predictions.
backtest_dirname: Directory name for backtest predictions within base_path.
evaluations_dirname: Directory name for evaluation reports within each target.
analysis_dirname: Directory name for analysis visualizations.
"""
self.base_path = base_path
self.skip_when_existing = skip_when_existing
self.predictions_filename = predictions_filename
self.backtest_dirname = backtest_dirname
self.evaluations_dirname = evaluations_dirname
self.analysis_dirname = analysis_dirname

    @override
def save_backtest_output(self, target: BenchmarkTarget, output: TimeSeriesDataset) -> None:
"""Save backtest predictions to a parquet file."""
predictions_path = self.get_predictions_path_for_target(target)
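        # Create the backtest/<group>/<target> directory tree on first use;
        # exist_ok keeps repeated saves idempotent.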
predictions_path.parent.mkdir(parents=True, exist_ok=True)
output.to_parquet(predictions_path)

    @override
def load_backtest_output(self, target: BenchmarkTarget) -> TimeSeriesDataset:
"""Load backtest predictions from a parquet file.
Returns:
TimeSeriesDataset: The loaded prediction data.
"""
return TimeSeriesDataset.read_parquet(
path=self.get_predictions_path_for_target(target),
)

    @override
    def has_backtest_output(self, target: BenchmarkTarget) -> bool:
        """Check whether backtest predictions already exist for the target.

        Always False when ``skip_when_existing`` is disabled, so predictions
        are regenerated.
        """
        return self.skip_when_existing and self.get_predictions_path_for_target(target).exists()

    @override
    def save_evaluation_output(self, target: BenchmarkTarget, output: EvaluationReport) -> None:
        """Save the evaluation report to the target's evaluation directory."""
        output.to_parquet(path=self.get_evaluations_path_for_target(target))

    @override
    def load_evaluation_output(self, target: BenchmarkTarget) -> EvaluationReport:
        """Load the evaluation report from the target's evaluation directory."""
        return EvaluationReport.read_parquet(path=self.get_evaluations_path_for_target(target))

    @override
    def has_evaluation_output(self, target: BenchmarkTarget) -> bool:
        """Check whether an evaluation report already exists for the target."""
        return self.skip_when_existing and self.get_evaluations_path_for_target(target).exists()

    @override
def save_analysis_output(self, output: AnalysisOutput) -> None:
"""Save analysis visualizations to HTML files."""
for filtering, visualizations in output.visualizations.items():
output_dir = self.get_analysis_path(output.scope) / str(filtering)
output_dir.mkdir(parents=True, exist_ok=True)
for visualization in visualizations:
visualization.write_html(output_dir / f"{visualization.name}.html")

    @override
    def has_analysis_output(self, scope: AnalysisScope) -> bool:
        """Check whether analysis output already exists for the given scope."""
        return self.skip_when_existing and self.get_analysis_path(scope).exists()

    def get_predictions_path_for_target(self, target: BenchmarkTarget) -> Path:
"""Returns the path for storing predictions for a target."""
return (
self.base_path
/ self.backtest_dirname
/ str(target.group_name)
/ str(target.name)
/ self.predictions_filename
)

    def get_evaluations_path_for_target(self, target: BenchmarkTarget) -> Path:
"""Returns the path for storing evaluation results for a target."""
return self.base_path / self.evaluations_dirname / str(target.group_name) / str(target.name)

    def get_analysis_path(self, scope: AnalysisScope) -> Path:
"""Get the file path for storing analysis output based on aggregation scope.
Returns:
Path: Directory path where analysis results should be stored.
"""
base_dir = self.base_path / self.analysis_dirname
        match scope.aggregation:
            case AnalysisAggregation.NONE | AnalysisAggregation.RUN_AND_NONE:
                output_dir = base_dir / str(scope.group_name) / str(scope.target_name)
            case AnalysisAggregation.TARGET | AnalysisAggregation.RUN_AND_TARGET:
                output_dir = base_dir / str(scope.group_name) / "global"
            case AnalysisAggregation.GROUP:
                output_dir = base_dir / "global"
            case AnalysisAggregation.RUN_AND_GROUP:
                output_dir = base_dir
            case _:
                # Default case for any new or unexpected aggregation types
                output_dir = base_dir
return output_dir
__all__ = ["LocalBenchmarkStorage"]