Source code for openstef_models.models.forecasting.median_forecaster

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <short.term.energy.forecasts@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0

"""Median regressor based forecasting models for energy forecasting.

Provides median regression models for multi-quantile energy forecasting.

Note that this is an autoregressive model, meaning that it uses the previous
predictions to predict the next value.

This regressor is good for predicting two types of signals:

- Signals with very slow dynamics compared to the sampling rate, possibly
  with a lot of noise.
- Signals that switch between two or more states, which are random in nature or
  depend on unknown features, but tend to be stable in each state. An example of
  this may be waste heat delivered from an industrial process. Using a median
  over the last few timesteps adds some hysteresis to avoid triggering on noise.

Tips for using this regressor:

- Set the lags to be evenly spaced and at a frequency matching the
  frequency of the input data. For example, if the input data is at 15
  minute intervals, set the lags to be at 15 minute intervals as well.
- Use a small training dataset, since there are no actual parameters to train.
- Set the frequency of the input data index to avoid inferring it. Inference
  might be a problem if we get very small chunks of data in training or validation sets.
- Use only one training horizon, since the regressor will use the same lags for all
  training horizons.
"""

from datetime import timedelta
from typing import ClassVar, override

import numpy as np
import pandas as pd
from pydantic import Field, PrivateAttr

from openstef_core.datasets.validated_datasets import ForecastDataset, ForecastInputDataset, TimeSeriesDataset
from openstef_core.mixins.predictor import HyperParams
from openstef_core.types import LeadTime, Quantile
from openstef_core.utils.pydantic import timedelta_from_isoformat
from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster
from openstef_models.models.forecasting.forecaster import Forecaster


