# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <short.term.energy.forecasts@alliander.com>
#
# SPDX-License-Identifier: MPL-2.0
"""Learned Weights Combiner.
Forecast combiner that uses a classification approach to learn weights for base forecasters.
It learns which forecaster is likely to perform best under different conditions.
The combiner can operate in two modes:
- Hard Selection: Selects the base forecaster with the highest predicted probability for each instance.
- Soft Selection: Uses the predicted probabilities as weights to combine base forecaster predictions.
"""
import logging
from typing import Literal, cast, override
import numpy as np
import pandas as pd
from pydantic import Field, PrivateAttr
from sklearn.base import ClassifierMixin
from sklearn.dummy import DummyClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_sample_weight # type: ignore[import-untyped]
from openstef_core.datasets import ForecastDataset, ForecastInputDataset, TimeSeriesDataset
from openstef_core.datasets.validated_datasets import ENSEMBLE_COLUMN_SEP, EnsembleForecastDataset
from openstef_core.exceptions import InsufficientlyCompleteError, MissingExtraError, NotFittedError
from openstef_core.mixins.predictor import HyperParams
from openstef_core.types import Quantile
from openstef_core.utils.pandas import nan_aware_weighted_mean
from openstef_meta.models.forecast_combiners.forecast_combiner import (
ForecastCombiner,
)
from openstef_meta.utils.datasets import combine_forecast_input_datasets
logger = logging.getLogger(__name__)
[docs]
class LGBMCombinerHyperParams(HyperParams):
"""Hyperparameters for the LGBM gradient-boosted classifier."""
n_estimators: int = Field(default=20, description="Number of boosting rounds.")
n_leaves: int = Field(default=31, description="Maximum number of leaves per tree.")
reg_alpha: float = Field(default=0.0, description="L1 regularization term on weights.")
reg_lambda: float = Field(default=0.0, description="L2 regularization term on weights.")
[docs]
def get_classifier(self) -> ClassifierMixin:
"""Create an LGBM gradient-boosted classifier from these hyperparameters.
Returns:
Configured LGBMClassifier instance.
Raises:
MissingExtraError: If lightgbm is not installed.
"""
try:
from lightgbm import LGBMClassifier # noqa: PLC0415
except ImportError as e:
raise MissingExtraError("lightgbm", "openstef-models") from e
return cast(
ClassifierMixin,
LGBMClassifier(
class_weight="balanced",
n_estimators=self.n_estimators,
num_leaves=self.n_leaves,
reg_alpha=self.reg_alpha,
reg_lambda=self.reg_lambda,
n_jobs=1,
),
)
[docs]
class RFCombinerHyperParams(HyperParams):
"""Hyperparameters for the LGBM random-forest classifier."""
n_estimators: int = Field(default=20, description="Number of trees.")
n_leaves: int = Field(default=31, description="Maximum number of leaves per tree.")
bagging_freq: int = Field(default=1, description="Frequency for bagging.")
bagging_fraction: float = Field(default=0.8, description="Fraction of data per iteration.")
feature_fraction: float = Field(default=1, description="Fraction of features per iteration.")
[docs]
def get_classifier(self) -> ClassifierMixin:
"""Create an LGBM random-forest classifier from these hyperparameters.
Returns:
Configured LGBMClassifier instance in random-forest mode.
Raises:
MissingExtraError: If lightgbm is not installed.
"""
try:
from lightgbm import LGBMClassifier # noqa: PLC0415
except ImportError as e:
raise MissingExtraError("lightgbm", "openstef-models") from e
return cast(
ClassifierMixin,
LGBMClassifier(
boosting_type="rf",
class_weight="balanced",
n_estimators=self.n_estimators,
num_leaves=self.n_leaves,
bagging_freq=self.bagging_freq,
bagging_fraction=self.bagging_fraction,
feature_fraction=self.feature_fraction,
),
)
[docs]
class XGBCombinerHyperParams(HyperParams):
"""Hyperparameters for the XGBoost classifier."""
n_estimators: int = Field(default=20, description="Number of boosting rounds.")
[docs]
def get_classifier(self) -> ClassifierMixin:
"""Create an XGBoost classifier from these hyperparameters.
Returns:
Configured XGBClassifier instance.
Raises:
MissingExtraError: If xgboost is not installed.
"""
try:
from xgboost import XGBClassifier # noqa: PLC0415
except ImportError as e:
raise MissingExtraError("xgboost", "openstef-models") from e
return cast(ClassifierMixin, XGBClassifier(n_estimators=self.n_estimators))
[docs]
class LogisticCombinerHyperParams(HyperParams):
"""Hyperparameters for the logistic regression classifier."""
fit_intercept: bool = Field(default=True, description="Whether to calculate the intercept.")
penalty: Literal["l1", "l2", "elasticnet"] = Field(default="l2", description="Regularization norm.")
c: float = Field(default=1.0, description="Inverse of regularization strength.")
[docs]
def get_classifier(self) -> ClassifierMixin:
"""Create a logistic regression classifier from these hyperparameters.
Returns:
Configured LogisticRegression instance.
"""
from sklearn.linear_model import LogisticRegression # noqa: PLC0415
return LogisticRegression(
class_weight="balanced",
fit_intercept=self.fit_intercept,
penalty=self.penalty,
C=self.c,
)
[docs]
class WeightsCombiner(ForecastCombiner):
"""Combines base forecaster predictions with a classification approach.
A classifier predicts per-timestep model weights. Depending on ``hard_selection``,
the combiner either picks the best forecaster (hard) or blends using predicted
probabilities (soft).
"""
hyperparams: HyperParams = Field(
default_factory=LGBMCombinerHyperParams,
description="Classifier hyperparameters. Must have a get_classifier() method.",
)
@property
@override
def hparams(self) -> HyperParams:
return self.hyperparams
hard_selection: bool = Field(
default=False,
description="If True, select the single best forecaster per timestep; otherwise blend.",
)
_label_encoder: LabelEncoder = PrivateAttr(default_factory=LabelEncoder)
_is_fitted: bool = PrivateAttr(default=False)
_feature_names: list[str] = PrivateAttr(default_factory=list[str])
_models: dict[Quantile, ClassifierMixin] = PrivateAttr(default_factory=dict[Quantile, ClassifierMixin])
[docs]
def model_post_init(self, _context: object, /) -> None:
"""Validate hyperparams and initialize per-quantile classifiers.
Raises:
TypeError: If hyperparams does not have a ``get_classifier()`` method.
"""
if not hasattr(self.hyperparams, "get_classifier"):
msg = f"hyperparams ({type(self.hyperparams).__name__}) must have a get_classifier() method."
raise TypeError(msg)
self._models = {
# One classifier per quantile — optimal forecaster may differ across quantile levels
q: self.hyperparams.get_classifier() # type: ignore[union-attr]
for q in self.quantiles
}
@property
@override
def is_fitted(self) -> bool:
return self._is_fitted
[docs]
@override
def fit(
self,
data: EnsembleForecastDataset,
data_val: EnsembleForecastDataset | None = None,
additional_features: ForecastInputDataset | None = None,
) -> None:
self._label_encoder.fit(data.forecaster_names)
feature_names: list[str] = []
for q in self.quantiles:
base_data = data.get_base_predictions_for_quantile(quantile=q)
labels = self._classify_best_forecaster(base_data, quantile=q)
combined_data = combine_forecast_input_datasets(
input_data=base_data,
additional_features=additional_features,
)
input_data = combined_data.input_data()
# Filter labels to match combined_data index (inner join may drop rows)
labels = labels.loc[input_data.index]
self._validate_labels(labels=labels, quantile=q)
encoded_labels = self._label_encoder.transform(labels)
weights = compute_sample_weight("balanced", encoded_labels) * combined_data.sample_weight_series
self._models[q].fit(X=input_data, y=encoded_labels, sample_weight=weights) # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]
feature_names = list(input_data.columns)
self._feature_names = feature_names
self._is_fitted = True
@staticmethod
def _classify_best_forecaster(data: ForecastInputDataset, quantile: Quantile) -> pd.Series:
"""Compute best-forecaster labels via pinball loss.
For each sample, returns the name of the forecaster with the lowest
pinball loss at the given quantile.
Returns:
Series with the name of the best-performing forecaster per sample.
"""
predictions = data.input_data()
y_true = np.asarray(data.target_series)
def _pinball_loss(preds: pd.Series) -> np.ndarray:
y_pred = np.asarray(preds)
errors = y_true - y_pred
return np.where(errors >= 0, quantile * errors, (quantile - 1) * errors)
losses = predictions.apply(_pinball_loss)
return losses.idxmin(axis=1)
def _validate_labels(self, labels: pd.Series, quantile: Quantile) -> None:
# Fall back to DummyClassifier when one forecaster dominates — sklearn classifiers need ≥2 classes
if len(labels.unique()) == 1:
logger.warning("Quantile %s has only 1 class — switching to DummyClassifier.", quantile.format())
self._models[quantile] = DummyClassifier(strategy="most_frequent")
def _predict_weights(self, base_predictions: pd.DataFrame, quantile: Quantile) -> pd.DataFrame:
model = self._models[quantile]
if isinstance(model, DummyClassifier):
# DummyClassifier has no predict_proba — construct one-hot weights manually
weights_array = pd.DataFrame(0, index=base_predictions.index, columns=self._label_encoder.classes_)
weights_array[self._label_encoder.classes_[0]] = 1.0
else:
weights_array = model.predict_proba(base_predictions) # type: ignore[union-attr]
return pd.DataFrame(weights_array, index=base_predictions.index, columns=self._label_encoder.classes_) # type: ignore[arg-type]
@staticmethod
def _prepare_input_data(
dataset: ForecastInputDataset, additional_features: ForecastInputDataset | None
) -> pd.DataFrame:
df = dataset.input_data(start=dataset.index[0])
if additional_features is not None:
df_a = additional_features.input_data(start=dataset.index[0])
df = pd.concat([df, df_a], axis=1, join="inner")
if df.empty:
msg = "No overlapping timestamps between base predictions and additional features after inner join."
raise InsufficientlyCompleteError(msg)
return df
def _predict_quantile(
self,
dataset: ForecastInputDataset,
additional_features: ForecastInputDataset | None,
quantile: Quantile,
) -> pd.Series:
input_data = self._prepare_input_data(dataset=dataset, additional_features=additional_features)
weights = self._predict_weights(base_predictions=input_data, quantile=quantile)
if self.hard_selection:
# Convert soft probabilities to hard selection: max weight → 1.0, ties distributed equally
is_max: pd.DataFrame = weights.eq(weights.max(axis=1), axis=0) # pyright: ignore[reportUnknownMemberType]
weights = is_max.div(weights.sum(axis=1), axis=0)
# Reindex weights to predictions so that rows without additional_features
# (dropped by _prepare_input_data's inner join) get zero weight.
predictions = dataset.input_data()
weights = weights.reindex(predictions.index, fill_value=0.0)
return nan_aware_weighted_mean(predictions, weights)
[docs]
@override
def predict(
self,
data: EnsembleForecastDataset,
additional_features: ForecastInputDataset | None = None,
) -> ForecastDataset:
if not self.is_fitted:
raise NotFittedError(self.__class__.__name__)
predictions = pd.DataFrame({
q.format(): self._predict_quantile(
dataset=data.get_base_predictions_for_quantile(quantile=q),
additional_features=additional_features,
quantile=q,
)
for q in self.quantiles
})
target_series = data.target_series
if target_series is not None:
predictions[data.target_column] = target_series
return ForecastDataset(
data=predictions,
sample_interval=data.sample_interval,
target_column=data.target_column,
forecast_start=data.forecast_start,
)
[docs]
@override
def predict_contributions(
self,
data: EnsembleForecastDataset,
additional_features: ForecastInputDataset | None = None,
) -> TimeSeriesDataset:
if not self.is_fitted:
raise NotFittedError(self.__class__.__name__)
contribution_list = [
self._contributions_for_quantile(
dataset=data.get_base_predictions_for_quantile(quantile=q),
additional_features=additional_features,
quantile=q,
)
for q in self.quantiles
]
contributions = pd.concat(contribution_list, axis=1)
target_series = data.target_series
if target_series is not None:
contributions[data.target_column] = target_series
return TimeSeriesDataset(data=contributions, sample_interval=data.sample_interval)
def _contributions_for_quantile(
self,
dataset: ForecastInputDataset,
additional_features: ForecastInputDataset | None,
quantile: Quantile,
) -> pd.DataFrame:
input_data = self._prepare_input_data(dataset=dataset, additional_features=additional_features)
weights = self._predict_weights(base_predictions=input_data, quantile=quantile)
weights.columns = [f"{col}{ENSEMBLE_COLUMN_SEP}{quantile.format()}" for col in weights.columns]
return weights
@property
@override
def feature_importances(self) -> pd.DataFrame:
"""Feature importances from the internal classifiers, per quantile."""
importances: dict[str, np.ndarray] = {}
for q, model in self._models.items():
if hasattr(model, "feature_importances_"):
raw = np.array(model.feature_importances_, dtype=float) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportAttributeAccessIssue]
elif hasattr(model, "coef_"):
raw = np.abs(np.array(model.coef_, dtype=float)).mean(axis=0) # pyright: ignore[reportUnknownMemberType, reportUnknownArgumentType, reportAttributeAccessIssue]
else:
raw = np.ones(len(self._feature_names), dtype=float)
total = raw.sum()
importances[q.format()] = raw / total if total > 0 else raw
return pd.DataFrame(importances, index=self._feature_names)
__all__ = [
"LGBMCombinerHyperParams",
"LogisticCombinerHyperParams",
"RFCombinerHyperParams",
"WeightsCombiner",
"XGBCombinerHyperParams",
]