CustomForecastingWorkflow#

class openstef_models.workflows.CustomForecastingWorkflow(**data: Any) None[source]#

Bases: BaseModel

Complete forecasting workflow with model management and lifecycle hooks.

Orchestrates the full forecasting process by combining a ForecastingModel (either ForecastingModel or EnsembleForecastingModel) with callback execution and optional model persistence. Provides the main interface for production forecasting systems where models need to be trained, saved, loaded, and used for prediction with monitoring.

Invariants

  • Callbacks are executed at appropriate lifecycle stages

  • Model fitting and prediction delegate to the underlying model

  • Storage operations (if configured) maintain model persistence

Example

Basic workflow with callbacks

>>> import pandas as pd
>>> import numpy as np
>>> from datetime import timedelta
>>> from openstef_core.datasets import VersionedTimeSeriesDataset
>>> from openstef_core.types import LeadTime, Q
>>> from openstef_models.models.forecasting.constant_quantile_forecaster import (
...     ConstantQuantileForecaster,
... )
>>> from openstef_models.models.forecasting_model import ForecastingModel
>>>
>>> # Create sample data
>>> dataset = TimeSeriesDataset(
...     data=pd.DataFrame({
...         "load": np.random.default_rng(42).standard_normal(size=48),
...         "temperature": np.random.default_rng(42).standard_normal(size=48),
...     }, index=pd.date_range("2025-01-01", periods=48, freq="h")),
...     sample_interval=timedelta(hours=1),
... )
>>>
>>> # Create model and workflow
>>> horizons = [LeadTime.from_string("PT24H")]
>>> model = ForecastingModel(
...     forecaster=ConstantQuantileForecaster(
...         horizons=horizons, quantiles=[Q(0.5)]
...     )
... )
>>>
>>> class LoggingCallback(ForecastingCallback):
...     def on_fit_end(self, context, result):
...         print("Model training completed")
>>>
>>> workflow = CustomForecastingWorkflow(model=model, model_id="my_model", callbacks=[LoggingCallback()])
>>> result = workflow.fit(dataset)
Model training completed
>>> forecasts = workflow.predict(dataset)
>>> len(forecasts.data) > 0
True

Loading from storage with fallback

>>> workflow = ForecastingWorkflow.from_storage(
...     model_id="production_model_v1",
...     storage=my_storage,
...     default_model_factory=lambda: create_default_model()
... )
Parameters:

data (Any)

model: BaseForecastingModel#

The forecasting model to use.

callbacks: list[ForecastingCallback]#

List of callbacks to execute during workflow events.

model_id: TypeAliasType#
run_name: str | None#

Optional name for this workflow run.

experiment_tags: dict[str, str]#

Optional metadata tags for experiment tracking.

with_run_name(run_name: str) Self[source]#

Return a deep copy of this workflow with the given run name.

Parameters:

run_name (str)

Return type:

Self

fit(data: TimeSeriesDataset, data_val: TimeSeriesDataset | None = None, data_test: TimeSeriesDataset | None = None) ModelFitResult | None[source]#

Train the forecasting model with callback execution.

Executes the complete training workflow including pre-fit callbacks, model training, and post-fit callbacks.

Parameters:
Returns:

ModelFitResult containing training metrics and information, or None if fitting was skipped.

Return type:

ModelFitResult | None

predict(data: TimeSeriesDataset, forecast_start: datetime | None = None) ForecastDataset[source]#

Generate forecasts with callback execution.

Executes the complete prediction workflow including pre-prediction callbacks, model prediction, and post-prediction callbacks.

Parameters:
  • data (TimeSeriesDataset) – Input dataset for generating forecasts.

  • forecast_start (datetime | None) – Optional start time for forecasts.

  • data

  • forecast_start

Returns:

Generated forecast dataset.

Raises:

NotFittedError – If the underlying model hasn’t been trained.

Return type:

ForecastDataset

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'protected_namespaces': (), 'ser_json_inf_nan': 'null'}#

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

model_post_init(context: Any, /) None#

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self (BaseModel) – The BaseModel instance.

  • context (Any) – The context.

  • self

  • context

Return type:

None