# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <short.term.energy.forecasts@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0
"""Ensemble forecasting model combining multiple base forecasters.

Orchestrates parallel base forecasters whose predictions are aggregated by a
``ForecastCombiner``. Extends ``BaseForecastingModel`` as a sibling of
``ForecastingModel``.
"""
import logging
from datetime import datetime
from functools import partial
from typing import Self, cast, override
import pandas as pd
from pydantic import Field, PrivateAttr, model_validator
from openstef_core.datasets import (
ForecastDataset,
ForecastInputDataset,
TimeSeriesDataset,
)
from openstef_core.datasets.timeseries_dataset import validate_horizons_present
from openstef_core.datasets.validated_datasets import EnsembleForecastDataset
from openstef_core.exceptions import NotFittedError
from openstef_core.mixins import HyperParams, TransformPipeline
from openstef_core.types import LeadTime, Quantile
from openstef_meta.models.forecast_combiners.forecast_combiner import ForecastCombiner
from openstef_models.explainability.mixins import ExplainableForecaster
from openstef_models.models.forecasting.forecaster import Forecaster
from openstef_models.models.forecasting_model import BaseForecastingModel, ModelFitResult, restore_target
# Module-level logger used for the debug traces emitted while fitting and predicting.
logger = logging.getLogger(__name__)
class EnsembleModelFitResult(ModelFitResult):
    """Outcome of fitting an ``EnsembleForecastingModel``.

    The fields inherited from ``ModelFitResult`` (input_dataset, metrics_*, etc.)
    describe the combiner's fit, while ``forecaster_fit_results`` keeps one
    ``ModelFitResult`` per base forecaster, keyed by forecaster name.
    """

    forecaster_fit_results: dict[str, ModelFitResult] = Field(description="ModelFitResult for each base forecaster")

    @override
    def metrics_to_flat_dict(self) -> dict[str, float]:
        """Flatten the combiner metrics together with every base forecaster's metrics.

        Returns:
            The parent's flat metrics dict, extended with each base forecaster's
            flat metrics under keys of the form ``"<forecaster name>_<metric key>"``.
        """
        flat = super().metrics_to_flat_dict()
        for forecaster_name, fit_result in self.forecaster_fit_results.items():
            for metric_key, metric_value in fit_result.metrics_to_flat_dict().items():
                flat[f"{forecaster_name}_{metric_key}"] = metric_value
        return flat

    @property
    @override
    def component_fit_results(self) -> dict[str, ModelFitResult]:
        """Fit results of the individual base forecasters, keyed by name."""
        return self.forecaster_fit_results
