Source code for openstef_beam.backtesting.backtest_event_generator
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0
"""Event scheduling engine for realistic backtesting simulations.
Determines when training and prediction events should occur during backtesting,
respecting data availability constraints and operational scheduling requirements.
Acts as the temporal coordinator between data availability and model operations.
"""
from collections.abc import Iterator
from datetime import datetime, time, timedelta
import pandas as pd
from pydantic import Field
from openstef_beam.backtesting.backtest_event import BacktestEvent, BacktestEventBatch
from openstef_beam.backtesting.backtest_forecaster.mixins import BacktestForecasterConfig
from openstef_core.base_model import BaseModel
from openstef_core.utils.datetime import align_datetime_to_time
from openstef_core.utils.itertools import merge_iterators
[docs]
class BacktestEventGenerator(BaseModel):
"""Component for generating ordered sequences of backtest events.
Generates train and predict events based on configured intervals, ensuring that
each event has sufficient context data. Events are ordered chronologically with
train events preceding predict events at the same timestamp.
"""
start: datetime
end: datetime
index: pd.DatetimeIndex = Field(..., description="Timestamps for the prediction target series.")
sample_interval: timedelta = Field(..., description="Interval between timestamps.")
predict_interval: timedelta = Field(..., description="Interval between predictions.")
train_interval: timedelta = Field(..., description="Interval between retrains.")
align_time: time = Field(
..., description="Time alignment for predictions, ensuring they are aligned to this interval."
)
forecaster_config: BacktestForecasterConfig = Field(..., description="Model interface configuration.")
[docs]
def iterate(self) -> Iterator[BacktestEvent]:
"""Creates an ordered iterator of train and predict events.
Combines training and prediction events in chronological order, with
training events preceding prediction events at the same timestamp.
If the model doesn't require training, only prediction events are returned.
Returns:
An iterator of chronologically ordered BacktestEvents.
"""
if not self.forecaster_config.requires_training:
return self._predict_iterator()
train_events = list(self._train_iterator())
if len(train_events) == 0:
return iter([])
trained_timestamp = train_events[0].timestamp
predict_events = [event for event in self._predict_iterator() if event.timestamp >= trained_timestamp]
return merge_iterators(
it1=iter(train_events),
it2=iter(predict_events),
compare=self._cmp_events,
)
@staticmethod
def _cmp_events(a: BacktestEvent, b: BacktestEvent) -> int:
"""Compares two BacktestEvents for ordering.
First orders by timestamp, then by event type (train before predict).
Returns:
-1 if a should come before b, 1 if a should come after b, 0 if equal.
"""
if a.timestamp < b.timestamp:
return -1
if a.timestamp > b.timestamp:
return 1
order = {"train": 0, "predict": 1}
return order[a.type] - order[b.type]
def _predict_iterator(self) -> Iterator[BacktestEvent]:
"""Generates prediction events at configured intervals.
Creates predict events that have sufficient forecast context coverage
and occur at regular intervals as specified by predict_interval.
Yields:
BacktestEvent: Prediction events with sufficient context coverage.
"""
end_time = self.end
current_time = align_datetime_to_time(self.start, self.align_time, mode="ceil")
while current_time <= end_time:
horizon_end = current_time + self.forecaster_config.predict_min_length
if horizon_end > end_time:
break
forecast_context_start = current_time - self.forecaster_config.predict_context_length
forecast_context_end = current_time
forecast_context_coverage = self._calculate_coverage(forecast_context_start, forecast_context_end)
if forecast_context_coverage >= self.forecaster_config.predict_context_min_coverage:
yield BacktestEvent(type="predict", timestamp=current_time)
current_time += self.predict_interval
def _train_iterator(self) -> Iterator[BacktestEvent]:
"""Generates training events at configured intervals.
Creates train events that have sufficient training context coverage
and occur at regular intervals as specified by train_interval.
Yields:
BacktestEvent: Training events with sufficient context coverage.
"""
end_time = self.end
current_time = align_datetime_to_time(self.start, self.align_time, mode="ceil")
while current_time <= end_time:
horizon_end = current_time + self.forecaster_config.predict_min_length
if horizon_end > end_time:
break
training_start = max(
current_time - self.forecaster_config.training_context_length,
self.index.min().to_pydatetime(), # type: ignore[reportUnknownMemberType]
)
training_end = current_time
if training_start == training_end:
current_time += self.train_interval
continue
training_coverage = self._calculate_coverage(training_start, training_end)
if training_coverage >= self.forecaster_config.training_context_min_coverage:
yield BacktestEvent(type="train", timestamp=current_time)
current_time += self.train_interval
def _calculate_coverage(self, start: datetime, end: datetime) -> float:
"""Calculates the data coverage ratio within a time window.
Determines what fraction of expected data points are actually available
in the provided index within the specified time window.
Args:
start: The start of the time window.
end: The end of the time window.
Returns:
The ratio of available data points to expected data points.
"""
num_window_samples = (end - start) / self.sample_interval
coverage = self.index[(self.index >= pd.Timestamp(start)) & (self.index < pd.Timestamp(end))]
return len(coverage) / num_window_samples
[docs]
@staticmethod
def iterate_batched(events: list[BacktestEvent], batch_size: int | None = None) -> Iterator[BacktestEventBatch]:
"""Creates an iterator of batched backtest events for efficient processing.
Groups prediction events into batches up to batch_size, while keeping
training events as individual batches. This provides a clean interface
for batch-aware processing without mixing concerns.
Args:
events: List of BacktestEvent objects to be processed in batches.
batch_size: Maximum number of prediction events per batch.
If None, all events are returned as individual batches.
Yields:
BacktestEventBatch: Batched events ready for processing.
"""
if batch_size is None or batch_size <= 1:
# Return individual events as single-element batches
for event in events:
yield BacktestEventBatch(events=[event])
return
i = 0
while i < len(events):
event = events[i]
if event.type == "train":
# Training events are always processed individually
yield BacktestEventBatch(events=[event])
i += 1
else:
# Collect consecutive prediction events up to batch_size
batch_events: list[BacktestEvent] = []
batch_end = min(i + batch_size, len(events))
for j in range(i, batch_end):
if events[j].type != "predict":
break
batch_events.append(events[j])
if batch_events:
yield BacktestEventBatch(events=batch_events)
i += len(batch_events)
else:
i += 1