BenchmarkCallback#
- class openstef_beam.benchmarking.BenchmarkCallback[source]#
Bases: object

Base class for benchmark execution callbacks.
Provides hooks into the benchmark pipeline lifecycle, enabling custom monitoring, logging, progress tracking, and external integrations. Callbacks receive notifications at key execution points and can influence the benchmark flow.
Callback methods can return boolean values to control execution flow:
- Returning False from start methods (on_benchmark_start, on_target_start, etc.) will skip that phase of execution
- Complete methods are purely informational and don’t affect flow control
Example
Creating a custom progress monitoring callback:
>>> from openstef_beam.benchmarking.callbacks import BenchmarkCallback
>>> import logging
>>>
>>> class ProgressCallback(BenchmarkCallback):
...     def __init__(self):
...         self.logger = logging.getLogger("benchmark.progress")
...         self.total_targets = 0
...         self.completed_targets = 0
...
...     def on_benchmark_start(self, runner, targets):
...         self.total_targets = len(targets)
...         self.logger.info(f"Starting benchmark for {self.total_targets} targets")
...         return True
...
...     def on_target_complete(self, runner, target):
...         self.completed_targets += 1
...         progress = (self.completed_targets / self.total_targets) * 100
...         self.logger.info(f"Completed {target.name} ({progress:.1f}%)")
...
...     def on_error(self, runner, target, error):
...         self.logger.error(f"Error processing {target.name}: {error}")
Early termination based on conditions:
>>> class QualityGateCallback(BenchmarkCallback):
...     def __init__(self, max_mae_threshold=100.0):
...         self.threshold = max_mae_threshold
...
...     def on_evaluation_complete(self, runner, target, report):
...         mae = report.get_metric("MAE")
...         if mae > self.threshold:
...             logging.warning(f"Target {target.name} exceeds MAE threshold")
...             # Could trigger alerts or early termination logic
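Skipping work through flow control, by returning False from a start hook (a minimal sketch that uses only the hooks documented below; the skip list is illustrative):

>>> class SkipTargetsCallback(BenchmarkCallback):
...     def __init__(self, skip_names):
...         self.skip_names = set(skip_names)
...
...     def on_target_start(self, runner, target):
...         # Returning False asks the pipeline to skip this target entirely.
...         if target.name in self.skip_names:
...             logging.info(f"Skipping target {target.name}")
...             return False
...         return True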
The callback system enables extensive customization while maintaining clean separation between benchmark execution and monitoring concerns.
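Because the hooks have concrete base-class implementations, a subclass typically overrides only the ones it needs. For example, a simple timing callback (a minimal sketch; it assumes the untouched hooks keep their default behavior):

>>> import time
>>> class TimingCallback(BenchmarkCallback):
...     def __init__(self):
...         self.started = {}
...         self.durations = {}
...
...     def on_target_start(self, runner, target):
...         self.started[target.name] = time.monotonic()
...         return True
...
...     def on_target_complete(self, runner, target):
...         start = self.started.pop(target.name, None)
...         if start is not None:
...             self.durations[target.name] = time.monotonic() - start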
- on_benchmark_start(runner: BenchmarkPipeline[Any, Any], targets: list[BenchmarkTarget]) bool[source]#
Called when benchmark starts.
- Returns:
True if benchmark should start, False to skip.
- Return type:
bool
- Parameters:
runner (BenchmarkPipeline[Any, Any])
targets (list[BenchmarkTarget])
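For instance, a callback can veto the whole run when there is nothing to process (a minimal sketch; logging as imported in the examples above):

>>> class NonEmptyRunCallback(BenchmarkCallback):
...     def on_benchmark_start(self, runner, targets):
...         # Returning False skips the entire benchmark run.
...         if not targets:
...             logging.warning("No benchmark targets configured; skipping run")
...             return False
...         return True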
- on_target_start(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget) bool[source]#
Called when processing a target begins.
- Returns:
True if target processing should start, False to skip.
- Return type:
bool
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
- on_backtest_start(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget) bool[source]#
Called before backtest execution.
- Returns:
True if backtesting should start, False to skip.
- Return type:
bool
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
- on_backtest_complete(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget, predictions: TimeSeriesDataset) None[source]#
Called after backtest completes.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
predictions (TimeSeriesDataset)
- Return type:
None
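A minimal sketch of a hook on this event; since the TimeSeriesDataset interface is not documented here, it only logs a generic summary:

>>> class BacktestLogCallback(BenchmarkCallback):
...     def on_backtest_complete(self, runner, target, predictions):
...         # Only a generic repr is logged; inspecting the dataset further
...         # would depend on the TimeSeriesDataset API.
...         logging.info(f"Backtest finished for {target.name}: {predictions!r}")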
- on_evaluation_start(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget) bool[source]#
Called before evaluation starts.
- Returns:
True if evaluation should start, False to skip.
- Return type:
bool
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
- on_evaluation_complete(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget, report: EvaluationReport) None[source]#
Called after evaluation completes.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
report (EvaluationReport)
- Return type:
None
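Building on the report.get_metric call shown in the quality-gate example above, a callback can collect metrics across targets (a minimal sketch):

>>> class MetricCollectorCallback(BenchmarkCallback):
...     def __init__(self):
...         self.mae_by_target = {}
...
...     def on_evaluation_complete(self, runner, target, report):
...         # get_metric("MAE") mirrors the usage in the quality-gate example.
...         self.mae_by_target[target.name] = report.get_metric("MAE")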
- on_target_complete(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget) None[source]#
Called when target processing finishes.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
- Return type:
None
- on_benchmark_complete(runner: BenchmarkPipeline[Any, Any], targets: list[BenchmarkTarget]) None[source]#
Called when entire benchmark finishes.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
targets (list[BenchmarkTarget])
- Return type:
None
- on_error(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget, error: Exception) None[source]#
Called when an error occurs.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget)
error (Exception)
- Return type:
None
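Pairing on_error with on_benchmark_complete allows failures to be summarized at the end of a run (a minimal sketch):

>>> class ErrorSummaryCallback(BenchmarkCallback):
...     def __init__(self):
...         self.errors = {}
...
...     def on_error(self, runner, target, error):
...         self.errors[target.name] = error
...
...     def on_benchmark_complete(self, runner, targets):
...         if self.errors:
...             logging.warning(f"{len(self.errors)} of {len(targets)} targets failed")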
- on_analysis_complete(runner: BenchmarkPipeline[Any, Any], target: BenchmarkTarget | None, output: AnalysisOutput) None[source]#
Called after analysis (visualization) completes for a target.
- Parameters:
runner (BenchmarkPipeline[Any, Any])
target (BenchmarkTarget | None)
output (AnalysisOutput)
- Return type:
None
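Note that target may be None here (the signature allows it, perhaps for analysis not tied to a single target); a minimal sketch that relies only on that signature:

>>> class AnalysisLogCallback(BenchmarkCallback):
...     def on_analysis_complete(self, runner, target, output):
...         # target may be None, so guard before reading target.name.
...         name = target.name if target is not None else "<all targets>"
...         logging.info(f"Analysis complete for {name}: {output!r}")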