Source code for openstef_core.datasets.versioned_timeseries_dataset

# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0

"""Versioned time series dataset for efficient multi-part composition."""

import functools
import json
from collections.abc import Sequence
from datetime import datetime, timedelta
from typing import Literal, Self, cast, override

import pandas as pd
from pydantic import FilePath

from openstef_core.datasets.mixins import DatasetMixin, TimeSeriesMixin
from openstef_core.datasets.timeseries_dataset import TimeSeriesDataset
from openstef_core.datasets.validation import validate_disjoint_columns, validate_same_sample_intervals
from openstef_core.exceptions import TimeSeriesValidationError
from openstef_core.types import AvailableAt, LeadTime
from openstef_core.utils import timedelta_from_isoformat
from openstef_core.utils.pandas import combine_timeseries_indexes, unsafe_sorted_range_slice_idxs

type ConcatMode = Literal["left", "outer", "inner"]


class VersionedTimeSeriesDataset(TimeSeriesMixin, DatasetMixin):
    """A versioned time series dataset composed of multiple data parts.

    This class combines multiple TimeSeriesDataset instances into a unified dataset
    that tracks data availability over time. It provides methods to filter datasets
    by time ranges, availability constraints, and lead times, as well as select
    specific versions of the data for point-in-time reconstruction. The dataset is
    particularly useful for realistic backtesting scenarios where data arrives with
    delays or gets revised over time.

    Key motivation: This architecture solves the O(n²) space complexity problem that
    occurs when concatenating DataFrames with misaligned (timestamp, available_at)
    pairs. Instead of immediately combining data, it uses lazy composition that
    delays actual DataFrame concatenation until select_version() is called.

    Attributes:
        data_parts: List of TimeSeriesDataset instances that compose this dataset.

    Example:
        Create a versioned dataset by combining multiple data parts:

        >>> import pandas as pd
        >>> from datetime import datetime, timedelta
        >>>
        >>> # Create weather data part
        >>> weather_data = pd.DataFrame({
        ...     'temperature': [20.5],
        ...     'available_at': [datetime(2025, 1, 1, 16, 0)]
        ... }, index=pd.DatetimeIndex([datetime(2025, 1, 1, 10, 0)]))
        >>> weather_part = TimeSeriesDataset(weather_data, timedelta(hours=1))
        >>>
        >>> # Combine into versioned dataset
        >>> dataset = VersionedTimeSeriesDataset([weather_part])
        >>> dataset.is_versioned
        True

    Note:
        All data parts must have identical sample intervals and disjoint feature
        sets. The final dataset index is the union of all part indices, enabling
        flexible composition of data sources with different coverage periods.
    """

    data_parts: list[TimeSeriesDataset]

    _index: pd.DatetimeIndex
    _sample_interval: timedelta
    _feature_names: list[str]
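    # Design note (a sketch restating the "Key motivation" above): with parts whose
    # (timestamp, available_at) pairs are misaligned, an eager outer concat would
    # materialize a mostly-NaN cross product of versions. Keeping the parts separate
    # and concatenating only inside select_version() keeps memory proportional to
    # the rows actually stored.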
    def __init__(
        self,
        data_parts: list[TimeSeriesDataset],
        *,
        index: pd.DatetimeIndex | None = None,
    ) -> None:
        """Initialize a versioned time series dataset from multiple parts.

        Args:
            data_parts: List of TimeSeriesDataset instances to combine. Must have
                identical sample intervals and disjoint feature sets.
            index: Optional explicit index for the combined dataset. If not
                provided, the union of all part indices will be used.

        Raises:
            TimeSeriesValidationError: If no data parts are provided or validation fails.
        """
        if not data_parts:
            raise TimeSeriesValidationError("At least one data part must be provided.")

        if not all(part.is_versioned for part in data_parts):
            raise TimeSeriesValidationError("All data parts must be versioned datasets.")

        self._sample_interval = validate_same_sample_intervals(datasets=data_parts)
        self._feature_names = validate_disjoint_columns(datasets=data_parts)
        self._index = (
            index if index is not None else combine_timeseries_indexes(indexes=[part.index for part in data_parts])
        )
        self.data_parts = data_parts
    @property
    @override
    def index(self) -> pd.DatetimeIndex:
        return self._index

    @property
    @override
    def sample_interval(self) -> timedelta:
        return self._sample_interval

    @property
    @override
    def feature_names(self) -> list[str]:
        return self._feature_names

    @property
    @override
    def is_versioned(self) -> bool:
        return True

    def _copy_with_data(self, data_parts: list[TimeSeriesDataset], index: pd.DatetimeIndex | None = None) -> Self:
        # Fast way to copy self with new data while skipping validation, since invariants are preserved.
        new_instance = object.__new__(self.__class__)
        new_instance.__dict__.update(self.__dict__)
        new_instance.data_parts = data_parts
        new_instance._index = index if index is not None else self._index  # noqa: SLF001
        return new_instance
    @override
    def filter_by_range(self, start: datetime | None = None, end: datetime | None = None) -> Self:
        if start is None and end is None:
            return self

        start_idx, end_idx = unsafe_sorted_range_slice_idxs(data=cast(pd.Series, self.index), start=start, end=end)
        index = self.index[start_idx:end_idx]
        data_parts = [part.filter_by_range(start, end) for part in self.data_parts]
        return self._copy_with_data(data_parts=data_parts, index=index)
    @override
    def filter_by_available_before(self, available_before: datetime) -> Self:
        data_parts = [part.filter_by_available_before(available_before) for part in self.data_parts]
        return self._copy_with_data(data_parts=data_parts)
    @override
    def filter_by_available_at(self, available_at: AvailableAt) -> Self:
        data_parts = [part.filter_by_available_at(available_at) for part in self.data_parts]
        return self._copy_with_data(data_parts=data_parts)
    @override
    def filter_by_lead_time(self, lead_time: LeadTime) -> Self:
        data_parts = [part.filter_by_lead_time(lead_time) for part in self.data_parts]
        return self._copy_with_data(data_parts=data_parts)
    @override
    def select_version(self) -> TimeSeriesDataset:
        selected_parts = [part.select_version().data for part in self.data_parts]
        combined_data = pd.concat(selected_parts, axis=1).reindex(self.index)
        return TimeSeriesDataset(data=combined_data, sample_interval=self.sample_interval)
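    # Hedged usage sketch: the filter_* methods above are cheap because they only
    # re-wrap the parts; select_version() is the single point where DataFrames are
    # actually concatenated. Assuming a `dataset` built as in the class docstring:
    #
    #   snapshot = (
    #       dataset.filter_by_range(start=datetime(2025, 1, 1), end=datetime(2025, 1, 2))
    #       .filter_by_available_before(datetime(2025, 1, 1, 12, 0))
    #       .select_version()
    #   )
    #   snapshot.data  # plain DataFrame: the latest version of each point as of noon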
    @override
    def to_parquet(self, path: FilePath) -> None:
        parts_df = [part.to_pandas() for part in self.data_parts]
        parts_metadata = [{**part_df.attrs, "columns": part_df.columns.tolist()} for part_df in parts_df]

        combined_data = pd.concat([part.data.assign(part_id=i) for i, part in enumerate(self.data_parts)], axis=0)
        combined_data.attrs["parts"] = json.dumps({"parts": parts_metadata})
        combined_data.to_parquet(path=path)
    @override
    @classmethod
    def read_parquet(
        cls,
        path: FilePath,
        *,
        sample_interval: timedelta | None = None,
        timestamp_column: str = "timestamp",
        available_at_column: str = "available_at",
        horizon_column: str = "horizon",
    ) -> Self:
        df = pd.read_parquet(path=path)  # type: ignore

        if "parts" in df.attrs:
            parts_metadata = json.loads(df.attrs.get("parts", "{}")).get("parts", [])
            if len(parts_metadata) == 0:
                raise TimeSeriesValidationError("No data parts found in the parquet file.")

            parts: list[TimeSeriesDataset] = [
                TimeSeriesDataset(
                    data=df.loc[df.part_id == i, part_info["columns"]],
                    sample_interval=timedelta_from_isoformat(part_info.get("sample_interval", "PT1H")),
                )
                for i, part_info in enumerate(parts_metadata)
            ]
        else:
            part = TimeSeriesDataset.read_parquet(
                path=path,
                sample_interval=sample_interval,
                timestamp_column=timestamp_column,
                available_at_column=available_at_column,
                horizon_column=horizon_column,
            )
            if not part.is_versioned and part._version_column == part.available_at_column:  # noqa: SLF001
                raise TimeSeriesValidationError(
                    "Parquet file does not contain versioned data. Use TimeSeriesDataset.read_parquet() instead."
                )
            parts = [part]

        return cls(data_parts=parts)
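    # Hedged usage sketch: to_parquet() and read_parquet() round-trip the composed
    # parts through a single file via the "parts" metadata written above. The file
    # name is illustrative:
    #
    #   dataset.to_parquet("versioned_dataset.parquet")
    #   restored = VersionedTimeSeriesDataset.read_parquet("versioned_dataset.parquet")
    #   assert restored.feature_names == dataset.feature_names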
    @classmethod
    def concat(cls, datasets: Sequence[Self], mode: ConcatMode) -> Self:
        """Concatenate multiple versioned datasets into a single dataset.

        Combines multiple VersionedTimeSeriesDataset instances using the specified
        concatenation mode. Supports different strategies for handling overlapping
        time indices across datasets.

        This method is useful when you have data from different sources or time
        periods that need to be combined while preserving their versioning
        information. For example, combining weather data from different providers
        or merging historical data with recent updates.

        Args:
            datasets: Sequence of VersionedTimeSeriesDataset instances to concatenate.
                Must contain at least one dataset.
            mode: Concatenation mode determining how to handle overlapping indices:
                - "left": Use indices from the first dataset only
                - "outer": Union of all indices across datasets
                - "inner": Intersection of all indices across datasets

        Returns:
            New VersionedTimeSeriesDataset containing all data parts from the input
            datasets.

        Raises:
            TimeSeriesValidationError: If no datasets are provided for concatenation.
        """
        if not datasets:
            raise TimeSeriesValidationError("At least one dataset must be provided for concatenation.")

        data_parts = [part for dataset in datasets for part in dataset.data_parts]
        if mode == "outer" or len(datasets) == 1:
            return cls(data_parts=data_parts)

        if mode == "left":
            index = datasets[0].index
        elif mode == "inner":
            index = functools.reduce(lambda x, y: x.intersection(y), [part.index.unique() for part in data_parts])

        return cls(
            data_parts=[
                TimeSeriesDataset(data=part.data.loc[part.index.isin(index)])  # pyright: ignore[reportUnknownMemberType]
                for part in data_parts
            ],
            index=index,
        )
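    # Hedged usage sketch: concat() keeps every part lazy; only the "left" and
    # "inner" modes trim part data to the chosen index. `history` and `recent`
    # are illustrative datasets with overlapping coverage:
    #
    #   merged = VersionedTimeSeriesDataset.concat([history, recent], mode="outer")
    #   aligned = VersionedTimeSeriesDataset.concat([history, recent], mode="inner")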
    @classmethod
    def from_dataframe(
        cls,
        data: pd.DataFrame,
        sample_interval: timedelta,
        *,
        timestamp_column: str = "timestamp",
        available_at_column: str = "available_at",
    ) -> Self:
        """Create a VersionedTimeSeriesDataset from a single DataFrame.

        Convenience constructor for creating a versioned dataset from a single
        DataFrame containing all features.

        Args:
            data: DataFrame containing versioned time series data with timestamp
                and available_at columns.
            sample_interval: The regular interval between consecutive data points.
            timestamp_column: Name of the column indicating the timestamps of the data.
                Default is 'timestamp'.
            available_at_column: Name of the column indicating when data became
                available. Default is 'available_at'.

        Returns:
            New VersionedTimeSeriesDataset instance containing the data.

        Example:
            Create dataset from a single DataFrame:

            >>> import pandas as pd
            >>> from datetime import datetime, timedelta
            >>> data = pd.DataFrame({
            ...     'available_at': [datetime.fromisoformat('2025-01-01T10:05:00'),
            ...                      datetime.fromisoformat('2025-01-01T10:20:00')],
            ...     'load': [100.0, 120.0],
            ...     'temperature': [20.0, 22.0]
            ... }, index=pd.DatetimeIndex([datetime.fromisoformat('2025-01-01T10:00:00'),
            ...                            datetime.fromisoformat('2025-01-01T10:15:00')], name='timestamp'))
            >>> dataset = VersionedTimeSeriesDataset.from_dataframe(data, timedelta(minutes=15))
            >>> sorted(dataset.feature_names)
            ['load', 'temperature']

        Note:
            This is equivalent to creating a TimeSeriesDataset and then wrapping it
            in a VersionedTimeSeriesDataset, but more convenient for simple cases.
        """
        if not isinstance(data.index, pd.DatetimeIndex) and timestamp_column in data.columns:
            # Backwards compatibility: datasets with explicit timestamp column
            data = data.set_index(timestamp_column)

        return cls(
            data_parts=[
                TimeSeriesDataset(
                    data=data,
                    sample_interval=sample_interval,
                    available_at_column=available_at_column,
                )
            ]
        )
    def to_horizons(self, horizons: list[LeadTime]) -> TimeSeriesDataset:
        """Convert the versioned dataset to a horizon-based format for multiple lead times.

        Selects data for each specified horizon, adds a horizon column, and combines
        the results into a single TimeSeriesDataset. Useful for creating
        multi-horizon training data.

        Args:
            horizons: Lead times for which to reconstruct point-in-time views of the data.

        Returns:
            TimeSeriesDataset with a horizon column indicating the forecast lead time.
        """
        horizon_dfs = [
            self.filter_by_lead_time(lead_time=horizon).select_version().data.assign(horizon=horizon.value)
            for horizon in horizons
        ]
        return TimeSeriesDataset(
            data=pd.concat(objs=horizon_dfs, axis=0),
            sample_interval=self.sample_interval,
        )
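

# Hedged usage sketch (not part of the original module): a minimal end-to-end
# demonstration of from_dataframe() followed by a point-in-time reconstruction,
# mirroring the values used in the from_dataframe docstring example.
if __name__ == "__main__":
    _data = pd.DataFrame(
        {
            "available_at": [datetime(2025, 1, 1, 10, 5), datetime(2025, 1, 1, 10, 20)],
            "load": [100.0, 120.0],
        },
        index=pd.DatetimeIndex([datetime(2025, 1, 1, 10, 0), datetime(2025, 1, 1, 10, 15)], name="timestamp"),
    )
    _dataset = VersionedTimeSeriesDataset.from_dataframe(_data, timedelta(minutes=15))
    # As of 10:10 only the first row had arrived; select_version() materializes
    # exactly that point-in-time view.
    _snapshot = _dataset.filter_by_available_before(datetime(2025, 1, 1, 10, 10)).select_version()
    print(_snapshot.data)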