CustomForecastingWorkflow
- class openstef_models.workflows.custom_forecasting_workflow.CustomForecastingWorkflow(**data: Any) -> None
Bases: BaseModel
Complete forecasting workflow with model management and lifecycle hooks.
Orchestrates the full forecasting process by combining a forecasting model (either a ForecastingModel or an EnsembleForecastingModel) with callback execution and optional model persistence. It is the main interface for production forecasting systems in which models are trained, saved, loaded, and used for prediction with monitoring.
Invariants
Callbacks are executed at appropriate lifecycle stages
Model fitting and prediction delegate to the underlying model
Storage operations (if configured) maintain model persistence
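The invariants above can be illustrated with a minimal, self-contained sketch. It does not use the OpenSTEF classes: Callback, MeanModel, ToyWorkflow, and Recorder are hypothetical stand-ins, and every hook name other than on_fit_end is an assumption made for illustration.

```python
from typing import Any, List, Optional


class Callback:
    """Hypothetical base class; every hook is a no-op by default."""

    def on_fit_start(self, context: Any, data: Any) -> None:
        pass

    def on_fit_end(self, context: Any, result: Any) -> None:
        pass

    def on_predict_start(self, context: Any, data: Any) -> None:
        pass

    def on_predict_end(self, context: Any, forecast: Any) -> None:
        pass


class MeanModel:
    """Stand-in model that predicts the mean of its training values."""

    def __init__(self) -> None:
        self.mean: Optional[float] = None

    def fit(self, data: List[float]) -> dict:
        self.mean = sum(data) / len(data)
        return {"mean": self.mean}

    def predict(self, data: List[float]) -> List[float]:
        if self.mean is None:
            raise RuntimeError("fit() must be called before predict()")
        return [self.mean] * len(data)


class ToyWorkflow:
    """Runs the hooks around fit/predict and delegates the work to the model."""

    def __init__(self, model: MeanModel, callbacks: List[Callback]) -> None:
        self.model = model
        self.callbacks = callbacks

    def fit(self, data: List[float]) -> dict:
        for cb in self.callbacks:
            cb.on_fit_start(self, data)
        result = self.model.fit(data)  # delegation to the underlying model
        for cb in self.callbacks:
            cb.on_fit_end(self, result)
        return result

    def predict(self, data: List[float]) -> List[float]:
        for cb in self.callbacks:
            cb.on_predict_start(self, data)
        forecast = self.model.predict(data)  # delegation to the underlying model
        for cb in self.callbacks:
            cb.on_predict_end(self, forecast)
        return forecast


class Recorder(Callback):
    """Records the order in which the hooks fire."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def on_fit_start(self, context: Any, data: Any) -> None:
        self.events.append("fit_start")

    def on_fit_end(self, context: Any, result: Any) -> None:
        self.events.append("fit_end")

    def on_predict_start(self, context: Any, data: Any) -> None:
        self.events.append("predict_start")

    def on_predict_end(self, context: Any, forecast: Any) -> None:
        self.events.append("predict_end")
```

Because the base hooks are no-ops, a callback only overrides the stages it cares about, which is the same shape as the LoggingCallback in the example below.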
Example
Basic workflow with callbacks
>>> import pandas as pd
>>> import numpy as np
>>> from datetime import timedelta
>>> from openstef_core.datasets import TimeSeriesDataset
>>> from openstef_core.types import LeadTime, Q
>>> from openstef_models.models.forecasting.constant_quantile_forecaster import (
...     ConstantQuantileForecaster,
... )
>>> from openstef_models.models.forecasting_model import ForecastingModel
>>> # Assumption: ForecastingCallback is importable from the workflow module.
>>> from openstef_models.workflows.custom_forecasting_workflow import (
...     CustomForecastingWorkflow,
...     ForecastingCallback,
... )
>>>
>>> # Create sample data
>>> dataset = TimeSeriesDataset(
...     data=pd.DataFrame({
...         "load": np.random.default_rng(42).standard_normal(size=48),
...         "temperature": np.random.default_rng(42).standard_normal(size=48),
...     }, index=pd.date_range("2025-01-01", periods=48, freq="h")),
...     sample_interval=timedelta(hours=1),
... )
>>>
>>> # Create model and workflow
>>> horizons = [LeadTime.from_string("PT24H")]
>>> model = ForecastingModel(
...     forecaster=ConstantQuantileForecaster(
...         horizons=horizons, quantiles=[Q(0.5)]
...     )
... )
>>>
>>> class LoggingCallback(ForecastingCallback):
...     def on_fit_end(self, context, result):
...         print("Model training completed")
>>>
>>> workflow = CustomForecastingWorkflow(model=model, model_id="my_model", callbacks=[LoggingCallback()])
>>> result = workflow.fit(dataset)
Model training completed
>>> forecasts = workflow.predict(dataset)
>>> len(forecasts.data) > 0
True
Loading from storage with fallback
>>> # my_storage and create_default_model are placeholders defined elsewhere.
>>> workflow = ForecastingWorkflow.from_storage(
...     model_id="production_model_v1",
...     storage=my_storage,
...     default_model_factory=lambda: create_default_model()
... )
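The fallback idea suggested by the default_model_factory argument can be sketched without the library. This is only an illustration of the pattern, not the behaviour of from_storage itself, which is not documented on this page; InMemoryStorage and load_or_default are hypothetical names.

```python
from typing import Callable, Dict, Optional


class InMemoryStorage:
    """Hypothetical storage that keeps models in a dict keyed by model ID."""

    def __init__(self) -> None:
        self._models: Dict[str, object] = {}

    def save(self, model_id: str, model: object) -> None:
        self._models[model_id] = model

    def load(self, model_id: str) -> Optional[object]:
        # Returns None when nothing has been stored under this ID.
        return self._models.get(model_id)


def load_or_default(
    model_id: str,
    storage: InMemoryStorage,
    default_model_factory: Callable[[], object],
) -> object:
    """Return the stored model, or build a fresh one if none is stored."""
    model = storage.load(model_id)
    if model is None:
        # The factory is only called when the lookup fails.
        model = default_model_factory()
    return model
```

Passing a factory rather than a ready-made model means the default is only constructed when it is actually needed.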
- Parameters:
data (Any)
- model: BaseForecastingModel
- callbacks: list[ForecastingCallback]
- model_id: TypeAliasType
- with_run_name(run_name: str) -> Self
Return a deep copy of this workflow with the given run name.
- Parameters:
run_name (str)
- Return type:
Self
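A rough sketch of the copy semantics, using only the standard library: the returned object carries the new run name while the original is left untouched. CopyableWorkflow and its run_name field are hypothetical stand-ins, and the real method may be implemented differently.

```python
import copy
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CopyableWorkflow:
    """Hypothetical stand-in for a workflow with a mutable callback list."""

    model_id: str
    callbacks: List[str] = field(default_factory=list)
    run_name: Optional[str] = None  # assumed field, for illustration only

    def with_run_name(self, run_name: str) -> "CopyableWorkflow":
        # A deep copy also duplicates nested mutable state such as the
        # callback list, so the clone shares nothing with the original.
        clone = copy.deepcopy(self)
        clone.run_name = run_name
        return clone


base = CopyableWorkflow(model_id="my_model", callbacks=["logging"])
nightly = base.with_run_name("nightly")
nightly.callbacks.append("extra")  # does not affect base.callbacks
```

A deep copy, as opposed to a shallow one, is what keeps changes to the clone's callbacks from leaking back into the original workflow.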
- fit(data: TimeSeriesDataset, data_val: TimeSeriesDataset | None = None, data_test: TimeSeriesDataset | None = None) -> ModelFitResult | None
Train the forecasting model with callback execution.
Executes the complete training workflow including pre-fit callbacks, model training, and post-fit callbacks.
- Parameters:
data (TimeSeriesDataset) – Training dataset for the forecasting model.
data_val (TimeSeriesDataset | None) – Optional validation dataset for model tuning.
data_test (TimeSeriesDataset | None) – Optional test dataset for final evaluation.
- Returns:
ModelFitResult containing training metrics and information, or None if fitting was skipped.
- Return type:
ModelFitResult|None
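Since fit may return None when fitting is skipped, callers should check the result before reading metrics from it. A minimal sketch of that check, where a plain dict stands in for ModelFitResult and describe_fit is a hypothetical helper:

```python
from typing import Optional


def describe_fit(result: Optional[dict]) -> str:
    """Summarise a fit result, treating None as 'fitting was skipped'."""
    if result is None:
        return "fit skipped"
    return f"fit completed with {len(result)} metric(s)"
```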
- predict(data: TimeSeriesDataset, forecast_start: datetime | None = None) -> ForecastDataset
Generate forecasts with callback execution.
Executes the complete prediction workflow including pre-prediction callbacks, model prediction, and post-prediction callbacks.
- Parameters:
data (TimeSeriesDataset) – Input dataset for generating forecasts.
forecast_start (datetime | None) – Optional start time for forecasts.
- Returns:
Generated forecast dataset.
- Raises:
NotFittedError – If the underlying model hasn’t been trained.
- Return type:
ForecastDataset
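One way a caller might react to the not-fitted error is to train on demand and retry. The sketch below is self-contained: NotFittedError is redefined locally as a stand-in for the library's exception, and LastValueModel and predict_or_fit_first are hypothetical names.

```python
from typing import List, Optional


class NotFittedError(Exception):
    """Local stand-in for the library's NotFittedError."""


class LastValueModel:
    """Stand-in model that repeats the last value seen during training."""

    def __init__(self) -> None:
        self.last: Optional[float] = None

    def fit(self, history: List[float]) -> None:
        self.last = history[-1]

    def predict(self, steps: int) -> List[float]:
        if self.last is None:
            raise NotFittedError("call fit() before predict()")
        return [self.last] * steps


def predict_or_fit_first(
    model: LastValueModel, history: List[float], steps: int
) -> List[float]:
    """Predict, training the model first if it has not been fitted yet."""
    try:
        return model.predict(steps)
    except NotFittedError:
        model.fit(history)
        return model.predict(steps)
```

Whether to train on demand or to fail fast and alert is a deployment decision; in a production pipeline, surfacing the error may be preferable to silently fitting on whatever data is at hand.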
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True, 'protected_namespaces': (), 'ser_json_inf_nan': 'null'}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.