openstef.tasks.utils package
Submodules
openstef.tasks.utils.dependencies module
- openstef.tasks.utils.dependencies.build_graph_structure(pjs)
Build the graph of dependencies between prediction jobs.
- Parameters:
pjs (Iterable[PredictionJobDataClass]) – The iterable of prediction jobs
- Return type:
tuple[Set[Union[str, int]], Set[tuple[Union[str, int], Union[str, int]]]]
- Returns:
The set of node ids of the graph
The set of edges in the graph
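The node and edge sets described above can be illustrated with a minimal sketch. It does not use the real PredictionJobDataClass: the `Job` dataclass, its `id` and `depends_on` fields, and the edge direction (dependency first, dependent second) are assumptions made for illustration only.

```python
from dataclasses import dataclass, field
from typing import Iterable, Union

NodeId = Union[str, int]


@dataclass
class Job:
    """Hypothetical stand-in for a prediction job (not the OpenSTEF class)."""

    id: NodeId
    depends_on: list = field(default_factory=list)


def sketch_graph_structure(jobs: Iterable[Job]):
    """Collect node ids and (dependency, dependent) edges from the jobs."""
    nodes = set()
    edges = set()
    for job in jobs:
        nodes.add(job.id)
        for dependency in job.depends_on:
            # Assumed edge direction: from the dependency to the job needing it.
            edges.add((dependency, job.id))
    return nodes, edges


jobs = [Job(1), Job(2, depends_on=[1]), Job("a", depends_on=[1, 2])]
nodes, edges = sketch_graph_structure(jobs)
```

Returning plain sets keeps the structure independent of any graph library, which matches the documented return type.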
- openstef.tasks.utils.dependencies.build_nx_graph(nodes, edges)
Build a networkx Directed Graph.
- Parameters:
nodes (Iterable[Union[str, int]]) – The sequence of node ids
edges (Iterable[tuple[Union[str, int], Union[str, int]]]) – The sequence of edges
- Return type:
DiGraph
- Returns:
The dependency graph
- openstef.tasks.utils.dependencies.find_groups(pjs, randomize_groups=False)
Find a sequence of prediction job groups respecting dependencies.
Compute groups of prediction jobs such that each prediction job in a group after the first depends on at least one prediction job in the previous group and does not depend on any prediction job in the following groups. This means that all the prediction jobs in a group can be run in parallel and that, if the groups are processed in the given order, the dependencies of a prediction job have already been processed when that prediction job is run.
- Parameters:
pjs (Sequence[PredictionJobDataClass]) – The sequence of prediction jobs
randomize_groups (bool) – Whether subgroups should be randomized.
- Return type:
tuple[DiGraph, list[list[PredictionJobDataClass]]]
- Returns:
The dependency graph
The list of prediction job groups
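The grouping rule described above can be sketched with the standard library alone. This is not the OpenSTEF implementation, which returns a networkx DiGraph as well. The function name `sketch_find_groups`, the plain node ids, and the (dependency, dependent) edge direction are assumptions.

```python
import random
from typing import Hashable, Iterable


def sketch_find_groups(nodes: Iterable[Hashable], edges, randomize_groups=False):
    """Split nodes into ordered groups; edges are (dependency, dependent) pairs.

    Assumes every edge endpoint also appears in ``nodes``.
    """
    # Map each node to the dependencies that have not been placed yet.
    remaining = {node: set() for node in nodes}
    for dependency, dependent in edges:
        remaining[dependent].add(dependency)

    groups = []
    while remaining:
        # Nodes whose dependencies were all placed in earlier groups.
        ready = [node for node, deps in remaining.items() if not deps]
        if not ready:
            raise ValueError("Dependency cycle detected")
        if randomize_groups:
            random.shuffle(ready)
        groups.append(ready)
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return groups


groups = sketch_find_groups(
    nodes=[1, 2, 3, 4],
    edges=[(1, 3), (2, 3), (3, 4)],
)
```

Here jobs 1 and 2 have no dependencies and form the first group, job 3 waits for both, and job 4 waits for job 3. Every job inside one group could be run in parallel.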
- openstef.tasks.utils.dependencies.has_dependencies(pjs)
Test whether some prediction jobs have dependency information.
- Parameters:
pjs (Iterable[PredictionJobDataClass]) – The list of prediction jobs
- Return type:
bool
- Returns:
True if some dependency information was found.
openstef.tasks.utils.predictionjobloop module
- class openstef.tasks.utils.predictionjobloop.PredictionJobLoop(context, stop_on_exception=False, random_order=True, on_exception_callback=None, on_successful_callback=None, on_end_callback=None, prediction_jobs=None, debug_pid=None, **pj_kwargs)
Bases: object
Convenience object that maps a function over prediction jobs.
Default behaviour is to automatically get prediction jobs from the database. Any keyword argument passed will be directed to the getting function. If another set of prediction jobs is desired, manually pass them using the prediction_jobs argument.
Tip: For debugging a specific PID, use debug_pid=specific_pid
- Parameters:
context (TaskContext) – The context to run this loop in.
stop_on_exception (bool) – Whether to break out of the loop when an exception is raised. Defaults to False.
random_order (bool) – Whether to randomize the order of the prediction jobs. Defaults to True. Does not apply to manually passed prediction jobs.
on_exception_callback (Callable) – Callback that is called every time an exception is raised. The callable receives the pj and the raised exception as arguments.
on_successful_callback (Callable) – Callback that is called every time an iteration is successful (no exception is raised). The callable receives the pj as argument.
on_end_callback (Callable) – Callback that is called every time an iteration is completed. The callable receives the pj and a bool indicating success as arguments.
prediction_jobs (list[PredictionJobDataClass]) – Manually pass a list of prediction jobs that will be looped over. If set to None, prediction jobs are fetched from the database based on pj_kwargs.
debug_pid (int) – A specific pid for debugging. If not None, the prediction job loop will only look at this pid.
**pj_kwargs – Any other keyword arguments are passed to the prediction job getting function.
- map(function, *args, **kwargs)
Maps the passed function over all prediction jobs.
- Parameters:
function (Callable) – The function that will be applied to each prediction job separately.
*args – Any other positional arguments are passed to the function.
**kwargs – All keyword arguments are passed to the function. Using keyword arguments is strongly preferred over using args, since kwargs are automatically logged.
- Raises:
PredictionJobException – This exception will be raised if one or more prediction jobs raised an exception during the loop.
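The mapping behaviour and the callbacks documented above can be sketched as a standalone function. This is an illustration, not the OpenSTEF implementation: `sketch_map`, `SketchJobLoopError` (standing in for PredictionJobException) and the `forecast` function are hypothetical, and passing the job as the first positional argument is an assumption.

```python
class SketchJobLoopError(Exception):
    """Hypothetical stand-in for PredictionJobException."""


def sketch_map(jobs, function, *args, stop_on_exception=False,
               on_exception_callback=None, on_successful_callback=None,
               on_end_callback=None, **kwargs):
    """Apply function to each job, run callbacks, and report failures at the end."""
    failed = []
    for job in jobs:
        try:
            # Assumption: the job is passed as the first positional argument.
            function(job, *args, **kwargs)
        except Exception as exc:
            successful = False
            failed.append(job)
            if on_exception_callback is not None:
                on_exception_callback(job, exc)
        else:
            successful = True
            if on_successful_callback is not None:
                on_successful_callback(job)
        if on_end_callback is not None:
            on_end_callback(job, successful)
        if not successful and stop_on_exception:
            break
    if failed:
        # Raised once, after the loop, if one or more jobs failed.
        raise SketchJobLoopError(f"{len(failed)} job(s) failed: {failed}")


def forecast(job, horizon):
    """Hypothetical task that fails for one specific job."""
    if job == "bad":
        raise ValueError("no data")
    return f"{job}:{horizon}"


events = []
try:
    sketch_map(
        ["a", "bad", "b"],
        forecast,
        horizon=24,
        on_end_callback=lambda job, ok: events.append((job, ok)),
    )
except SketchJobLoopError as exc:
    error_message = str(exc)
```

With stop_on_exception left at False, the failing job does not prevent the remaining jobs from running; the failure is only reported once the loop has finished.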
openstef.tasks.utils.taskcontext module
- class openstef.tasks.utils.taskcontext.TaskContext(name, config, database, suppress_exceptions=False, post_teams_on_exception=True, on_exception=None, on_successful=None, on_end=None)
Bases: object
A context manager for running tasks.
Should be used as:
with TaskContext("my_task_name", config, database) as context:
    pass
- Parameters:
name (str) – Name of the task.
config (object) – Configuration object, can be found in openSTEF-dbc.
database (object) – Database object, can be found in openSTEF-dbc.
suppress_exceptions (bool) – If set to False, the context manager passes on any raised exception. Defaults to False.
post_teams_on_exception (bool) – If set to True, the context manager automatically posts a message to Teams when an exception is encountered. Defaults to True.
on_exception (Callable) – Callback that is called when an exception is raised. The callable receives exc_type, exc_info and stack_info as arguments.
on_successful (Callable) – Callback that is called when the task is successful (no exception is raised).
on_end (Callable) – Callback that is called when the task is completed. The callable receives a bool indicating success as argument.
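The interplay of suppress_exceptions and the callbacks can be sketched with a plain context manager. This is a hypothetical stand-in, not the OpenSTEF class: `SketchTaskContext` omits config, database and the Teams notification, and it assumes the on_exception callback simply receives the three standard `__exit__` arguments.

```python
class SketchTaskContext:
    """Hypothetical stand-in showing the context-manager pattern only."""

    def __init__(self, name, suppress_exceptions=False,
                 on_exception=None, on_successful=None, on_end=None):
        self.name = name
        self.suppress_exceptions = suppress_exceptions
        self.on_exception = on_exception
        self.on_successful = on_successful
        self.on_end = on_end

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        successful = exc_type is None
        if successful:
            if self.on_successful is not None:
                self.on_successful()
        elif self.on_exception is not None:
            # Assumption: the callback gets the standard __exit__ arguments.
            self.on_exception(exc_type, exc_value, traceback)
        if self.on_end is not None:
            self.on_end(successful)
        # Returning True tells Python to swallow the exception.
        return self.suppress_exceptions


log = []
with SketchTaskContext(
    "my_task_name",
    suppress_exceptions=True,
    on_exception=lambda exc_type, exc_value, tb: log.append(exc_type.__name__),
    on_end=lambda ok: log.append(ok),
):
    raise RuntimeError("task failed")
```

Because suppress_exceptions is True in this usage, the RuntimeError does not propagate past the with block; with the default of False it would be re-raised after the callbacks have run.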