# Source code for openstef_meta.models.forecast_combiners.stacking_combiner

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <short.term.energy.forecasts@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0
"""Stacking Forecast Combiner.

A meta-regressor per quantile is trained on top of the base forecasters' predictions.
Each quantile gets its own stacking model (e.g., GBLinear or LGBM).
"""

import logging
from functools import partial
from typing import override

import pandas as pd
from pydantic import Field, PrivateAttr

from openstef_core.datasets import ForecastDataset, ForecastInputDataset, TimeSeriesDataset
from openstef_core.datasets.validated_datasets import EnsembleForecastDataset
from openstef_core.exceptions import NotFittedError
from openstef_core.mixins.predictor import HyperParams
from openstef_core.types import Quantile
from openstef_meta.models.forecast_combiners.forecast_combiner import ForecastCombiner
from openstef_meta.utils.datasets import combine_forecast_input_datasets
from openstef_models.explainability.mixins import ContributionsMixin, ExplainableForecaster
from openstef_models.models.forecasting.forecaster import Forecaster

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class StackingCombiner(ForecastCombiner):
    """Stacking combiner: one meta-regressor per quantile on top of base forecaster outputs.

    Accepts a template ``meta_forecaster`` (a fully-configured :class:`Forecaster`
    instance). During initialisation the template is cloned once per quantile —
    each clone receives a single quantile and a single horizon, namely the
    combiner's ``max_horizon``.
    """

    meta_forecaster: Forecaster = Field(
        exclude=True,
        description="Template forecaster cloned per quantile as the stacking meta-forecaster.",
    )

    # NOTE(review): ``_is_fitted`` is not read anywhere in this class; the fitted
    # state is derived from the per-quantile models in ``is_fitted``.
    _is_fitted: bool = PrivateAttr(default=False)
    # One meta-forecaster clone per quantile, populated in ``model_post_init``.
    _models: dict[Quantile, Forecaster] = PrivateAttr(default_factory=dict[Quantile, Forecaster])

    @property
    @override
    def hparams(self) -> HyperParams:
        """Return the hyperparameters of the template ``meta_forecaster``."""
        return self.meta_forecaster.hparams

    def model_post_init(self, _context: object, /) -> None:
        """Clone the template forecaster once per quantile.

        Each clone is configured with exactly one quantile and with the
        combiner's ``max_horizon`` as its only horizon.
        """
        # NOTE(review): ``model_copy`` makes a shallow copy and does not validate
        # the ``update`` values — confirm the template holds no mutable state
        # that the per-quantile clones must not share.
        self._models = {
            q: self.meta_forecaster.model_copy(
                update={"quantiles": [q], "horizons": [self.max_horizon]},
            )
            for q in self.quantiles
        }

    @staticmethod
    def _prepare_input(
        data: EnsembleForecastDataset,
        quantile: Quantile,
        additional_features: ForecastInputDataset | None = None,
    ) -> ForecastInputDataset:
        """Build the meta-model input for a single quantile.

        Args:
            data: Ensemble dataset holding the base forecasters' predictions.
            quantile: Quantile whose base predictions are selected.
            additional_features: Optional extra features; when given they are
                merged in through ``combine_forecast_input_datasets``.

        Returns:
            The input dataset to feed to the meta-forecaster of ``quantile``.
        """
        input_data = data.get_base_predictions_for_quantile(quantile=quantile)
        if additional_features is not None:
            input_data = combine_forecast_input_datasets(input_data=input_data, additional_features=additional_features)
        return input_data

    @property
    @override
    def is_fitted(self) -> bool:
        """Whether every per-quantile meta-forecaster reports itself as fitted."""
        return all(x.is_fitted for x in self._models.values())

    @override
    def fit(
        self,
        data: EnsembleForecastDataset,
        data_val: EnsembleForecastDataset | None = None,
        additional_features: ForecastInputDataset | None = None,
    ) -> None:
        """Fit one meta-forecaster per quantile on the base forecasters' predictions.

        Args:
            data: Ensemble dataset with base predictions used for training.
            data_val: Accepted but currently not used; every meta-forecaster is
                fitted with ``data_val=None``.
            additional_features: Optional extra features added to each
                meta-forecaster's input.
        """
        for q in self.quantiles:
            input_data = self._prepare_input(data, q, additional_features)
            # Rows without a target value are removed before fitting.
            target_dropna = partial(pd.DataFrame.dropna, subset=[input_data.target_column])  # pyright: ignore[reportUnknownMemberType]
            input_data = input_data.pipe_pandas(target_dropna)
            self._models[q].fit(data=input_data, data_val=None)

    @override
    def predict(
        self,
        data: EnsembleForecastDataset,
        additional_features: ForecastInputDataset | None = None,
    ) -> ForecastDataset:
        """Predict every quantile with its own meta-forecaster.

        Args:
            data: Ensemble dataset with the base forecasters' predictions.
            additional_features: Optional extra features added to each
                meta-forecaster's input.

        Returns:
            A forecast dataset whose frame is the column-wise concatenation of
            the per-quantile predictions, using ``data.sample_interval``.

        Raises:
            NotFittedError: If not all meta-forecasters are fitted.
        """
        if not self.is_fitted:
            raise NotFittedError(self.__class__.__name__)

        predictions = [
            self._models[q].predict(data=self._prepare_input(data, q, additional_features)).data
            for q in self.quantiles
        ]
        return ForecastDataset(data=pd.concat(predictions, axis=1), sample_interval=data.sample_interval)

    @override
    def predict_contributions(
        self,
        data: EnsembleForecastDataset,
        additional_features: ForecastInputDataset | None = None,
    ) -> TimeSeriesDataset:
        """Compute per-quantile contributions of the meta-forecasters' inputs.

        Args:
            data: Ensemble dataset with the base forecasters' predictions.
            additional_features: Optional extra features added to each
                meta-forecaster's input.

        Returns:
            A time series dataset with the column-wise concatenation of each
            meta-forecaster's contributions; when ``data.target_series`` is not
            ``None`` it is added under ``data.target_column``.

        Raises:
            NotImplementedError: If any meta-forecaster is not a
                ``ContributionsMixin``.
            NotFittedError: If not all meta-forecasters are fitted.
        """
        # Check every meta-forecaster up front so an unsupported model fails
        # fast, before contributions for earlier quantiles are computed.
        explainers: list[tuple[Quantile, ContributionsMixin]] = []
        for q in self.quantiles:
            model = self._models[q]
            if not isinstance(model, ContributionsMixin):
                msg = f"Model {type(model).__name__} does not support predict_contributions."
                raise NotImplementedError(msg)
            explainers.append((q, model))

        # Same guard as ``predict``: refuse to run on an unfitted combiner.
        if not self.is_fitted:
            raise NotFittedError(self.__class__.__name__)

        frames: list[pd.DataFrame] = [
            model.predict_contributions(data=self._prepare_input(data, q, additional_features)).data
            for q, model in explainers
        ]
        contributions = pd.concat(frames, axis=1)

        target_series = data.target_series
        if target_series is not None:
            contributions[data.target_column] = target_series
        return TimeSeriesDataset(data=contributions, sample_interval=data.sample_interval)

    @property
    @override
    def feature_importances(self) -> pd.DataFrame:
        """Feature importances of all explainable meta-forecasters.

        Returns:
            The column-wise concatenation of ``feature_importances`` of every
            meta-forecaster that is an ``ExplainableForecaster``, or an empty
            frame when there is none.
        """
        frames = [m.feature_importances for m in self._models.values() if isinstance(m, ExplainableForecaster)]
        return pd.concat(frames, axis=1) if frames else pd.DataFrame()
# Public names exported by this module.
__all__ = ["StackingCombiner"]