openstef.tasks package¶
Subpackages¶
Submodules¶
openstef.tasks.calculate_kpi module¶
This module contains the CRON job that is periodically executed to calculate key performance indicators (KPIs).
This code assumes prognoses are available from the persistent storage. If these are not available run create_forecast.py to train all models.
- The folowing tasks are caried out:
1: Calculate the KPI for a given pid. Ignore SplitEnergy 2: Create figures 3: Write KPI to database
Example
This module is meant to be called directly from a CRON job. Alternatively this code can be run directly by running:
$ python calculate_kpi.py
- openstef.tasks.calculate_kpi.calc_kpi_for_specific_pid(pid, realised, predicted_load, basecase)¶
Function that checks the model performance based on a pid. This function.
loads and combines forecast and realised data
calculated several key performance indicators (KPIs)
- These metric include:
RMSE,
bias,
NSME (model efficiency, between -inf and 1)
Mean absolute Error
- Parameters:
pid (
int
) – Prediction ID for a given prediction jobrealised (
DataFrame
) – Realised load.predicted_load (
DataFrame
) – Predicted load.basecase (
DataFrame
) – Basecase predicted load.
- Return type:
dict
- Returns:
Dictionary that includes a dictonary for each t_ahead.
Dict includes enddate en window (in days) for clarification
- Raises:
NoPredictedLoadError – When no predicted load for given datatime range.
NoRealisedLoadError – When no realised load for given datetime range.
Example
To get the rMAE for the 24 hours ahead prediction: kpis[‘24h’][‘rMAE’]
- openstef.tasks.calculate_kpi.check_kpi_task(pj, context, start_time, end_time, threshold_optimizing=0.5, threshold_retraining=0.25)¶
- Return type:
None
- openstef.tasks.calculate_kpi.main(model_type=None, config=None, database=None)¶
- Return type:
None
- openstef.tasks.calculate_kpi.set_incomplete_kpi_to_nan(kpis, t_ahead_h)¶
Checks the given kpis for completeness and sets to nan if this not true.
- Parameters:
kpis (
dict
) – the kpist_ahead_h (
str
) – t_ahead_h
- Return type:
None
openstef.tasks.create_basecase_forecast module¶
This module should be executed once every day.
For all prediction_jobs, it will create a ‘basecase’ forecast which is less accurate, but (almost) always available. For now, it uses the load a week earlier. Missing datapoints are interpolated.
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder.
Alternatively this code can be run directly by running:
$ python create_basecase_forecast.py
- openstef.tasks.create_basecase_forecast.create_basecase_forecast_task(pj, context, t_behind_days=15, t_ahead_days=14)¶
Top level task that creates a basecase forecast.
On this task level all database and context manager dependencies are resolved.
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Contect object that holds a config manager and a database connectiont_behind_days – number of days included as history. This is used to generated lagged features for the to-be-forecasted period
t_ahead_days – number of days a basecase forecast is created for
- Return type:
None
- openstef.tasks.create_basecase_forecast.main(config=None, database=None, **kwargs)¶
openstef.tasks.create_components_forecast module¶
This module contains the CRON job that is periodically executed to make the components prognoses.
This code assumes trained models are available from the persistent storage. If these are not available run model_train.py to train all models. To provide the prognoses the following steps are carried out:
Get historic training data (TDCV, Load, Weather and APX price data)
Apply features
Load model
Make component prediction
Write prediction to the database
Send Teams message if something goes wrong
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python create_components_forecast.py
- openstef.tasks.create_components_forecast.create_components_forecast_task(pj, context, t_behind_days=0, t_ahead_days=3)¶
Top level task that creates a components forecast.
On this task level all database and context manager dependencies are resolved.
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Contect object that holds a config manager and a database connectiont_behind_days (
int
) – number of days in the past that the component forecast is created fort_ahead_days (
int
) – number of days in the future that the component forecast is created for
- Raises:
ComponentForecastTooShortHorizonError – If the forecast horizon is too short (less than 30 minutes in advance)
- Return type:
None
- openstef.tasks.create_components_forecast.main(config=None, database=None, **kwargs)¶
openstef.tasks.create_forecast module¶
This module contains the CRON job that is periodically executed to make prognoses and save them in to the database.
This code assumes trained models are available from the persistent storage. If these are not available run model_train.py to train all models. To provide the prognoses the folowing steps are carried out:
Get historic training data (TDCV, Load, Weather and APX price data)
Apply features
Load model
Make prediction
Write prediction to the database
Send Teams message if something goes wrong
Example
This module is meant to be called directly from a CRON job. Alternatively this code can be run directly by running:
$ python create_forecast.py
- openstef.tasks.create_forecast.create_forecast_task(pj, context, t_behind_days=14)¶
Top level task that creates a forecast.
On this task level all database and context manager dependencies are resolved.
- Expected prediction job keys; “id”, “lat”, “lon”, “resolution_minutes”,
“horizon_minutes”, “type”, “name”, “quantiles”
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Contect object that holds a config manager and a database connectiont_behind_days (
int
) – number of days included as history. This is used to generated lagged features for the to-be-forecasted period
- Return type:
None
- openstef.tasks.create_forecast.main(model_type=None, config=None, database=None, **kwargs)¶
openstef.tasks.create_solar_forecast module¶
This module contains the CRON job that is periodically executed to make prognoses of solar features.
These are useful for splitting the load in solar and wind contributions.
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python create_solar_forecast
- openstef.tasks.create_solar_forecast.apply_fit_insol(data, add_to_df=True, hours_delta=None, polynomial=False)¶
This model fits insolation to PV yield and uses this fit to forecast PV yield. It uses a 2nd order polynomial.
- Input:
data: pd.DataFrame(index = datetime, columns = [load, insolation])
Optional: - hoursDelta: period of forecast in hours [int] (e.g. every 6 hours for KNMI) - addToDF: Bool, add the norm to the data
- Output:
pd.DataFrame(index = datetime, columns = [(load), forecaopenstefitInsol])
NB: range of datetime of input is equal to range of datetime of output
Example: import pandas as pd import numpy as np index = pd.date_range(start = “2017-01-01 09:00:00”, freq = ‘15T’, periods = 300) data = pd.DataFrame(index = index,
data = dict(load=np.sin(index.hour/24*np.pi)*np.random.uniform(0.7,1.7, len(index))))
data[‘insolation’] = data.load * np.random.uniform(0.8, 1.2, len(index)) + 0.1 data.loc[int(len(index)/3*2):,”load”] = np.nan
- openstef.tasks.create_solar_forecast.apply_persistence(data, how='mean', smooth_entries=4, add_to_df=True, colname=None)¶
This script calculates the persistence forecast.
- Input:
data: pd.DataFrame(index = datetime, columns = [load]), datetime is expected to have historic values, as well as NA values
Optional: - how: str, how to determine the norm (abs or mean) - smoothEntries: int, number of historic entries over which the persistence is smoothed - add_to_df: Bool, add the forecast to the data - option of specifying colname if load is not first column
- Output:
pd.DataFrame(index = datetime, columns = [(load,) persistence])
NB: range of datetime of input is equal to range of datetime of output
Example: import pandas as pd import numpy as np index = pd.date_range(start = “2017-01-01 09:00:00”, freq = ‘15T’, periods = 300) data = pd.DataFrame(index = index,
data = dict(load=np.sin(index.hour/24*np.pi)*np.random.uniform(0.7,1.7, 300)))
data.loc[200:,”load”] = np.nan
- openstef.tasks.create_solar_forecast.calc_norm(data, how='max', add_to_df=True)¶
This script calculates the norm of a given dataset.
- Input:
data: pd.DataFrame(index = datetime, columns = [load])
how: str can be any function from numpy, recognized by np.’how’
Optional: - add_to_df: Bool, add the norm to the data
- Output:
pd.DataFrame(index = datetime, columns = [load])
NB: range of datetime of input is equal to range of datetime of output
Example: import pandas as pd import numpy as np index = pd.date_range(start = “2017-01-01 09:00:00”, freq = ‘15T’, periods = 200) data = pd.DataFrame(index = index,
data = dict(load=np.sin(index.hour/24*np.pi)*np.random.uniform(0.7,1.7, 200)))
- openstef.tasks.create_solar_forecast.combine_forecasts(forecasts, combination_coefs)¶
This function combines several independent forecasts into one, using predetermined coefficients.
- Input:
forecasts: pd.DataFrame(index = datetime, algorithm1, …, algorithmn)
combinationcoefs: pd.DataFrame(param1, …, paramn, algorithm1, …, algorithmn)
- Output:
pd.DataFrame(datetime, forecast)
- openstef.tasks.create_solar_forecast.fides(data, all_forecasts=False)¶
Fides makes a forecast based on persistence and a direct fit with insolation.
- Parameters:
data (
DataFrame
) – pd.DataFrame(index = datetime, columns =[‘output’,’insolation’])all_forecasts (
bool
) – Should all forecasts be returned or only the combination
Example: import numpy as np index = pd.date_range(start = “2017-01-01 09:00:00”, freq = ‘15T’, periods = 300) data = pd.DataFrame(index = index,
data = dict(load=np.sin(index.hour/24*np.pi)*np.random.uniform(0.7,1.7, 300)))
data[‘insolation’] = data.load * np.random.uniform(0.8, 1.2, len(index)) + 0.1 data.loc[int(len(index)/3*2):,”load”] = np.nan
- openstef.tasks.create_solar_forecast.main(config=None, database=None, **kwargs)¶
- openstef.tasks.create_solar_forecast.make_solar_prediction_pj(pj, context, radius=30, peak_power=180961000.0)¶
Make a solar prediction for a specific prediction job.
- Parameters:
pj – (dict) prediction job
context – Task context
radius – Radius us to collect PV systems.
peak_power – Peak power.
openstef.tasks.create_wind_forecast module¶
This module contains the CRON job that is periodically executed to make prognoses of wind features.
These features are usefull for splitting the load in solar and wind contributions and making prognoses.
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python create_wind_forecast
- openstef.tasks.create_wind_forecast.main(config=None, database=None)¶
- openstef.tasks.create_wind_forecast.make_wind_forecast_pj(pj, context)¶
Make a wind prediction for a specific prediction job.
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Context manager
- Return type:
None
openstef.tasks.optimize_hyperparameters module¶
optimize_hyper_params.py.
This module contains the CRON job that is periodically executed to optimize the hyperparameters for the prognosis models.
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python optimize_hyperparameters.py
- openstef.tasks.optimize_hyperparameters.main(config=None, database=None)¶
- openstef.tasks.optimize_hyperparameters.optimize_hyperparameters_task(pj, context, check_hyper_param_age=True)¶
Optimize hyperparameters task.
Expected prediction job keys: “id”, “model”, “lat”, “lon”, “name”, “description” Only used for logging: “name”, “description”
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Task contextcheck_hyper_param_age (
bool
) – Boolean indicating if optimization can be skipped in case existing hyperparameters do not exceed the maximum age.
- Return type:
None
openstef.tasks.split_forecast module¶
This module contains the CRON job that is periodically executed to make prognoses of solar features.
These features are usefull for splitting the load in solar and wind contributions. This is achieved by carrying out the folowing steps:
Get the wind and solar reference data for the specific location of the customer
Get the TDCV (Typical Domestic Consumption Values) data
Fit a linear combination of above time series to the historic load data to determine the contributions of each energy source.
Write the resulting coeficients to the SQL database.
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python split_forecast.py
- openstef.tasks.split_forecast.convert_coefdict_to_coefsdf(pj, input_split_function, coefdict)¶
Convert dictionary of coefficients to dataframe with additional data for db storage.
- Parameters:
pj (
PredictionJobDataClass
) – prediction jobinput_split_function (
DataFrame
) – df of columns of standard load profiles, i.e. wind, solar, householdcoefdict (
dict
) – dict of coefficient per standard load profile
- Return type:
DataFrame
- Returns:
DataFrame of coefficients to insert in sql
- openstef.tasks.split_forecast.determine_invalid_coefs(new_coefs, last_coefs)¶
Determine which new coefficients are valid and return them.
- Parameters:
new_coefs (
DataFrame
) – df of new coefficients for standard load profiles (i.e. wind, solar, household)last_coefs (
DataFrame
) – df of last coefficients for standard load profiles (i.e. wind, solar, household)
- Return type:
DataFrame
- Returns:
Dataframe with invalid coefficients
- openstef.tasks.split_forecast.find_components(df, zero_bound=True)¶
Function that does the actual energy splitting.
- Parameters:
df (
DataFrame
) – Input data. The dataframe should contain these columns in exactly this order: [load, wind_ref, pv_ref, mulitple tdcv colums]zero_bound (
bool
) – If zero_bound is True coefficients can’t be negative.
- Returns:
DataFrame containing the wind and solar components
Dict with the coefficients that result from the fitting
- Return type:
tuple
- openstef.tasks.split_forecast.main(config=None, database=None)¶
- openstef.tasks.split_forecast.split_forecast_task(pj, context)¶
Function that caries out the energy splitting for a specific prediction job with id pid.
- Parameters:
pid – Prediction job id
- Return type:
DataFrame
- Returns:
Energy splitting coefficients.
openstef.tasks.train_model module¶
This module contains the CRON job that is periodically executed to retrain the prognosis models.
- For this the folowing steps are caried out:
Get historic training data (TDCV, Load, Weather and APX price data)
Apply features
Train and Test the new model
Check if new model performs better than the old model
Store the model if it performs better
Send slack message to inform the users
Example
This module is meant to be called directly from a CRON job. A description of the CRON job can be found in the /k8s/CronJobs folder. Alternatively this code can be run directly by running:
$ python model_train.py
- openstef.tasks.train_model.main(model_type=None, config=None, database=None)¶
- openstef.tasks.train_model.train_model_task(pj, context, check_old_model_age=True, datetime_start=None, datetime_end=None)¶
Train model task.
Top level task that trains a new model and makes sure the best available model is stored. On this task level all database and context manager dependencies are resolved.
Expected prediction job keys: “id”, “model”, “lat”, “lon”, “name”
- Parameters:
pj (
PredictionJobDataClass
) – Prediction jobcontext (
TaskContext
) – Contect object that holds a config manager and a database connection.check_old_model_age (
bool
) – check if model is too young to be retraineddatetime_start (
datetime
) – Startdatetime_end (
datetime
) – End
- Raises:
SkipSaveTrainingForecasts – If old model is better or too young, you don’t need to save the traing forcast.
InputDataOngoingZeroFlatlinerError – If all recent load measurements are zero.
- Return type:
None