Source code for openstef_core.utils.pandas
# SPDX-FileCopyrightText: 2025 Contributors to the OpenSTEF project <openstef@lfenergy.org>
#
# SPDX-License-Identifier: MPL-2.0
"""Pandas utility functions for time series data processing.
This module provides utility functions for working with pandas time series data.
"""
import functools
from collections.abc import Callable, Sequence
from datetime import datetime
from typing import cast
import pandas as pd
[docs]
def unsafe_sorted_range_slice_idxs(
data: pd.Series | pd.Index, start: datetime | pd.Timestamp | None, end: datetime | pd.Timestamp | None
) -> tuple[int, int]:
"""Get sorted slice indices for a datetime range.
Efficiently finds the start and end indices for slicing a sorted datetime series
within a specified time range. Uses binary search for optimal performance on
large datasets.
This function is particularly useful when working with large time series datasets
where you need to quickly extract data for specific time windows without
iterating through all data points.
Args:
data: Sorted pandas Series with datetime index or values to search.
start: Start of the time range (inclusive). If None, starts from beginning.
end: End of the time range (exclusive). If None, goes to the end.
Returns:
Tuple of (start_index, end_index) for slicing the data series.
Note:
This function assumes that the input series is sorted in ascending order.
It does not perform any checks to verify this, so it's up to the caller to
ensure that this invariant holds true.
"""
start_idx = data.searchsorted(start, side="left") if start else 0
end_idx = data.searchsorted(end, side="left") if end else len(data)
return int(start_idx), int(end_idx)
[docs]
def normalize_to_unit_sum(df: pd.DataFrame) -> pd.DataFrame:
    """Scale every column so that its absolute values add up to 1.0.

    Usable in a pipe chain: ``df.pipe(normalize_to_unit_sum)``.

    A column whose absolute sum is zero is divided by 1.0 instead, so it
    stays all zeros rather than turning into NaN.

    Returns:
        DataFrame of absolute values with the same shape as the input, each
        column divided by its own absolute sum.
    """
    magnitudes = df.abs()
    column_totals = magnitudes.sum(axis=0)
    # Swap zero totals for 1.0 so the division below never divides by zero.
    safe_totals = column_totals.replace(to_replace=0, value=1.0)  # pyright: ignore[reportUnknownMemberType]
    return magnitudes.div(safe_totals, axis="columns")
[docs]
def combine_timeseries_indexes(indexes: Sequence[pd.DatetimeIndex]) -> pd.DatetimeIndex:
    """Merge several datetime indexes into one sorted, duplicate-free index.

    The indexes are folded together pairwise with an unsorted union; the
    combined result is then de-duplicated and sorted ascending.

    Args:
        indexes: Sequence of pandas DatetimeIndex objects to merge.

    Returns:
        A pandas DatetimeIndex holding every distinct timestamp found in the
        inputs, in ascending order. An empty input yields an empty index.
    """
    # Nothing to merge: hand back an empty index.
    if not indexes:
        return pd.DatetimeIndex([])

    remaining = iter(indexes)
    merged = next(remaining)
    for current in remaining:
        # sort=False here; ordering is applied once by sort_values below.
        merged = cast(pd.DatetimeIndex, pd.DatetimeIndex.union(merged, current, sort=False))

    deduplicated = merged.unique()
    return deduplicated.sort_values(ascending=True)  # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType]
[docs]
def nan_aware_weighted_mean(values: pd.DataFrame, weights: pd.DataFrame) -> "pd.Series[float]":
    """Row-wise weighted mean that ignores NaN values and their weights.

    Within each row, the weight of every NaN value is set to zero. The
    weighted sum of the remaining values is divided by the sum of the
    remaining weights. When that weight sum is zero (for example, every value
    in the row is NaN), the divisor is replaced by 1 and the result is 0.

    Args:
        values: DataFrame of values (may contain NaN).
        weights: DataFrame of non-negative weights, aligned with values.

    Returns:
        Series with one weighted mean per row.
    """
    present = values.notna()
    # Zero the weight wherever the matching value is missing.
    effective_weights = weights.where(present, 0)

    numerator = cast("pd.Series[float]", (values.fillna(0) * effective_weights).sum(axis=1))  # pyright: ignore[reportUnknownMemberType]
    # A zero weight total is swapped for 1 so the division yields 0, not NaN.
    denominator = cast("pd.Series[float]", effective_weights.sum(axis=1).replace(0, 1))  # pyright: ignore[reportUnknownMemberType]
    return numerator / denominator