MLFlowStorage
- class openstef_models.integrations.mlflow.MLFlowStorage(**data: Any)
Bases: BaseConfig
MLflow storage backend for managing training runs and model artifacts.
Handles creation, storage, and retrieval of MLflow runs, including their models, training data, metrics, and hyperparameters. Stages artifacts in a local directory before uploading them to the MLflow tracking server.
- Parameters:
data (Any) – Keyword arguments that set the configuration fields:
tracking_uri (str)
local_artifacts_path (Path)
experiment_name_prefix (str)
data_path (str)
model_path (str)
enable_mlflow_stdout (bool)
model_serializer (ModelSerializer)
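The class description says artifacts are staged locally and only uploaded when a run is finalized. The sketch below illustrates that call order with a hypothetical in-memory stand-in, `StagingStore`; it does not call MLflow or the real `MLFlowStorage` API, and the `model.bin` file name and uuid-based run IDs are assumptions made purely for illustration.

```python
import tempfile
import uuid
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class StagingStore:
    """Hypothetical stand-in for the stage-then-upload flow (no MLflow calls)."""

    local_artifacts_path: Path
    # run_id -> names of the files "uploaded" at finalize time
    uploaded: dict = field(default_factory=dict)

    def get_artifacts_path(self, model_id: str, run_id: str) -> Path:
        # Same layout as documented: local_artifacts_path/model_id/run_id
        return self.local_artifacts_path / model_id / run_id

    def create_run(self, model_id: str) -> str:
        run_id = uuid.uuid4().hex  # a real tracking server assigns run IDs itself
        self.get_artifacts_path(model_id, run_id).mkdir(parents=True)
        return run_id

    def save_run_model(self, model_id: str, run_id: str, payload: bytes) -> None:
        # Stage locally only; nothing is uploaded yet.
        (self.get_artifacts_path(model_id, run_id) / "model.bin").write_bytes(payload)

    def finalize_run(self, model_id: str, run_id: str) -> None:
        # "Upload" everything staged for the run in a single step.
        staged = self.get_artifacts_path(model_id, run_id)
        self.uploaded[run_id] = sorted(p.name for p in staged.iterdir())


with tempfile.TemporaryDirectory() as tmp:
    store = StagingStore(Path(tmp))
    run_id = store.create_run("my_model")
    store.save_run_model("my_model", run_id, b"weights")
    uploaded_before_finalize = run_id in store.uploaded
    store.finalize_run("my_model", run_id)
    uploaded_after_finalize = store.uploaded[run_id]
```

Under this design, if a process stops before `finalize_run` is called, the staged files remain on local disk and never reach the tracking server.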
- model_post_init(context: Any) -> None
Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.
- create_run(model_id: ModelIdentifier, run_name: str | None = None, tags: dict[str, str] | None = None, experiment_tags: dict[str, str] | None = None, hyperparams: HyperParams | None = None) -> Run
Create a new MLflow run for tracking a model training session.
Creates or reuses an MLflow experiment named after the model ID, then starts a new run within that experiment. Logs hyperparameters if provided.
- Parameters:
model_id (ModelIdentifier) – Unique identifier for the model, used as the experiment name.
run_name (str | None) – Optional display name for this run.
tags (dict[str, str] | None) – Key-value pairs attached to the run for filtering and organization.
experiment_tags (dict[str, str] | None) – Key-value pairs attached to the experiment, applied only when the experiment is created.
hyperparams (HyperParams | None) – Model hyperparameters to log with the run.
- Returns:
Created MLflow Run object with run_id and metadata.
- Return type:
Run
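`create_run` is described as creating or reusing an experiment named after the model ID, with `experiment_tags` applied only when the experiment is created. A minimal sketch of that get-or-create logic, using a hypothetical in-memory `ExperimentRegistry` in place of a tracking server. Against a real server, such lookups would typically go through `MlflowClient.get_experiment_by_name` and `MlflowClient.create_experiment`, though this page does not say which calls `MLFlowStorage` actually makes.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from itertools import count


@dataclass
class ExperimentRegistry:
    """Hypothetical in-memory stand-in for experiment lookup on a tracking server."""

    experiments: dict[str, dict] = field(default_factory=dict)
    _next_id: count = field(default_factory=lambda: count(1))

    def get_or_create(self, name: str, tags: dict[str, str] | None = None) -> str:
        existing = self.experiments.get(name)
        if existing is not None:
            # Reuse the experiment; tags passed on later calls are not applied.
            return existing["id"]
        experiment_id = str(next(self._next_id))
        self.experiments[name] = {"id": experiment_id, "tags": dict(tags or {})}
        return experiment_id


registry = ExperimentRegistry()
first = registry.get_or_create("my_model", tags={"team": "forecasting"})
second = registry.get_or_create("my_model", tags={"team": "other"})
```

Both calls return the same experiment ID, and only the tags from the first call are kept.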
- log_hyperparams(run_id: str, params: dict[str, str]) -> None
Log additional hyperparameters to an existing MLflow run.
Useful for logging hyperparameters from multiple components (e.g., ensemble base forecasters and combiner) with prefixed names.
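Because `log_hyperparams` takes a flat `dict[str, str]`, hyperparameters from several ensemble components need distinct, prefixed keys. A minimal sketch of building such a dict; the helper `prefix_params`, the `lgbm`/`combiner` prefixes, and the dot separator are all hypothetical choices.

```python
from collections.abc import Mapping


def prefix_params(prefix: str, params: Mapping[str, object]) -> dict[str, str]:
    """Namespace one component's hyperparameters as '<prefix>.<name>'.

    The dotted naming scheme is a hypothetical convention; values are
    stringified because log_hyperparams expects dict[str, str].
    """
    return {f"{prefix}.{name}": str(value) for name, value in params.items()}


base_params = prefix_params("lgbm", {"n_estimators": 100, "learning_rate": 0.05})
combiner_params = prefix_params("combiner", {"method": "mean"})
all_params = {**base_params, **combiner_params}
# With an existing MLFlowStorage instance and run, the merged dict could then be
# passed as: storage.log_hyperparams(run_id=run.info.run_id, params=all_params)
```

Prefixing also avoids collisions on the MLflow side: a parameter already logged on a run cannot be re-logged with a different value, so two components that both log a key such as `learning_rate` would conflict.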
- finalize_run(model_id: ModelIdentifier, run_id: str, metrics: dict[str, float] | None = None, status: str = 'FINISHED') -> None
Complete an MLflow run by uploading artifacts and logging final metrics.
Uploads all locally staged artifacts to MLflow, logs the performance metrics, and ends the run with the specified status.
- Parameters:
model_id (ModelIdentifier) – Model identifier used to locate the artifact path.
run_id (str) – MLflow run ID to finalize.
metrics (dict[str, float] | None) – Training/validation metrics to log (e.g., MAE, RMSE).
status (str) – Final run status: “FINISHED”, “FAILED”, or “KILLED”.
- Return type:
None
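Because `finalize_run` both uploads staged artifacts and sets the final status, a natural pattern is to call it on the failure path too, so a run is not left open when training raises. A minimal sketch of that pattern; `RecordingStorage` is a hypothetical stub that mirrors the documented `finalize_run` parameters, and `train_and_finalize` is an illustrative helper, not part of `openstef_models`.

```python
from __future__ import annotations

from typing import Callable

FINAL_STATUSES = {"FINISHED", "FAILED", "KILLED"}


class RecordingStorage:
    """Hypothetical stub that records finalize_run calls instead of contacting MLflow."""

    def __init__(self) -> None:
        self.finalized: list[tuple[str, str, dict[str, float] | None]] = []

    def finalize_run(self, model_id, run_id, metrics=None, status="FINISHED") -> None:
        if status not in FINAL_STATUSES:
            raise ValueError(f"unexpected status {status!r}")
        self.finalized.append((run_id, status, metrics))


def train_and_finalize(storage, model_id: str, run_id: str,
                       train: Callable[[], dict[str, float]]) -> None:
    """Run training and always close the run, marking it FAILED on error."""
    try:
        metrics = train()
    except Exception:
        storage.finalize_run(model_id, run_id, status="FAILED")
        raise  # keep the original error visible to the caller
    storage.finalize_run(model_id, run_id, metrics=metrics, status="FINISHED")


def diverging_training() -> dict[str, float]:
    raise RuntimeError("training diverged")


storage = RecordingStorage()
train_and_finalize(storage, "my_model", "run-1", lambda: {"mae": 0.42})
try:
    train_and_finalize(storage, "my_model", "run-2", diverging_training)
except RuntimeError:
    pass  # the run has already been marked FAILED
```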
- search_latest_runs(model_id: ModelIdentifier, limit: int = 1, filter_string: str = "attribute.status = 'FINISHED'", order_by: Sequence[str] = ['start_time DESC']) -> list[Run]
Search for recent runs of a specific model in MLflow.
Queries MLflow for runs matching filter_string, sorted by order_by (most recent first by default). Returns an empty list if no experiment exists for the model.
- Parameters:
model_id (ModelIdentifier) – Model identifier to search runs for.
limit (int) – Maximum number of runs to return.
filter_string (str) – MLflow filter query (e.g., on status, metrics, or tags).
order_by (Sequence[str]) – Sort order for results (e.g., [“start_time DESC”]).
- Returns:
List of at most limit matching Run objects, sorted by order_by (newest first by default).
- Return type:
list[Run]
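`filter_string` uses MLflow's search syntax, in which clauses such as `attribute.status = 'FINISHED'` or `tags.<key> = '<value>'` are joined with `and`. A minimal sketch of composing such a string; `build_filter` is a hypothetical helper, and it deliberately rejects values containing single quotes rather than attempting to escape them.

```python
from __future__ import annotations


def build_filter(status: str = "FINISHED", tags: dict[str, str] | None = None) -> str:
    """Compose an MLflow search filter from a status and exact tag matches."""
    clauses = [f"attribute.status = '{status}'"]
    for key, value in (tags or {}).items():
        if "'" in value:
            # Quote escaping is out of scope for this sketch.
            raise ValueError(f"tag value for {key!r} contains a single quote")
        # Backticks allow tag keys containing characters such as '.' or '-'.
        clauses.append(f"tags.`{key}` = '{value}'")
    return " and ".join(clauses)


flt = build_filter(tags={"env": "prod"})
# e.g. storage.search_latest_runs(model_id, limit=5, filter_string=flt,
#                                 order_by=["metrics.mae ASC"])
```

Passing a different `order_by`, such as the hypothetical `["metrics.mae ASC"]` above (which assumes an `mae` metric was logged via `finalize_run`), sorts the results by that metric, so the returned list is no longer newest first.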
- search_run(model_id: ModelIdentifier, run_name: str) -> Run | None
Search for a specific run of a model by its name in MLflow.
Queries MLflow for a run matching the provided run name. Returns None if no experiment exists for the model or no run has that name.
- save_run_model(model_id: ModelIdentifier, run_id: str, model: object) -> None
Save a trained model to the run's local artifacts directory.
Serializes the model with the configured model_serializer and stores it in the run’s artifact directory. The model is uploaded to MLflow when finalize_run is called.
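`save_run_model` delegates to the configured serializer and only writes to the local staging directory. The interface of `ModelSerializer` is not shown on this page, so the sketch below uses a hypothetical pickle-based `PickleSerializer` with `serialize`/`deserialize` methods and an assumed `model.pkl` file name to illustrate the staging round trip.

```python
import pickle
import tempfile
from pathlib import Path


class PickleSerializer:
    """Hypothetical serializer; the real ModelSerializer interface may differ."""

    file_name = "model.pkl"  # assumed artifact file name

    def serialize(self, model: object, directory: Path) -> Path:
        target = directory / self.file_name
        with target.open("wb") as fh:
            pickle.dump(model, fh)
        return target

    def deserialize(self, directory: Path) -> object:
        with (directory / self.file_name).open("rb") as fh:
            return pickle.load(fh)


with tempfile.TemporaryDirectory() as tmp:
    # Staging directory laid out as local_artifacts_path/model_id/run_id.
    run_dir = Path(tmp) / "my_model" / "run-1"
    run_dir.mkdir(parents=True)
    serializer = PickleSerializer()
    staged_name = serializer.serialize({"coef": [0.5, 1.5]}, run_dir).name
    restored = serializer.deserialize(run_dir)
```

`pickle` can execute arbitrary code when loading, so a pickle-based serializer is only appropriate when the artifacts on the tracking server are trusted.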
- load_run_model(run_id: str, model_id: ModelIdentifier) -> object
Load a trained model from MLflow artifacts.
Downloads the model artifacts from MLflow and deserializes them, returning a model instance with its trained state restored.
- Parameters:
run_id (str) – MLflow run ID containing the model artifacts.
model_id (ModelIdentifier) – Model identifier for locating artifact paths.
- Returns:
Model instance with restored state from the run.
- Raises:
ModelNotFoundError – If the model artifacts cannot be found in MLflow.
- Return type:
object
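A common consumer pattern combines `search_latest_runs` with `load_run_model`: find the newest finished run, then load its model. A minimal sketch of that flow against a hypothetical `StubStorage`; `load_latest_model` and `run_with_id` are illustrative helpers, and `ModelNotFoundError` is redefined locally because its import path is not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from types import SimpleNamespace


class ModelNotFoundError(Exception):
    """Local stand-in; the real exception's import path is not shown on this page."""


@dataclass
class StubStorage:
    """Hypothetical stub exposing the two methods used below."""

    runs: list = field(default_factory=list)    # newest first
    models: dict = field(default_factory=dict)  # run_id -> model

    def search_latest_runs(self, model_id: str, limit: int = 1) -> list:
        return self.runs[:limit]

    def load_run_model(self, run_id: str, model_id: str) -> object:
        if run_id not in self.models:
            raise ModelNotFoundError(run_id)
        return self.models[run_id]


def load_latest_model(storage, model_id: str) -> object | None:
    """Return the model from the newest finished run, or None if there is none."""
    runs = storage.search_latest_runs(model_id, limit=1)
    if not runs:
        return None
    # MLflow Run objects expose their ID as run.info.run_id. Keyword arguments
    # guard against the differing argument order of save_run_model/load_run_model.
    return storage.load_run_model(run_id=runs[0].info.run_id, model_id=model_id)


def run_with_id(run_id: str) -> SimpleNamespace:
    return SimpleNamespace(info=SimpleNamespace(run_id=run_id))


loaded = load_latest_model(StubStorage([run_with_id("run-2")], {"run-2": "model-v2"}), "my_model")
missing = load_latest_model(StubStorage(), "my_model")
try:
    load_latest_model(StubStorage([run_with_id("run-3")]), "my_model")
    artifacts_missing = False
except ModelNotFoundError:
    artifacts_missing = True
```

Returning None when no run exists leaves the cold-start decision to the caller, while a run whose artifacts are missing still surfaces as `ModelNotFoundError`.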
- get_artifacts_path(model_id: ModelIdentifier, run_id: str | None = None) -> Path
Get the local file system path for storing run artifacts.
Constructs the directory path where artifacts are staged before uploading to MLflow. Path structure: local_artifacts_path/model_id/run_id.
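The documented layout is `local_artifacts_path/model_id/run_id`. A minimal sketch with `pathlib`; the function `artifacts_path` is hypothetical, `PurePosixPath` is used only so the example behaves identically on every OS, and returning the model-level directory when `run_id` is None is an assumption, since this page does not specify that case.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def artifacts_path(root: PurePosixPath, model_id: str, run_id: str | None = None) -> PurePosixPath:
    """Build root/model_id/run_id as documented for get_artifacts_path."""
    model_dir = root / model_id
    # Assumption: without a run_id, the model-level directory is returned.
    return model_dir if run_id is None else model_dir / run_id


run_dir = artifacts_path(PurePosixPath("/tmp/artifacts"), "my_model", "abc123")
model_dir = artifacts_path(PurePosixPath("/tmp/artifacts"), "my_model")
```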
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'protected_namespaces': ()}
Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.