
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

"""Backtesting pipeline for evaluating energy forecasting models.

Simulates realistic forecasting scenarios by replaying historical data with
proper temporal constraints. Executes prediction and retraining schedules
that mirror operational deployment conditions, ensuring evaluation results
accurately reflect real-world model performance.
"""

import logging
from datetime import datetime, time, timedelta
from typing import cast

import pandas as pd
from pydantic import Field
from tqdm.auto import tqdm

from openstef_beam.backtesting.backtest_event import BacktestEvent
from openstef_beam.backtesting.backtest_event_generator import BacktestEventGenerator
from openstef_beam.backtesting.backtest_forecaster.mixins import BacktestBatchForecasterMixin, BacktestForecasterMixin
from openstef_beam.backtesting.restricted_horizon_timeseries import RestrictedHorizonVersionedTimeSeries
from openstef_core.base_model import BaseConfig
from openstef_core.datasets import TimeSeriesDataset, VersionedTimeSeriesDataset

_logger = logging.getLogger(__name__)


class BacktestConfig(BaseConfig):
    """Configuration for backtesting energy forecasting models."""

    prediction_sample_interval: timedelta = Field(
        default=timedelta(minutes=15),
        description="Time interval between prediction samples in the output forecast",
    )
    predict_interval: timedelta = Field(
        default=timedelta(hours=6),
        description="Time interval between generating new predictions during backtesting",
    )
    train_interval: timedelta = Field(
        default=timedelta(days=7),
        description="Time interval between model retraining events",
    )
    align_time: time = Field(
        default=time.fromisoformat("00:00+00"),
        description="Reference time for aligning prediction schedules to regular intervals",
    )
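
# Example (illustrative sketch): constructing a config with the default schedule
# spelled out. All fields are optional; the values below mirror the defaults and
# mainly show the intended units.
#
#     config = BacktestConfig(
#         prediction_sample_interval=timedelta(minutes=15),  # forecast resolution
#         predict_interval=timedelta(hours=6),               # how often to predict
#         train_interval=timedelta(days=7),                  # how often to retrain
#         align_time=time.fromisoformat("00:00+00"),         # schedule anchor (UTC)
#     )
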

class BacktestPipeline:
    """Pipeline for conducting realistic backtesting of energy forecasting models.

    This class orchestrates the backtesting process by simulating the operational
    environment where forecasts are generated at regular intervals with limited
    historical data availability. It supports both single and batch prediction
    modes and handles periodic model retraining.

    Attributes:
        config: Configuration parameters for the backtesting process.
        forecaster: The forecasting model implementing either the
            BacktestForecasterMixin or the BacktestBatchForecasterMixin interface.
        start: Start datetime for the backtesting period.
        end: End datetime for the backtesting period.

    Note:
        The pipeline ensures temporal consistency by preventing data leakage and
        respecting the operational constraints that would exist in a real-time
        forecasting system.
    """

    config: BacktestConfig
    forecaster: BacktestForecasterMixin
    start: datetime
    end: datetime

    def __init__(
        self,
        config: BacktestConfig,
        forecaster: BacktestForecasterMixin,
    ) -> None:
        """Initialize the backtesting pipeline.

        Args:
            config: Backtesting configuration including prediction and training intervals.
            forecaster: Model implementing the required forecasting interface.

        Raises:
            ValueError: If the prediction sample intervals don't match between
                config and forecaster.
        """
        if config.prediction_sample_interval != forecaster.config.predict_sample_interval:
            raise ValueError(
                "The prediction sample interval of the backtest config must match the forecaster's "
                "predict sample interval.",
                config.prediction_sample_interval,
                forecaster.config.predict_sample_interval,
            )

        self.config = config
        self.forecaster = forecaster

    def run(
        self,
        ground_truth: VersionedTimeSeriesDataset,
        predictors: VersionedTimeSeriesDataset,
        start: datetime | None,
        end: datetime | None,
        *,
        show_progress: bool = True,
    ) -> TimeSeriesDataset:
        """Execute the backtesting simulation and return predictions.

        Runs the complete backtesting process by generating events, processing
        training and prediction operations in chronological order, and collecting
        all predictions into a single dataset.

        Args:
            ground_truth: Historical target values with timestamps.
            predictors: Feature data for model predictions.
            start: Start datetime for backtesting. Uses the data minimum if None.
            end: End datetime for backtesting. Uses the data maximum if None.
            show_progress: Whether to display a progress bar during execution.

        Returns:
            TimeSeriesDataset containing all predictions with timestamps and
            availability information. Empty if no predictions were made.
        """
        min_start = cast("pd.Series[pd.Timestamp]", ground_truth.index).min().to_pydatetime()
        max_end = cast("pd.Series[pd.Timestamp]", ground_truth.index).max().to_pydatetime()

        # Prepare the input data
        dataset = VersionedTimeSeriesDataset.concat([ground_truth, predictors], mode="outer")
        _logger.info("Initialized backtest dataset with %d features", len(dataset.feature_names))

        # Prepare backtesting events with batch awareness
        event_factory = BacktestEventGenerator(
            start=max(start, min_start) if start else min_start,
            end=min(end, max_end) if end else max_end,
            index=ground_truth.index,
            sample_interval=ground_truth.sample_interval,
            predict_interval=self.config.predict_interval,
            train_interval=self.config.train_interval,
            align_time=self.config.align_time,
            forecaster_config=self.forecaster.config,
        )

        # Determine the batch processing approach
        if isinstance(self.forecaster, BacktestBatchForecasterMixin):
            batch_size = self.forecaster.batch_size
            supports_batching = batch_size is not None and batch_size > 1
        else:
            batch_size = None
            supports_batching = False

        _logger.info("Starting the backtest pipeline")
        prediction_list: list[TimeSeriesDataset] = self._process_events(
            event_factory=event_factory,
            dataset=dataset,
            batch_size=batch_size if supports_batching else None,
            show_progress=show_progress,
        )
        _logger.info("Finished backtest pipeline")

        if not prediction_list:
            return TimeSeriesDataset(
                data=pd.DataFrame(
                    {"available_at": pd.Series(dtype="datetime64[ns]")},
                    index=pd.DatetimeIndex([]),
                ),
                sample_interval=self.config.prediction_sample_interval,
            )

        return TimeSeriesDataset(
            data=pd.concat([pred.data for pred in prediction_list], axis=0),
            sample_interval=self.config.prediction_sample_interval,
        )
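
    # Example (illustrative sketch): a typical end-to-end backtest. `MyForecaster`
    # is a hypothetical class implementing BacktestForecasterMixin; the only hard
    # requirement checked at construction is that its config.predict_sample_interval
    # matches the backtest config's prediction_sample_interval.
    #
    #     config = BacktestConfig(predict_interval=timedelta(hours=6))
    #     forecaster = MyForecaster(...)  # hypothetical forecaster implementation
    #     pipeline = BacktestPipeline(config=config, forecaster=forecaster)
    #     predictions = pipeline.run(
    #         ground_truth=ground_truth,  # VersionedTimeSeriesDataset of targets
    #         predictors=predictors,      # VersionedTimeSeriesDataset of features
    #         start=None,                 # defaults to the data minimum
    #         end=None,                   # defaults to the data maximum
    #     )
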
    def _process_train_event(self, event: BacktestEvent, dataset: VersionedTimeSeriesDataset) -> None:
        """Process a single training event."""
        horizon_dataset = RestrictedHorizonVersionedTimeSeries(dataset=dataset, horizon=event.timestamp)
        self.forecaster.fit(horizon_dataset)
        _logger.debug("Processed train event", extra={"event": event})

    def _process_single_prediction(
        self, event: BacktestEvent, dataset: VersionedTimeSeriesDataset
    ) -> list[TimeSeriesDataset]:
        """Process a single prediction event.

        Args:
            event: Prediction event to process.
            dataset: Time series data for prediction.

        Returns:
            List containing the single prediction dataset if successful, empty list otherwise.
        """
        horizon_dataset = RestrictedHorizonVersionedTimeSeries(dataset=dataset, horizon=event.timestamp)
        prediction = self.forecaster.predict_versioned(horizon_dataset)

        if prediction is not None:
            _logger.debug("Processed single prediction")
            return [prediction]
        else:  # noqa: RET505 - kept for readability
            _logger.warning("Model interface returned None for prediction", extra={"event": event})
            return []

    def _process_batch_prediction(
        self, batch_events: list[BacktestEvent], dataset: VersionedTimeSeriesDataset
    ) -> list[TimeSeriesDataset]:
        """Process a batch of prediction events and return valid predictions.

        Args:
            batch_events: List of prediction events to process as a batch.
            dataset: Time series data for predictions.

        Returns:
            List of valid prediction datasets, excluding any None results.
        """
        if not batch_events:
            return []

        # Process the batch with one restricted-horizon view per event
        horizon_datasets = [
            RestrictedHorizonVersionedTimeSeries(dataset=dataset, horizon=event.timestamp) for event in batch_events
        ]
        batch_predictions = cast(BacktestBatchForecasterMixin, self.forecaster).predict_batch_versioned(
            horizon_datasets
        )
        valid_predictions = [pred for pred in batch_predictions if pred is not None]

        # Log a warning for any None predictions
        none_count = len(batch_predictions) - len(valid_predictions)
        if none_count > 0:
            _logger.warning(
                "Model interface returned (%d/%d) None predictions in batch.", none_count, len(batch_events)
            )

        _logger.debug("Processed prediction batch (size=%d)", len(batch_events))
        return valid_predictions

    def _process_events(
        self,
        event_factory: BacktestEventGenerator,
        dataset: VersionedTimeSeriesDataset,
        batch_size: int | None,
        *,
        show_progress: bool = True,
    ) -> list[TimeSeriesDataset]:
        """Process events using the factory's batching logic.

        Args:
            event_factory: Generator for creating backtest events.
            dataset: Time series data for processing.
            batch_size: Maximum batch size for predictions, None for single processing.
            show_progress: Whether to display a progress bar.

        Returns:
            List of all prediction datasets generated during processing.
""" predictions: list[TimeSeriesDataset] = [] # Get total count for progress bar events = list(event_factory.iterate()) total_events = len(events) pbar = tqdm(total=total_events, smoothing=0.0, disable=not show_progress) try: for event_batch in BacktestEventGenerator.iterate_batched(events=events, batch_size=batch_size): if event_batch.is_training: self._process_train_event(event_batch.events[0], dataset) elif event_batch.is_prediction: # Use batch processing if batch_size is enabled, single processing otherwise if batch_size is not None: batch_predictions = self._process_batch_prediction( batch_events=event_batch.events, dataset=dataset ) else: batch_predictions = self._process_single_prediction( event=event_batch.events[0], dataset=dataset ) predictions.extend(batch_predictions) pbar.update(len(event_batch)) finally: pbar.close() return predictions

__all__ = [
    "BacktestConfig",
    "BacktestPipeline",
]