[docs] class MedianForecasterHyperParams(HyperParams): """Hyperparameter configuration for median forecaster.""" primary_lag: timedelta = Field( default=timedelta(days=7), description="Primary lag to use for predictions (default: 7 days for weekly patterns)", ) fallback_lag: timedelta = Field( default=timedelta(days=14), description="Fallback lag to use when primary lag data is unavailable (default: 14 days)", )
[docs] class MedianForecaster(Forecaster, ExplainableForecaster, ContributionsMixin): """Median forecaster using lag features for predictions. This forecaster predicts the median value based on specified lag features. It is particularly useful for signals with slow dynamics or state-switching behavior. Hyperparameters: lags: List of time deltas representing the lag features to use for prediction. These should be aligned with the data sampling frequency. context_window: Time delta representing the context window size for input data. This defines how much historical data is considered for making predictions. """ HyperParams: ClassVar[type[MedianForecasterHyperParams]] = MedianForecasterHyperParams quantiles: list[Quantile] = Field( default=[Quantile(0.5)], description=( "Probability levels for uncertainty estimation. Each quantile represents a confidence level " "(e.g., 0.1 = 10th percentile, 0.5 = median, 0.9 = 90th percentile). " "Models must generate predictions for all specified quantiles." ), min_length=1, max_length=1, ) horizons: list[LeadTime] = Field( default=..., description=( "Lead times for predictions, accounting for data availability and versioning cutoffs. " "Each horizon defines how far ahead the model should predict." ), min_length=1, max_length=1, ) hyperparams: MedianForecasterHyperParams = Field( default_factory=MedianForecasterHyperParams, ) _is_fitted: bool = PrivateAttr(default=False) _feature_names: list[str] = PrivateAttr(default_factory=list[str]) _lags_to_time_deltas: dict[str, timedelta] = PrivateAttr(default_factory=dict[str, timedelta]) _frequency: timedelta = PrivateAttr(default=timedelta(0)) _feature_importances: np.ndarray = PrivateAttr(default_factory=lambda: np.array([])) @property @override def hparams(self) -> MedianForecasterHyperParams: return self.hyperparams @property @override def is_fitted(self) -> bool: return self._is_fitted @property @override def feature_importances(self) -> pd.DataFrame: return pd.DataFrame( data=self._feature_importances, columns=[self.quantiles[0].format()], index=self._feature_names ) @property def frequency(self) -> timedelta: """Retrieve the model input frequency. Returns: The frequency of the model input. """ return self._frequency @staticmethod def _fill_diagonal_with_median(lag_array: np.ndarray, start: int, end: int, median: float) -> np.ndarray | None: # Use the calculated median to fill in future lag values where this prediction would be used as input. # If the start index is beyond the array bounds, no future updates are needed from this step. if start >= lag_array.shape[0]: return lag_array # Ensure the end index does not exceed the array bounds. end = min(end, lag_array.shape[0]) # Get a view of the sub-array where the diagonal needs to be filled. # The slice represents future time steps (rows) and corresponding lag features (columns). # Rows: from 'start' up to (but not including) 'end' # Columns: from 0 up to (but not including) 'end - start' # This selects the part of the array where lag_array[start + k, k] resides for k in range(end - start). view = lag_array[start:end, 0 : (end - start)] # Create a mask for NaNs on the diagonal diagonal_nan_mask = np.isnan(np.diag(view)) # Only update if there are NaNs on the diagonal if np.any(diagonal_nan_mask): # Create a temporary array to hold the new diagonal updated_diagonal = np.diag(view).copy() updated_diagonal[diagonal_nan_mask] = median np.fill_diagonal(view, updated_diagonal) return None
[docs] @override def predict(self, data: ForecastInputDataset) -> ForecastDataset: """Predict the median of the lag features for each time step in the context window. Args: data: The input data for prediction, this should be a pandas dataframe with lag features. Returns: ForecastDataset with predicted median for each time step. If any lag feature is NaN, this will be ignored. If all lag features are NaN, the regressor will return NaN. Raises: AttributeError: If the model is not fitted yet. ValueError: If the input data is missing any of the required lag features. """ if not self.is_fitted: msg = "This MedianForecaster instance is not fitted yet" raise AttributeError(msg) if not data.frequency_matches(data.data.index): # type: ignore msg = ( f"Input data frequency does not match model frequency ({self.frequency}). " "Please ensure the input data index has the correct frequency set." ) raise ValueError(msg) input_data: pd.DataFrame = data.input_data(start=data.forecast_start) # Check that the input data contains the required lag features missing_features = set(self._feature_names) - set(data.feature_names) if missing_features: msg = f"The input data is missing the following lag features: {missing_features}" raise ValueError(msg) # Reindex the input data to ensure there are no gaps in the time series. # This is important for the autoregressive logic that follows. new_index = pd.date_range(input_data.index[0], input_data.index[-1], freq=self._frequency) # Select only the lag feature columns in the specified order. lag_df = input_data.reindex(new_index, fill_value=np.nan)[self._feature_names] # Convert the lag DataFrame to NumPy arrays for faster processing. lag_array = lag_df.to_numpy() # Initialize the prediction array with NaNs. prediction = np.full(lag_array.shape[0], np.nan) # Calculate the time step size based on the model frequency. step_size = self._frequency # Determine the number of steps corresponding to the smallest and largest lags. smallest_lag_steps = int(self._lags_to_time_deltas[self._feature_names[0]] / step_size) largest_lag_steps = int(self._lags_to_time_deltas[self._feature_names[-1]] / step_size) # Iterate through each time step in the reindexed data. for time_step in range(lag_array.shape[0]): # Get the lag features for the current time step. current_lags = lag_array[time_step] # Calculate the median of the available lag features, ignoring NaNs. median = np.nanmedian(current_lags) # type: ignore # If the median calculation resulted in NaN (e.g., all lags were NaN), skip the autoregression step. if not np.isnan(median): # type: ignore median = float(median) # type: ignore else: continue # Store the calculated median in the prediction array. prediction[time_step] = median # Auto-regressive step: update the lag array for future time steps. # Calculate the start and end indices in the future time steps that will be affected. start, end = ( time_step + smallest_lag_steps, time_step + largest_lag_steps + 1, ) self._fill_diagonal_with_median(lag_array, start, end, median) # Convert the prediction array back to a pandas DataFrame using the reindexed time index. prediction_df = pd.DataFrame(prediction, index=lag_df.index.to_numpy(), columns=["median"]) # Reindex the prediction DataFrame back to the original input data index. prediction_df = prediction_df.reindex(input_data.index) return ForecastDataset( data=prediction_df.dropna().rename(columns={"median": self.quantiles[0].format()}), # type: ignore sample_interval=data.sample_interval, forecast_start=data.forecast_start, target_column=data.target_column, )
[docs] @override def fit(self, data: ForecastInputDataset, data_val: ForecastInputDataset | None = None) -> None: """Fit the median forecaster. This regressor does not need any fitting, but it does need to know the feature names of the lag features and the order of these. Lag features are expected to be evenly spaced and match the frequency of the input data. The lag features are expected to be named in the format ``T-<lag_in_minutes>`` or ``T-<lag_in_days>d``. For example, ``T-1min``, ``T-2min``, ``T-3min`` or ``T-1d``, ``T-2d``. Which lag features are used is determined by the feature engineering step. Args: data: The training data containing lag features. data_val: Optional validation data, not used in this regressor. Raises: ValueError: If the input data frequency does not match the model frequency. ValueError: If no lag features are found in the input data. """ self._frequency = data.sample_interval lag_prefix = f"{data.target_column}_lag_" self._feature_names = [ feature_name for feature_name in data.feature_names if feature_name.startswith(lag_prefix) ] if not self._feature_names: msg = f"No lag features found in the input data with prefix '{lag_prefix}'." raise ValueError(msg) self._lags_to_time_deltas = { feature_name: timedelta_from_isoformat(feature_name.replace(lag_prefix, "")) for feature_name in self._feature_names } if not data.frequency_matches(data.data.index): # type: ignore msg = ( f"Training data frequency does not match model frequency ({self.frequency}). " "Please ensure the training data index has the correct frequency set." ) raise ValueError(msg) lag_deltas = sorted(self._lags_to_time_deltas.values()) lag_intervals = [(lag_deltas[i] - lag_deltas[i - 1]) for i in range(1, len(lag_deltas))] if not all(interval == lag_intervals[0] for interval in lag_intervals): msg = ( "Lag features are not evenly spaced. " "Please ensure lag features are evenly spaced and match the data frequency." ) raise ValueError(msg) expected_lag_interval = lag_intervals[0] if expected_lag_interval != self._frequency: msg = ( f"Lag feature interval ({expected_lag_interval}) does not match " f"data frequency ({self._frequency}). " "Please ensure lag features match the data frequency." ) raise ValueError(msg) self._feature_names = sorted(self._feature_names, key=lambda f: self._lags_to_time_deltas[f]) self._feature_importances = np.ones(len(self._feature_names)) / (len(self._feature_names) or 1.0) self._is_fitted = True
[docs] @override def predict_contributions(self, data: ForecastInputDataset) -> TimeSeriesDataset: """Return uniform contributions since MedianForecaster weights all lag features equally.""" input_data = data.input_data(start=data.forecast_start) contribs_df = pd.DataFrame(1.0, index=input_data.index, columns=["bias"]) return TimeSeriesDataset(data=contribs_df, sample_interval=data.sample_interval)