class EnsembleForecastingModel(BaseForecastingModel):
    """Ensemble forecasting pipeline: common preprocessing -> N forecasters -> combiner.

    Every base forecaster is trained and evaluated on the same input, one after
    another; their predictions are bundled into an ``EnsembleForecastDataset`` and
    aggregated by a ``ForecastCombiner``. Extends ``BaseForecastingModel`` as a
    sibling of ``ForecastingModel`` — not a subclass.

    The ``preprocessing`` field (inherited from base) holds the **common preprocessing**
    shared across all base forecasters. ``model_specific_preprocessing`` adds
    per-forecaster transforms on top.

    The shared ``postprocessing`` pipeline is applied to each base forecaster's
    prediction and once more to the combiner's output.

    Invariants:
        - fit() must be called before predict()
        - All forecaster horizons must be present in the input data

    Important:
        The ``cutoff_history`` parameter is crucial when using lag-based features.
        Set it to exclude incomplete rows from training (e.g. ``timedelta(days=14)``
        for a lag-14 transform).

    Example:
        >>> from openstef_models.models.forecasting.constant_quantile_forecaster import (
        ...     ConstantQuantileForecaster,
        ... )
        >>> from openstef_meta.models.forecast_combiners.learned_weights_combiner import WeightsCombiner
        >>> from openstef_core.types import LeadTime
        >>> from datetime import timedelta
        >>>
        >>> forecaster_1 = ConstantQuantileForecaster(
        ...     horizons=[LeadTime.from_string("PT36H")]
        ... )
        >>> forecaster_2 = ConstantQuantileForecaster(
        ...     horizons=[LeadTime.from_string("PT36H")]
        ... )
        >>> combiner = WeightsCombiner(
        ...     horizons=[LeadTime.from_string("PT36H")],
        ... )
        >>> model = EnsembleForecastingModel(
        ...     forecasters={"constant_median": forecaster_1, "constant_median_2": forecaster_2},
        ...     combiner=combiner,
        ...     cutoff_history=timedelta(days=14),
        ... )
        >>> model.fit(training_data)  # doctest: +SKIP
        >>> forecasts = model.predict(new_data)  # doctest: +SKIP
    """

    forecasters: dict[str, Forecaster] = Field(
        default=...,
        description="Named base forecasters whose predictions are combined.",
        exclude=True,
    )
    combiner: ForecastCombiner = Field(
        default=...,
        description="Combiner that aggregates base forecaster predictions.",
        exclude=True,
    )
    model_specific_preprocessing: dict[str, TransformPipeline[TimeSeriesDataset]] = Field(
        default_factory=dict,
        description="Per-forecaster preprocessing pipelines applied after common preprocessing.",
        exclude=True,
    )
    combiner_preprocessing: TransformPipeline[TimeSeriesDataset] = Field(
        default_factory=TransformPipeline[TimeSeriesDataset],
        description="Feature engineering for the forecast combiner.",
        exclude=True,
    )
    model_specific_postprocessing: TransformPipeline[ForecastDataset] = Field(
        default_factory=TransformPipeline[ForecastDataset],
        description="Per-forecaster postprocessing applied before the combiner sees predictions.",
        exclude=True,
    )
    combiner_postprocessing: TransformPipeline[ForecastDataset] = Field(
        default_factory=TransformPipeline[ForecastDataset],
        description="Combiner-specific postprocessing applied after shared postprocessing.",
        exclude=True,
    )

    _logger: logging.Logger = PrivateAttr(default=logging.getLogger(__name__))

    @model_validator(mode="after")
    def _validate_horizons_consistent(self) -> Self:
        """All forecasters and the combiner must share the same horizons list.

        The comparison is order-insensitive: both sides are sorted before comparing.

        Returns:
            Validated model instance.

        Raises:
            ValueError: If forecasters dict is empty or any forecaster's horizons differ from the combiner's.
        """
        if not self.forecasters:
            msg = "At least one forecaster is required."
            raise ValueError(msg)

        expected = sorted(self.combiner.horizons)
        for name, forecaster in self.forecasters.items():
            if sorted(forecaster.horizons) != expected:
                msg = (
                    f"Forecaster '{name}' horizons {forecaster.horizons} "
                    f"do not match combiner horizons {self.combiner.horizons}"
                )
                raise ValueError(msg)
        return self

    @property
    def forecaster_configs(self) -> dict[str, Forecaster]:
        """Configuration of each base forecaster, keyed by name (shallow copy of ``forecasters``)."""
        return dict(self.forecasters)

    @property
    @override
    def quantiles(self) -> list[Quantile]:
        """Quantiles of the ensemble, taken from the combiner."""
        return self.combiner.quantiles

    @property
    @override
    def max_horizon(self) -> LeadTime:
        """Maximum forecast horizon of the ensemble, taken from the combiner."""
        return self.combiner.max_horizon

    @property
    @override
    def hyperparams(self) -> HyperParams:
        """Hyperparameters of the combiner."""
        return self.combiner.hparams

    @property
    @override
    def is_fitted(self) -> bool:
        """True only when every base forecaster and the combiner report being fitted."""
        return all(f.is_fitted for f in self.forecasters.values()) and self.combiner.is_fitted

    @property
    @override
    def component_hyperparams(self) -> dict[str, HyperParams]:
        """Hyperparameters of each base forecaster, keyed by forecaster name."""
        return {name: f.hparams for name, f in self.forecasters.items()}

    @override
    def get_explainable_components(self) -> dict[str, ExplainableForecaster]:
        """Collect the components that can report feature importances.

        Returns:
            Base forecasters that are ``ExplainableForecaster`` instances, keyed by
            name, plus the combiner under the key ``"combiner"`` when its feature
            importances are not empty.
        """
        components: dict[str, ExplainableForecaster] = {
            name: forecaster
            for name, forecaster in self.forecasters.items()
            if isinstance(forecaster, ExplainableForecaster)
        }
        # ForecastCombiner is always ExplainableForecaster, but skip if importances are empty
        if not self.combiner.feature_importances.empty:
            components["combiner"] = self.combiner
        return components

    @property
    def forecaster_names(self) -> list[str]:
        """Returns the names of the underlying forecasters."""
        return list(self.forecasters.keys())

    @override
    def fit(
        self,
        data: TimeSeriesDataset,
        data_val: TimeSeriesDataset | None = None,
        data_test: TimeSeriesDataset | None = None,
    ) -> EnsembleModelFitResult:
        """Train all base forecasters and then the combiner.

        Args:
            data: Historical time series data with features and target values.
            data_val: Optional validation data. If provided, splitters are ignored for validation.
            data_test: Optional test data. If provided, splitters are ignored for test.

        Returns:
            FitResult containing training details and metrics. The top-level fields
            come from the combiner's fit; per-forecaster results are stored in
            ``forecaster_fit_results``.
        """
        # Phase 1: fit each base forecaster and collect their in-sample predictions
        train_ensemble, val_ensemble, test_ensemble, forecaster_fit_results = self._fit_forecasters(
            data=data,
            data_val=data_val,
            data_test=data_test,
        )

        # Phase 2: fit the combiner on base forecasters' in-sample predictions
        combiner_fit_result = self._fit_combiner(
            train_ensemble_dataset=train_ensemble,
            val_ensemble_dataset=val_ensemble,
            test_ensemble_dataset=test_ensemble,
            data=data,
            data_val=data_val,
            data_test=data_test,
        )

        return EnsembleModelFitResult(
            forecaster_fit_results=forecaster_fit_results,
            **combiner_fit_result.model_dump(),
        )

    @staticmethod
    def _combine_datasets(
        data: ForecastInputDataset, additional_features: ForecastInputDataset
    ) -> ForecastInputDataset:
        """Combine Forecaster learner predictions with additional features for ForecastCombiner input.

        Columns of ``additional_features`` that already exist in ``data`` are dropped,
        so ``data`` wins on name clashes. The remaining columns are joined on the index.

        Args:
            data: ForecastInputDataset containing base Forecaster predictions.
            additional_features: ForecastInputDataset containing additional features.

        Returns:
            ForecastInputDataset with combined features, carrying the sample interval
            and forecast start of ``data``.
        """
        additional_df = additional_features.data.loc[
            :, [col for col in additional_features.data.columns if col not in data.data.columns]
        ]
        # Merge on index to combine datasets
        combined_df = data.data.join(additional_df)

        return ForecastInputDataset(
            data=combined_df,
            sample_interval=data.sample_interval,
            forecast_start=data.forecast_start,
        )

    def _transform_combiner_data(self, data: TimeSeriesDataset) -> ForecastInputDataset | None:
        """Build the combiner's additional features for prediction.

        Returns:
            None when no combiner preprocessing is configured, signalling the combiner
            should work without additional features; otherwise the transformed data
            wrapped as a ``ForecastInputDataset``.
        """
        if len(self.combiner_preprocessing.transforms) == 0:
            return None
        combiner_data = self.combiner_preprocessing.transform(data)
        return ForecastInputDataset.from_timeseries(combiner_data, target_column=self.target_column)

    def _fit_prepare_combiner_data(
        self,
        data: TimeSeriesDataset,
        data_val: TimeSeriesDataset | None = None,
        data_test: TimeSeriesDataset | None = None,
    ) -> tuple[ForecastInputDataset | None, ForecastInputDataset | None, ForecastInputDataset | None]:
        """Fit combiner preprocessing on train data and transform all splits.

        Returns:
            ``(train, val, test)`` additional-feature datasets for the combiner, or
            ``(None, None, None)`` when no combiner preprocessing is configured.
        """
        if len(self.combiner_preprocessing.transforms) == 0:
            return None, None, None

        self.combiner_preprocessing.fit(data=data)
        input_data_train = self.combiner_preprocessing.transform(data)
        input_data_val = self.combiner_preprocessing.transform(data_val) if data_val else None
        input_data_test = self.combiner_preprocessing.transform(data_test) if data_test else None

        # Split with the same splitter that is used for the base forecasters' inputs.
        input_data_train, input_data_val, input_data_test = self.data_splitter.split_dataset(
            data=input_data_train, data_val=input_data_val, data_test=input_data_test, target_column=self.target_column
        )

        combiner_data = ForecastInputDataset.from_timeseries(input_data_train, target_column=self.target_column)
        combiner_data_val = (
            ForecastInputDataset.from_timeseries(input_data_val, target_column=self.target_column)
            if input_data_val
            else None
        )
        combiner_data_test = (
            ForecastInputDataset.from_timeseries(input_data_test, target_column=self.target_column)
            if input_data_test
            else None
        )
        return combiner_data, combiner_data_val, combiner_data_test

    def _bundle_optional_predictions(
        self,
        predictions: dict[str, ForecastDataset | None],
        target_source: TimeSeriesDataset,
    ) -> EnsembleForecastDataset | None:
        """Bundle an optional split's predictions, or return None if any forecaster has none.

        Args:
            predictions: Per-forecaster predictions for one split (validation or test).
            target_source: Dataset whose target column is attached to the ensemble.

        Returns:
            The bundled ensemble dataset, or None when at least one forecaster
            produced no prediction for this split.
        """
        if not all(isinstance(pred, ForecastDataset) for pred in predictions.values()):
            return None
        return EnsembleForecastDataset.from_forecast_datasets(
            {name: pred for name, pred in predictions.items() if pred is not None},
            target_series=target_source.data[self.target_column],
        )

    def _fit_forecasters(
        self,
        data: TimeSeriesDataset,
        data_val: TimeSeriesDataset | None = None,
        data_test: TimeSeriesDataset | None = None,
    ) -> tuple[
        EnsembleForecastDataset,
        EnsembleForecastDataset | None,
        EnsembleForecastDataset | None,
        dict[str, ModelFitResult],
    ]:
        """Fit common and per-forecaster preprocessing, then train every base forecaster.

        Args:
            data: Training data.
            data_val: Optional explicit validation data.
            data_test: Optional explicit test data.

        Returns:
            ``(train, val, test, results)``: the forecasters' predictions per split
            bundled into ``EnsembleForecastDataset`` objects (val/test are None when
            any forecaster has no prediction for that split) and each forecaster's
            ``ModelFitResult`` keyed by name.
        """
        predictions_train: dict[str, ForecastDataset] = {}
        predictions_val: dict[str, ForecastDataset | None] = {}
        predictions_test: dict[str, ForecastDataset | None] = {}
        results: dict[str, ModelFitResult] = {}

        # Fit the feature engineering transforms
        self.preprocessing.fit(data=data)
        data_transformed = self.preprocessing.transform(data=data)

        # Fit per-forecaster transforms on the common-preprocessed output (not raw data)
        for pipeline in self.model_specific_preprocessing.values():
            pipeline.fit(data=data_transformed)
        logger.debug("Completed fitting preprocessing pipelines.")
        # NOTE(review): neither ``postprocessing`` nor ``model_specific_postprocessing`` is fitted
        # anywhere in this class — confirm they are stateless or fitted elsewhere.

        # Fit the forecasters
        for name in self.forecasters:
            logger.debug("Fitting Forecaster '%s'.", name)
            predictions_train[name], predictions_val[name], predictions_test[name], results[name] = (
                self._fit_forecaster(
                    data=data,
                    data_val=data_val,
                    data_test=data_test,
                    forecaster_name=name,
                )
            )

        # Attach original (unsplit) target so the combiner can compute loss across all timesteps
        train_ensemble = EnsembleForecastDataset.from_forecast_datasets(
            predictions_train, target_series=data.data[self.target_column]
        )

        # Validation/test predictions come from the explicit dataset when one was supplied and
        # from a split of ``data`` otherwise, so the target must be taken from the matching
        # source. The truthiness test mirrors the one used in ``_fit_forecaster``.
        val_ensemble = self._bundle_optional_predictions(predictions_val, data_val if data_val else data)
        test_ensemble = self._bundle_optional_predictions(predictions_test, data_test if data_test else data)

        return train_ensemble, val_ensemble, test_ensemble, results

    def _fit_forecaster(
        self,
        data: TimeSeriesDataset,
        data_val: TimeSeriesDataset | None = None,
        data_test: TimeSeriesDataset | None = None,
        forecaster_name: str = "",
    ) -> tuple[
        ForecastDataset,
        ForecastDataset | None,
        ForecastDataset | None,
        ModelFitResult,
    ]:
        """Train the forecaster on the provided dataset.

        Args:
            data: Historical time series data with features and target values.
            data_val: Optional validation data.
            data_test: Optional test data.
            forecaster_name: Name of the forecaster to train; must be a key of ``forecasters``.

        Returns:
            ``(prediction_train, prediction_val, prediction_test, fit_result)`` where the
            val/test predictions are None when the corresponding split is unavailable.
        """
        forecaster = self.forecasters[forecaster_name]
        validate_horizons_present(data, forecaster.horizons)

        # Transform and split input data
        input_data_train = self.prepare_forecaster_input(data=data, forecaster_name=forecaster_name)
        input_data_val = (
            self.prepare_forecaster_input(data=data_val, forecaster_name=forecaster_name) if data_val else None
        )
        input_data_test = (
            self.prepare_forecaster_input(data=data_test, forecaster_name=forecaster_name) if data_test else None
        )

        # Drop target column nan's from training data. One can not train on missing targets.
        target_dropna = partial(pd.DataFrame.dropna, subset=[self.target_column])  # pyright: ignore[reportUnknownMemberType]
        input_data_train = input_data_train.pipe_pandas(target_dropna)
        input_data_val = input_data_val.pipe_pandas(target_dropna) if input_data_val else None
        input_data_test = input_data_test.pipe_pandas(target_dropna) if input_data_test else None

        # Transform the input data to a valid forecast input and split into train/val/test
        input_data_train, input_data_val, input_data_test = self.data_splitter.split_dataset(
            data=input_data_train, data_val=input_data_val, data_test=input_data_test, target_column=self.target_column
        )

        # Fit the model
        logger.debug("Started fitting forecaster '%s'.", forecaster_name)
        forecaster.fit(data=input_data_train, data_val=input_data_val)
        logger.debug("Completed fitting forecaster '%s'.", forecaster_name)

        prediction_train = self._predict_forecaster(input_data=input_data_train, forecaster_name=forecaster_name)
        metrics_train = self._calculate_score(prediction=prediction_train)

        if input_data_val is not None:
            prediction_val = self._predict_forecaster(input_data=input_data_val, forecaster_name=forecaster_name)
            metrics_val = self._calculate_score(prediction=prediction_val)
        else:
            prediction_val = None
            metrics_val = None

        if input_data_test is not None:
            prediction_test = self._predict_forecaster(input_data=input_data_test, forecaster_name=forecaster_name)
            metrics_test = self._calculate_score(prediction=prediction_test)
        else:
            prediction_test = None
            metrics_test = None

        result = ModelFitResult(
            input_dataset=input_data_train,
            input_data_train=input_data_train,
            input_data_val=input_data_val,
            input_data_test=input_data_test,
            metrics_train=metrics_train,
            metrics_val=metrics_val,
            metrics_test=metrics_test,
            # The "full" metrics reuse the training metrics.
            metrics_full=metrics_train,
        )

        return prediction_train, prediction_val, prediction_test, result

    def _predict_forecaster(self, input_data: ForecastInputDataset, forecaster_name: str) -> ForecastDataset:
        """Predict with one base forecaster and run its output through postprocessing.

        The raw prediction gets the target restored from ``input_data``, then passes
        through ``model_specific_postprocessing`` followed by the shared ``postprocessing``.
        """
        logger.debug("Predicting forecaster '%s'.", forecaster_name)
        prediction_raw = self.forecasters[forecaster_name].predict(data=input_data)

        prediction = restore_target(
            dataset=prediction_raw, original_dataset=input_data, target_column=self.target_column
        )
        prediction = self.model_specific_postprocessing.transform(prediction)
        return self.postprocessing.transform(prediction)

    def _predict_forecasters(
        self,
        data: TimeSeriesDataset,
        forecast_start: datetime | None = None,
    ) -> EnsembleForecastDataset:
        """Predict with every base forecaster and bundle the results.

        Args:
            data: Input time series dataset; its target column is attached to the ensemble.
            forecast_start: Optional start time for forecasts.

        Returns:
            EnsembleForecastDataset holding one prediction per base forecaster.
        """
        predictions: dict[str, ForecastDataset] = {}
        for name in self.forecasters:
            logger.debug("Generating predictions for forecaster '%s'.", name)
            input_data = self.prepare_forecaster_input(data=data, forecast_start=forecast_start, forecaster_name=name)
            predictions[name] = self._predict_forecaster(
                input_data=input_data,
                forecaster_name=name,
            )

        return EnsembleForecastDataset.from_forecast_datasets(predictions, target_series=data.data[self.target_column])

    def _predict_transform_combiner(
        self, ensemble_dataset: EnsembleForecastDataset, original_data: TimeSeriesDataset
    ) -> ForecastDataset:
        """Derive the combiner's additional features from ``original_data`` and predict."""
        logger.debug("Predicting combiner.")
        features = self._transform_combiner_data(data=original_data)
        return self._predict_combiner(ensemble_dataset, features)

    def _predict_combiner(
        self,
        ensemble_dataset: EnsembleForecastDataset,
        features: ForecastInputDataset | None,
    ) -> ForecastDataset:
        """Predict with the combiner and postprocess its output.

        The raw prediction gets the target restored from ``ensemble_dataset``, then passes
        through ``combiner_postprocessing`` followed by the shared ``postprocessing``.
        """
        logger.debug("Predicting combiner.")
        prediction_raw = self.combiner.predict(ensemble_dataset, additional_features=features)
        prediction = restore_target(
            dataset=prediction_raw, original_dataset=ensemble_dataset, target_column=self.target_column
        )
        prediction = self.combiner_postprocessing.transform(prediction)
        return self.postprocessing.transform(prediction)

    def _fit_combiner(
        self,
        data: TimeSeriesDataset,
        train_ensemble_dataset: EnsembleForecastDataset,
        data_val: TimeSeriesDataset | None = None,
        data_test: TimeSeriesDataset | None = None,
        val_ensemble_dataset: EnsembleForecastDataset | None = None,
        test_ensemble_dataset: EnsembleForecastDataset | None = None,
    ) -> ModelFitResult:
        """Fit the combiner and its postprocessing, then score every available split.

        Args:
            data: Training data used to fit the combiner preprocessing.
            train_ensemble_dataset: Base forecasters' predictions on the training split.
            data_val: Optional explicit validation data for combiner preprocessing.
            data_test: Optional explicit test data for combiner preprocessing.
            val_ensemble_dataset: Base forecasters' predictions on the validation split, if any.
            test_ensemble_dataset: Base forecasters' predictions on the test split, if any.

        Returns:
            ModelFitResult describing the combiner's fit.
        """
        # Prepare additional features for the combiner (e.g. sample weights) — split separately from ensemble data
        features_train, features_val, features_test = self._fit_prepare_combiner_data(
            data=data, data_val=data_val, data_test=data_test
        )

        logger.debug("Fitting combiner.")
        self.combiner.fit(
            data=train_ensemble_dataset, data_val=val_ensemble_dataset, additional_features=features_train
        )

        # Fit combiner postprocessing on training predictions
        prediction_raw = self.combiner.predict(train_ensemble_dataset, additional_features=features_train)
        prediction_raw = restore_target(
            dataset=prediction_raw, original_dataset=train_ensemble_dataset, target_column=self.target_column
        )
        self.combiner_postprocessing.fit_transform(prediction_raw)

        prediction_train = self._predict_combiner(train_ensemble_dataset, features=features_train)
        metrics_train = self._calculate_score(prediction=prediction_train)

        if val_ensemble_dataset is not None:
            metrics_val = self._calculate_score(
                prediction=self._predict_combiner(val_ensemble_dataset, features=features_val)
            )
        else:
            metrics_val = None

        if test_ensemble_dataset is not None:
            metrics_test = self._calculate_score(
                prediction=self._predict_combiner(test_ensemble_dataset, features=features_test)
            )
        else:
            metrics_test = None

        return ModelFitResult(
            input_dataset=train_ensemble_dataset,
            # ModelFitResult expects ForecastInputDataset; use first quantile as a representative slice
            input_data_train=train_ensemble_dataset.get_base_predictions_for_quantile(quantile=self.quantiles[0]),
            input_data_val=val_ensemble_dataset.get_base_predictions_for_quantile(quantile=self.quantiles[0])
            if val_ensemble_dataset
            else None,
            input_data_test=test_ensemble_dataset.get_base_predictions_for_quantile(quantile=self.quantiles[0])
            if test_ensemble_dataset
            else None,
            metrics_train=metrics_train,
            metrics_val=metrics_val,
            metrics_test=metrics_test,
            # The "full" metrics reuse the training metrics.
            metrics_full=metrics_train,
        )

    def _predict_contributions_combiner(
        self, ensemble_dataset: EnsembleForecastDataset, original_data: TimeSeriesDataset
    ) -> TimeSeriesDataset:
        """Derive the combiner's additional features and return the combiner's contributions."""
        features = self._transform_combiner_data(data=original_data)
        return self.combiner.predict_contributions(ensemble_dataset, additional_features=features)

    @override
    def predict(self, data: TimeSeriesDataset, forecast_start: datetime | None = None) -> ForecastDataset:
        """Generate forecasts for the provided dataset.

        Args:
            data: Input time series dataset for prediction.
            forecast_start: Optional start time for forecasts.

        Returns:
            ForecastDataset containing the generated forecasts.

        Raises:
            NotFittedError: If the model has not been fitted yet.
        """
        if not self.is_fitted:
            raise NotFittedError(self.__class__.__name__)

        logger.debug("Generating predictions.")
        ensemble_predictions = self._predict_forecasters(data=data, forecast_start=forecast_start)

        # Predict and restore target column
        return self._predict_transform_combiner(
            ensemble_dataset=ensemble_predictions,
            original_data=data,
        )

    @override
    def predict_contributions(
        self,
        data: TimeSeriesDataset,
        forecast_start: datetime | None = None,
    ) -> TimeSeriesDataset:
        """Compute per-model contributions for the ensemble prediction.

        Args:
            data: Input time series dataset.
            forecast_start: Optional start time for forecasts.

        Returns:
            TimeSeriesDataset where each column is a base model's contribution.

        Raises:
            NotFittedError: If the model has not been fitted yet.
        """
        if not self.is_fitted:
            raise NotFittedError(self.__class__.__name__)

        ensemble_predictions = self._predict_forecasters(data=data, forecast_start=forecast_start)
        return self._predict_contributions_combiner(
            ensemble_dataset=ensemble_predictions,
            original_data=data,
        )
# Public API of this module. ``ModelFitResult`` is imported from ``openstef_models`` and re-exported here.
__all__ = ["EnsembleForecastingModel", "EnsembleModelFitResult", "ModelFitResult"]