# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0
"""Transform for dimensionality reduction in time series data.
This module provides dimensionality reduction functionality. With a choice of various methods
from scikit-learn to reduce the number of features in time series datasets.
"""
from typing import TYPE_CHECKING, Any, Literal, override
import pandas as pd
from pydantic import Field, PrivateAttr
from sklearn.decomposition import PCA, FactorAnalysis, FastICA, KernelPCA
from openstef_core.base_model import BaseConfig
from openstef_core.datasets.timeseries_dataset import TimeSeriesDataset
from openstef_core.exceptions import NotFittedError
from openstef_core.transforms import TimeSeriesTransform
from openstef_models.utils.feature_selection import FeatureSelection
if TYPE_CHECKING:
import numpy as np
[docs]
class DimensionalityReducer(BaseConfig, TimeSeriesTransform):
"""Reduce the dimensionality of a given set of features.
Available methods include:
- PCA: linear dimensionality reduction into orthogonal components.
- Factor analysis: linear dimensionality reduction models observed variables as latent factors + Gaussian noise.
- FastICA: linear dimensionality reduction that maximizes statistical independence among components.
- KernelPCA: non-linear dimensionality reduction using rbf kernel.
Example:
>>> import pandas as pd
>>> from datetime import timedelta
>>> from openstef_core.datasets import TimeSeriesDataset
>>> from openstef_models.transforms.general import DimensionalityReducer
>>> # Create sample dataset
>>> data = pd.DataFrame({
... 'load': [100, 120, 110, 130, 125],
... 'feature1': [1.0, 2.0, 1.5, 2.5, 2.0],
... 'feature2': [1.0, 2.0, 1.5, 2.5, 2.0],
... 'feature3': [5.0, 11.0, 8.0, 2.0, 11.0]
... }, index=pd.date_range('2025-01-01', periods=5, freq='1h'))
>>> dataset = TimeSeriesDataset(data, timedelta(hours=1))
>>> # Initialize and apply transform
>>> from openstef_models.utils.feature_selection import FeatureSelection
>>> dim_reducer = DimensionalityReducer(
... selection=FeatureSelection(include={'feature1', 'feature2', 'feature3'}),
... method="pca",
... n_components=2,
... random_state=1234
... )
>>> dim_reducer.fit(dataset)
>>> transformed_dataset = dim_reducer.transform(dataset)
>>> transformed_dataset.data.head().round(3)
component_1 component_2 load
timestamp
2025-01-01 00:00:00 -2.383 -1.166 100
2025-01-01 01:00:00 3.596 0.335 120
2025-01-01 02:00:00 0.606 -0.416 110
2025-01-01 03:00:00 -5.414 0.912 130
2025-01-01 04:00:00 3.596 0.335 125
"""
selection: FeatureSelection = Field(
default=FeatureSelection.ALL,
description="Features to apply dimensionality reduction to.",
)
method: Literal["pca", "factor_analysis", "fastica", "kernel_pca"] = Field(
default="pca", description="Dimensionality reduction method to use."
)
n_components: int = Field(default=2, description="Desired nr of components after reduction.")
random_state: int | None = Field(default=42, description="Random state for reproducibility.")
_dimensionality_reducer: PCA | FactorAnalysis | FastICA | KernelPCA = PrivateAttr()
_is_fitted: bool = PrivateAttr(default=False)
# Method-specific parameters
max_iter: int = Field(default=1000, description="Maximum number of iterations for Factor Analysis and FastICA.")
@property
@override
def is_fitted(self) -> bool:
return self._is_fitted
[docs]
@override
def fit(self, data: TimeSeriesDataset) -> None:
features = self.selection.resolve(data.feature_names)
self._dimensionality_reducer.fit(data.data[features]) # type: ignore[reportUnknownMemberType]
self._is_fitted = True
[docs]
@override
def model_post_init(self, context: Any) -> None:
if self.method == "pca":
self._dimensionality_reducer = PCA(n_components=self.n_components, random_state=self.random_state)
elif self.method == "factor_analysis":
self._dimensionality_reducer = FactorAnalysis(
n_components=self.n_components, max_iter=self.max_iter, random_state=self.random_state
)
elif self.method == "fastica":
self._dimensionality_reducer = FastICA(
n_components=self.n_components, max_iter=self.max_iter, random_state=self.random_state
)
elif self.method == "kernel_pca":
self._dimensionality_reducer = KernelPCA(
n_components=self.n_components, kernel="rbf", random_state=self.random_state
) # rbf for non-linear
[docs]
@override
def features_added(self) -> list[str]:
return [f"component_{i + 1}" for i in range(self.n_components)]