Building a Custom Pipeline

The create_forecasting_workflow preset handles pipeline assembly automatically. When you need full control — custom transforms, different feature engineering, or non-standard postprocessing — you can build a ForecastingModel from individual components.

What you’ll learn:

  • Assemble preprocessing, forecaster, and postprocessing into a pipeline

  • Select and configure individual transforms

  • Train and predict with a hand-built pipeline

  • Compare the custom pipeline against a preset

Note

This tutorial is for advanced users who need to go beyond presets. Start with the Forecasting Quickstart for the standard approach.

Key API references: ForecastingModel · TransformPipeline · GBLinearForecaster

Load the dataset

from datetime import datetime, timedelta

from openstef_core.testing import load_liander_dataset
from openstef_core.types import LeadTime, Q

dataset = load_liander_dataset()

# Train on 45 days of data, then forecast the following 7 days.
# "+00:00" also parses on Python < 3.11, where fromisoformat() rejects a trailing "Z".
train_start = datetime.fromisoformat("2024-03-01T00:00:00+00:00")
train_end = train_start + timedelta(days=45)
forecast_end = train_end + timedelta(days=7)

train_dataset = dataset.filter_by_range(start=train_start, end=train_end)
# Prediction input: 14 days of history before train_end plus the 7-day forecast window.
predict_dataset = dataset.filter_by_range(
    start=train_end - timedelta(days=14),
    end=forecast_end,
)

print(f"Training:  {train_dataset.data.shape[0]:,} rows")
print(f"Predict:   {predict_dataset.data.shape[0]:,} rows")
Training:  4,320 rows
Predict:   2,016 rows
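The printed counts are consistent with a 15-minute sample interval (96 samples per day). That interval is inferred from the counts, not stated by the dataset. The sketch below also assumes that filter_by_range treats the end of the range as exclusive. This stdlib-only check reproduces the arithmetic:

```python
from datetime import datetime, timedelta, timezone

# Assumption: 15-minute samples (96 per day), inferred from the printed row counts.
SAMPLE_INTERVAL = timedelta(minutes=15)


def expected_rows(start: datetime, end: datetime, step: timedelta = SAMPLE_INTERVAL) -> int:
    """Number of samples in the half-open range [start, end)."""
    return (end - start) // step


train_start = datetime(2024, 3, 1, tzinfo=timezone.utc)
train_end = train_start + timedelta(days=45)
forecast_end = train_end + timedelta(days=7)

train_rows = expected_rows(train_start, train_end)
predict_rows = expected_rows(train_end - timedelta(days=14), forecast_end)
forecast_rows = expected_rows(train_end, forecast_end)

print(train_rows, predict_rows, forecast_rows)  # 4320 2016 672
```

The third number, 672, also matches the number of forecast rows the workflow produces for the 7-day window later in this tutorial.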

Define pipeline components

A ForecastingModel has three stages:

  1. Preprocessing — feature engineering and data cleaning transforms

  2. Forecaster — the model that produces predictions

  3. Postprocessing — transforms applied to the forecast output

Below we build each stage explicitly.
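To make the data flow concrete, here is a toy, stdlib-only sketch of the same three-stage structure. MeanFiller, MeanForecaster, ClipNegative, and ToyForecastingModel are hypothetical classes, much simpler than the OpenSTEF API. They illustrate an ordering that is assumed here for ForecastingModel:

- transforms are fitted one after another during training;
- at prediction time the fitted transforms are reapplied unchanged;
- the forecaster output passes through postprocessing last.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Dict, List, Optional

Row = Dict[str, Optional[float]]


class MeanFiller:
    """Toy preprocessing step: fills missing 'temp' values with the training mean."""

    def fit(self, rows: List[Row]) -> None:
        self.fill = mean(r["temp"] for r in rows if r["temp"] is not None)

    def transform(self, rows: List[Row]) -> List[Row]:
        return [{**r, "temp": self.fill if r["temp"] is None else r["temp"]} for r in rows]


class MeanForecaster:
    """Toy forecaster: always predicts the mean training load."""

    def fit(self, rows: List[Row]) -> None:
        self.level = mean(r["load"] for r in rows)

    def predict(self, rows: List[Row]) -> List[float]:
        return [self.level for _ in rows]


class ClipNegative:
    """Toy postprocessing step: clips negative forecasts to zero."""

    def transform(self, preds: List[float]) -> List[float]:
        return [max(0.0, p) for p in preds]


@dataclass
class ToyForecastingModel:
    preprocessing: list
    forecaster: MeanForecaster
    postprocessing: list

    def fit(self, rows: List[Row]) -> None:
        # Fit each transform in order and feed its output to the next one.
        for step in self.preprocessing:
            step.fit(rows)
            rows = step.transform(rows)
        self.forecaster.fit(rows)

    def predict(self, rows: List[Row]) -> List[float]:
        # Reapply the already-fitted transforms; nothing is refitted here.
        for step in self.preprocessing:
            rows = step.transform(rows)
        preds = self.forecaster.predict(rows)
        for step in self.postprocessing:
            preds = step.transform(preds)
        return preds


toy = ToyForecastingModel([MeanFiller()], MeanForecaster(), [ClipNegative()])
toy.fit([{"temp": 10.0, "load": 4.0}, {"temp": None, "load": 6.0}])
print(toy.predict([{"temp": None, "load": None}]))  # [5.0]
```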

Preprocessing

Transforms are grouped into modules under openstef_models.transforms. The pipeline below uses a subset of them:

| Module                    | Transforms                                                                          |
|---------------------------|-------------------------------------------------------------------------------------|
| transforms.general        | Scaler, Imputer, NaNDropper, OutlierHandler, EmptyFeatureRemover                    |
| transforms.time_domain    | HolidayFeatureAdder, DatetimeFeaturesAdder, CyclicFeaturesAdder, LagsAdder          |
| transforms.weather_domain | AtmosphereDerivedFeaturesAdder, DaylightFeatureAdder, RadiationDerivedFeaturesAdder |
| transforms.energy_domain  | WindPowerFeatureAdder                                                               |
| transforms.validation     | CompletenessChecker, FlatlineChecker                                                |
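LagsAdder is the least self-explanatory transform in this table: it adds past values of the target as features. The sketch below shows the idea with plain lists and makes two assumptions:

- The data has a 15-minute interval, so a 7-day lag is a shift of 7 × 96 = 672 samples.
- A lag shorter than the forecast horizon refers to values that are not yet known when the forecast is made, so it is rejected. This is a common rule, not one taken from the LagsAdder source.

add_lag is a hypothetical helper.

```python
from datetime import timedelta
from typing import List, Optional

SAMPLE_INTERVAL = timedelta(minutes=15)  # assumption: 15-minute data


def add_lag(values: List[float], lag: timedelta, horizon: timedelta) -> List[Optional[float]]:
    """Shift `values` back by `lag`; samples without enough history get None."""
    if lag < horizon:
        # A lag shorter than the horizon points at data not yet observed when forecasting.
        raise ValueError(f"lag {lag} is shorter than horizon {horizon}")
    steps = lag // SAMPLE_INTERVAL
    return [values[i - steps] if i >= steps else None for i in range(len(values))]


load = [float(i) for i in range(700)]  # toy series whose value equals its sample index
lagged = add_lag(load, lag=timedelta(days=7), horizon=timedelta(hours=36))
print(lagged[671], lagged[672], lagged[699])  # None 0.0 27.0
```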

from openstef_core.mixins import TransformPipeline
from openstef_models.transforms.general import EmptyFeatureRemover, Imputer, NaNDropper, Scaler
from openstef_models.transforms.time_domain import CyclicFeaturesAdder, HolidayFeatureAdder
from openstef_models.transforms.time_domain.lags_adder import LagsAdder
from openstef_models.utils.feature_selection import Exclude

# Forecast the 10th, 50th, and 90th percentiles at a 36-hour lead time.
quantiles = [Q(0.1), Q(0.5), Q(0.9)]
horizons = [LeadTime.from_string("PT36H")]

preprocessing = TransformPipeline(
    transforms=[
        # Feature engineering
        LagsAdder(
            history_available=timedelta(days=14),
            horizons=horizons,
            add_trivial_lags=False,
            target_column="load",
            custom_lags=[timedelta(days=7)],
            lag_fallback_offset=timedelta(days=7),
        ),
        CyclicFeaturesAdder(),
        HolidayFeatureAdder(country_code="NL"),
        # Standardization
        Scaler(selection=Exclude("load"), method="standard"),
        EmptyFeatureRemover(),
        # Missing value handling
        Imputer(selection=Exclude("load"), imputation_strategy="mean"),
        NaNDropper(selection=Exclude("load")),
    ]
)

print(f"Preprocessing steps: {len(preprocessing.transforms)}")
for t in preprocessing.transforms:
    print(f"  - {type(t).__name__}")
Preprocessing steps: 7
  - LagsAdder
  - CyclicFeaturesAdder
  - HolidayFeatureAdder
  - Scaler
  - EmptyFeatureRemover
  - Imputer
  - NaNDropper
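Scaler(method="standard") and Imputer(imputation_strategy="mean") both learn statistics when the pipeline is fitted. The sketch below reproduces the arithmetic for a single column using only the standard library.

It assumes, as is usual for such transforms, that the mean and standard deviation come from the training data only and are reused unchanged at prediction time. It is not the OpenSTEF implementation and ignores the Exclude("load") selection. All function names are hypothetical.

```python
import math
from statistics import mean, pstdev
from typing import List, Tuple

NAN = float("nan")


def fit_standard(col: List[float]) -> Tuple[float, float]:
    """Learn mean and population std from the observed (non-NaN) training values."""
    observed = [x for x in col if not math.isnan(x)]
    sigma = pstdev(observed)
    # Fall back to 1.0 so constant columns do not cause a division by zero.
    return mean(observed), (sigma if sigma > 0 else 1.0)


def apply_standard(col: List[float], mu: float, sigma: float) -> List[float]:
    return [(x - mu) / sigma for x in col]  # NaN stays NaN


def fit_mean_imputer(col: List[float]) -> float:
    return mean(x for x in col if not math.isnan(x))


def apply_imputer(col: List[float], fill: float) -> List[float]:
    return [fill if math.isnan(x) else x for x in col]


# Fit on training data only (toy values, not from the Liander dataset).
train_temp = [10.0, NAN, 14.0]
mu, sigma = fit_standard(train_temp)
scaled_train = apply_standard(train_temp, mu, sigma)
fill = fit_mean_imputer(scaled_train)

# At prediction time the learned mu, sigma, and fill are reused unchanged.
new_temp = [16.0, NAN]
print(apply_imputer(apply_standard(new_temp, mu, sigma), fill))  # [2.0, 0.0]
```

In this toy case the fill value comes out as 0.0, because the scaled training values are centered on zero.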

Forecaster

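We use GBLinearForecaster, a gradient-boosted linear model. Linear models are sensitive to feature scale and need a value for every feature. That makes this model a good fit for the Scaler, Imputer, and NaNDropper steps above, which hand it standardized, complete feature rows.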

from openstef_models.models.forecasting.gblinear_forecaster import (
    GBLinearForecaster,
    GBLinearHyperParams,
)

forecaster = GBLinearForecaster(
    quantiles=quantiles,
    horizons=horizons,
    hyperparams=GBLinearHyperParams(
        n_steps=100,
        learning_rate=0.3,
    ),
    verbosity=0,
)
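GBLinearForecaster produces one prediction per quantile (P10, P50, P90). Quantile models are commonly trained by minimizing the pinball (quantile) loss. This tutorial does not state which objective GBLinearForecaster uses internally, so treat the following stdlib sketch as background on the concept, not a description of its implementation. pinball_loss is a hypothetical helper.

```python
from typing import Sequence


def pinball_loss(y_true: Sequence[float], y_pred: Sequence[float], q: float) -> float:
    """Mean pinball (quantile) loss for quantile level q in (0, 1)."""
    total = 0.0
    for y, p in zip(y_true, y_pred):
        diff = y - p
        # Under-prediction (diff > 0) costs q per unit; over-prediction costs (1 - q) per unit.
        total += max(q * diff, (q - 1) * diff)
    return total / len(y_true)


actual = [10.0]
print(round(pinball_loss(actual, [8.0], q=0.9), 2))   # 1.8: under-predicting by 2 is expensive
print(round(pinball_loss(actual, [12.0], q=0.9), 2))  # 0.2: over-predicting by 2 is cheap
```

The asymmetric penalty is what pushes a P90 model toward high predictions and a P10 model toward low ones.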

Postprocessing

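We add two postprocessing steps:

- QuantileSorter ensures the predicted quantiles are ordered (P10 ≤ P50 ≤ P90) in every row.
- ConfidenceIntervalApplicator adds a standard-deviation estimate, which appears as the stdev column in the forecast output. Setting add_quantiles_from_std=False keeps it from deriving additional quantiles from that estimate.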

from openstef_models.transforms.postprocessing import (
    ConfidenceIntervalApplicator,
    QuantileSorter,
)

postprocessing = TransformPipeline(
    transforms=[
        QuantileSorter(),
        ConfidenceIntervalApplicator(
            quantiles=quantiles,
            add_quantiles_from_std=False,
        ),
    ]
)
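Quantile crossing can occur when each quantile is predicted independently, so a row may come out with P10 above P50. Below is a minimal sketch of the fix. It assumes QuantileSorter simply sorts each row's quantile values in ascending order; its exact behavior is not shown in this tutorial. sort_quantiles is a hypothetical helper that works on a plain dict.

```python
from typing import Dict, List


def sort_quantiles(row: Dict[str, float], columns: List[str]) -> Dict[str, float]:
    """Rewrite the quantile values in `columns` (listed low to high) in ascending order."""
    ordered = sorted(row[c] for c in columns)
    return {**row, **dict(zip(columns, ordered))}


cols = ["quantile_P10", "quantile_P50", "quantile_P90"]
crossed = {"quantile_P10": 5.2, "quantile_P50": 4.9, "quantile_P90": 6.1}  # P10 > P50
print(sort_quantiles(crossed, cols))
# {'quantile_P10': 4.9, 'quantile_P50': 5.2, 'quantile_P90': 6.1}
```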

Assemble the model

ForecastingModel combines all three stages. We wrap it in a CustomForecastingWorkflow, which orchestrates training and prediction. Passing callbacks=[] attaches no callbacks.

from openstef_models.models.forecasting_model import ForecastingModel
from openstef_models.workflows import CustomForecastingWorkflow

model = ForecastingModel(
    preprocessing=preprocessing,
    forecaster=forecaster,
    postprocessing=postprocessing,
    target_column="load",
)

workflow = CustomForecastingWorkflow(
    model_id="custom_pipeline_demo",
    model=model,
    callbacks=[],
)

Train and predict

result = workflow.fit(train_dataset)
forecast = workflow.predict(predict_dataset, forecast_start=train_end)

print(f"Forecast rows:  {len(forecast.data)}")
print(f"Columns:        {list(forecast.data.columns)}")
Forecast rows:  672
Columns:        ['quantile_P10', 'quantile_P50', 'quantile_P90', 'load', 'stdev']
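The forecast includes a stdev column next to the quantiles. This tutorial does not say how ConfidenceIntervalApplicator computes it.

For intuition only: if forecast errors were normally distributed, the P10 to P90 band would span about 2 × 1.2816 standard deviations. A standard deviation can therefore be read off the band width. The normality assumption and the stdev_from_band helper are ours, not the library's. The sketch uses statistics.NormalDist from the standard library.

```python
from statistics import NormalDist


def stdev_from_band(p_low: float, p_high: float, q_low: float = 0.1, q_high: float = 0.9) -> float:
    """Standard deviation implied by a quantile band, assuming normally distributed errors."""
    z = NormalDist()  # standard normal distribution
    return (p_high - p_low) / (z.inv_cdf(q_high) - z.inv_cdf(q_low))


print(round(stdev_from_band(90.0, 110.0), 2))  # 7.8
```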

Visualize the result


from openstef_beam.analysis.plots import ForecastTimeSeriesPlotter

fig = (
    ForecastTimeSeriesPlotter()
    .add_measurements(measurements=predict_dataset.data["load"].loc[train_end:])
    .add_model(
        model_name="Custom GBLinear",
        forecast=forecast.median_series,
        quantiles=forecast.quantiles_data,
    )
    .plot()
)
fig.update_layout(
    title="Custom pipeline — forecast vs actuals",
    yaxis_title="Load (MW)",
    xaxis_title="Time",
    height=450,
)
fig.show()
[Figure: Custom pipeline, forecast vs actuals. x-axis: Time; y-axis: Load (MW)]
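The plot gives a visual impression. Two simple numbers make the comparison repeatable:

- the mean absolute error (MAE) of the median forecast;
- the empirical coverage of the P10 to P90 band, which should be close to 80% for a well-calibrated forecast.

The sketch below uses made-up toy values and hypothetical helpers. Applying it to this tutorial would first require aligning the measured load with the forecast columns.

```python
from typing import Sequence


def mae(actual: Sequence[float], predicted: Sequence[float]) -> float:
    """Mean absolute error."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


def band_coverage(actual: Sequence[float], lower: Sequence[float], upper: Sequence[float]) -> float:
    """Fraction of actuals that fall inside [lower, upper]."""
    inside = sum(lo <= a <= hi for a, lo, hi in zip(actual, lower, upper))
    return inside / len(actual)


# Toy values (not from the Liander dataset)
actual = [10.0, 12.0, 9.0, 15.0, 11.0]
p10 = [8.0, 10.0, 9.5, 11.0, 9.0]
p50 = [10.0, 11.0, 10.0, 13.0, 11.0]
p90 = [12.0, 13.0, 11.0, 14.0, 13.0]

print(mae(actual, p50))                 # 0.8
print(band_coverage(actual, p10, p90))  # 0.6
```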

Using components individually

ForecastingModel is convenient, but every component also works on its own. You can run the preprocessing pipeline, inspect intermediate data, and call the forecaster directly.

Run preprocessing on raw data

preprocessed = model.preprocessing.transform(train_dataset)

print(f"Before preprocessing: {train_dataset.data.shape[1]} columns")
print(f"After preprocessing:  {preprocessed.data.shape[1]} columns")
print(f"\nAdded features: {sorted(set(preprocessed.data.columns) - set(train_dataset.data.columns))[:8]}...")
Before preprocessing: 28 columns
After preprocessing:  49 columns

Added features: ['day_of_week_cosine', 'day_of_week_sine', 'is_ascension_day', 'is_christmas_day', 'is_easter_monday', 'is_easter_sunday', 'is_good_friday', 'is_holiday']...

Run a single transform

single_transform = CyclicFeaturesAdder()
single_transform.fit(train_dataset)
result_single = single_transform.transform(train_dataset)

print(
    f"CyclicFeaturesAdder added {len(single_transform.features_added())} columns: {single_transform.features_added()}"
)
CyclicFeaturesAdder added 8 columns: ['time_of_day_sine', 'season_sine', 'day_of_week_sine', 'month_sine', 'time_of_day_cosine', 'season_cosine', 'day_of_week_cosine', 'month_cosine']
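CyclicFeaturesAdder emits sine/cosine pairs because a raw clock value makes 23:45 and 00:00 look far apart, even though they are only 15 minutes apart. Mapping time onto a circle removes that artificial jump.

The sketch encodes time of day only, assuming a 24-hour period. The periods and formulas the transform uses for season, month, and day of week are not covered here. time_of_day_features is a hypothetical helper.

```python
import math
from datetime import datetime
from typing import Tuple


def time_of_day_features(ts: datetime) -> Tuple[float, float]:
    """Encode time of day as a point on the unit circle (period: 24 hours)."""
    seconds = ts.hour * 3600 + ts.minute * 60 + ts.second
    angle = 2 * math.pi * seconds / 86_400
    return math.sin(angle), math.cos(angle)


late = time_of_day_features(datetime(2024, 3, 1, 23, 45))
midnight = time_of_day_features(datetime(2024, 3, 2, 0, 0))
noon = time_of_day_features(datetime(2024, 3, 2, 12, 0))

# Distance on the circle: 23:45 sits next to midnight, noon sits opposite it.
print(round(math.dist(late, midnight), 3))  # 0.065
print(round(math.dist(noon, midnight), 3))  # 2.0
```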

Call the forecaster directly

After preprocessing, you can wrap the data in a ForecastInputDataset and call the forecaster directly. This skips postprocessing, which is useful for debugging or for integrating the forecaster into your own workflows.

from openstef_core.datasets import ForecastInputDataset

# Preprocess the prediction data
preprocessed_predict = model.preprocessing.transform(predict_dataset)

# Convert to ForecastInputDataset (what the forecaster expects)
forecast_input = ForecastInputDataset(
    data=preprocessed_predict.data,
    sample_interval=preprocessed_predict.sample_interval,
    target_column="load",
    forecast_start=train_end,
)

# Call the forecaster directly
raw_forecast = model.forecaster.predict(forecast_input)
print(f"Raw forecast shape: {raw_forecast.data.shape}")
print(f"Raw forecast columns: {list(raw_forecast.data.columns)}")
Raw forecast shape: (672, 3)
Raw forecast columns: ['quantile_P10', 'quantile_P50', 'quantile_P90']
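The raw forecaster output has not been through postprocessing. That makes it a convenient place to check how often quantile crossing actually occurs before QuantileSorter fixes it.

The stdlib sketch below works on a list of dicts. If raw_forecast.data is a pandas DataFrame, as its .shape and .columns suggest, DataFrame.to_dict(orient="records") would produce that format; treat that wiring as an assumption. count_crossings is a hypothetical helper.

```python
from typing import Dict, List


def count_crossings(rows: List[Dict[str, float]], columns: List[str]) -> int:
    """Count rows where a lower quantile exceeds the next higher one."""
    return sum(
        any(row[lo] > row[hi] for lo, hi in zip(columns, columns[1:]))
        for row in rows
    )


cols = ["quantile_P10", "quantile_P50", "quantile_P90"]
toy_rows = [
    {"quantile_P10": 4.0, "quantile_P50": 5.0, "quantile_P90": 6.0},
    {"quantile_P10": 5.2, "quantile_P50": 4.9, "quantile_P90": 6.1},  # P10 > P50
    {"quantile_P10": 3.0, "quantile_P50": 3.0, "quantile_P90": 3.5},  # ties are allowed
]
print(count_crossings(toy_rows, cols))  # 1
```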

Next steps

  • Ensemble Forecasting — combine your custom pipeline with other models into an ensemble for better accuracy.

  • Quantile Calibration — append isotonic calibration to your postprocessing for more reliable confidence intervals.