openstef.tasks.utils package#


openstef.tasks.utils.dependencies module#


Build the graph of dependencies between prediction jobs.


pjs (Iterable[PredictionJobDataClass]) – The Iterable of prediction jobs

Return type:

tuple[Set[Union[str, int]], Set[tuple[Union[str, int], Union[str, int]]]]


  • The set of node ids of the graph

  • The set of edges in the graph

openstef.tasks.utils.dependencies.build_nx_graph(nodes, edges)#

Build a networkx Directed Graph.

  • nodes (Iterable[Union[str, int]]) – The sequence of node ids

  • edges (Iterable[tuple[Union[str, int], Union[str, int]]]) – The sequence of edges

Return type:



The dependency graph

openstef.tasks.utils.dependencies.find_groups(pjs, randomize_groups=False)#

Find a sequence of prediction job groups respecting dependencies.

Compute groups of prediction jobs such that the prediction jobs in a group depend of at least one prediction job in the previous group and does not depend on a prediction job in the following groups. This means that all the prediction jobs in a group can be run in parallel and that if groups are treated in the given order, the dependencies of a prediction job have already been treated when the prediction job is run.

  • pjs (Sequence[PredictionJobDataClass]) – The sequence of prediction jobs

  • randomize_groups (bool) – Wether subgroups should be randomized.

Return type:

tuple[DiGraph, list[list[PredictionJobDataClass]]]


  • The dependency graph

  • The list of prediction job groups


Test whether some prediction jobs have dependencies information.


pjs (Iterable[PredictionJobDataClass]) – The list of prediction jobs

Return type:



True if some dependency information was found.

openstef.tasks.utils.predictionjobloop module#

class openstef.tasks.utils.predictionjobloop.PredictionJobLoop(context, stop_on_exception=False, random_order=True, on_exception_callback=None, on_successful_callback=None, on_end_callback=None, prediction_jobs=None, debug_pid=None, **pj_kwargs)#

Bases: object

Convenience objects that maps a function over prediction jobs.

Default behaviour is to automatically get prediction jobs from the database. Any keyword argument passed will be directed to the getting function. If another set of prediction jobs is desired, manually pass them using the prediction_jobs argument.

Tip: For debugging a specific PID, use debug_pid=specific_pid

  • context (TaskContext) – The context to run this loop in.

  • stop_on_exception (bool) – Whether to break out of the loop when an exception is raised. Defaults to False.

  • random_order (bool) – Whether to randomize the order of the prediction jobs. Defaults to True. Does not apply to manually passed prediction jobs.

  • on_exception_callback (Optional[Callable]) – Callback, will be called everytime an exception is raised. Callable gets the pj and exception raised as arguments

  • on_successful_callback (Optional[Callable]) – Callback, will be called everytime an iteration is successful (no exception is raised). Callable gets the pj as argument.

  • on_end_callback (Optional[Callable]) – Callback, will be called everytime an iteration is completed. Callable gets the pj and and bool indicating success as argument.

  • prediction_jobs (Optional[list[PredictionJobDataClass]]) – Manually pass a list of prediction jobs that will be looped over. If set to None, will fetch prediction jobs from database based on pj_kwargs

  • debug_pid (Optional[int]) – enter a specific pid for debugging. If not None, the prediction job loop will only look at this pid

  • **pj_kwargs – Any other kwargs willed will be directed to the prediction job getting function.

map(function, *args, **kwargs)#

Maps the passed function over all prediction jobs.

  • function (Callable) – The function that will be applied to each prediction job separately.

  • *args – Any other arguments or passed to the function.

  • **kwargs – All keyword arguments are passed to the function. This method is highly prefered over using args, since kwargs will be automatically logged.


PredictionJobException – This exception will be raised if one or more prediction jobs raised an exception during the loop.

openstef.tasks.utils.taskcontext module#

class openstef.tasks.utils.taskcontext.TaskContext(name, config, database, suppress_exceptions=False, post_teams_on_exception=True, on_exception=None, on_successful=None, on_end=None)#

Bases: object

A context manager that can be used to run tasks with.

Should be used as:

with TaskContext("my_task_name") as context:
  • name (str) – Name of the task

  • config (object) – Configuration object, can be found in openSTEF-dbc

  • database (object) – Database object, can be found in openSTEF-dbc

  • suppress_exceptions (bool) – If set to False the context manager will pass any raised exception on. Defaults to False.

  • post_teams_on_exception (bool) – If set to True the context manager will automatically post a message to teams when an exception is encountered. Defaults to True.

  • on_exception (Optional[Callable]) – Callback, will be called when an exception is raised. Callable gets exc_type, exc_info, stack_info as arguments.

  • on_successful (Optional[Callable]) – Callback, will be called everytime if the task is successful (no exception is raised).

  • on_end (Optional[Callable]) – Callback, will be called if the task is completed. Callable gets a bool indicating success as argument.

Module contents#