MLFlowStorage

class openstef_models.integrations.mlflow.MLFlowStorage(**data: Any) → None

Bases: BaseConfig

MLflow storage backend for managing training runs and model artifacts.

Handles creation, storage, and retrieval of MLflow runs, including models, training data, metrics, and hyperparameters. Stages artifacts locally before uploading them to the MLflow tracking server.

Parameters:

data (Any)

tracking_uri: str
local_artifacts_path: Path
experiment_name_prefix: str
data_path: str
model_path: str
enable_mlflow_stdout: bool
model_serializer: ModelSerializer

model_post_init(context: Any) → None

Override this method to perform additional initialization after __init__ and model_construct. This is useful if you want to do some validation that requires the entire model to be initialized.

Parameters:

context (Any)

Return type:

None

create_run(model_id: ModelIdentifier, run_name: str | None = None, tags: dict[str, str] | None = None, experiment_tags: dict[str, str] | None = None, hyperparams: HyperParams | None = None) → Run

Create a new MLflow run for tracking a model training session.

Creates or reuses an MLflow experiment named after the model ID, then starts a new run within that experiment. Logs hyperparameters if provided.

Parameters:
  • model_id (ModelIdentifier) – Unique identifier for the model, used as the experiment name.

  • run_name (str | None) – Optional display name for this specific run.

  • tags (dict[str, str] | None) – Key-value pairs to attach to the run for filtering/organization.

  • experiment_tags (dict[str, str] | None) – Key-value pairs to attach to the experiment if it is created by this call.

  • hyperparams (HyperParams | None) – Model hyperparameters to log with the run.

Returns:

The created MLflow Run object, including its run ID and metadata.

Return type:

Run

log_hyperparams(run_id: str, params: dict[str, str]) → None

Log additional hyperparameters to an existing MLflow run.

Useful for logging hyperparameters from multiple components (e.g., ensemble base forecasters and combiner) with prefixed names.

Parameters:
  • run_id (str) – MLflow run ID to log parameters to.

  • params (dict[str, str]) – Key-value pairs of hyperparameter names and string values.

Return type:

None
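
The prefixing described above could be sketched as follows. This is a minimal illustration, assuming each component exposes its hyperparameters as a plain dict; the helper `prefix_hyperparams` and the component names are hypothetical and not part of openstef_models. The result has the `dict[str, str]` shape that `params` expects.

```python
from typing import Any, Mapping


def prefix_hyperparams(components: Mapping[str, Mapping[str, Any]]) -> dict[str, str]:
    """Flatten per-component hyperparameters into one dict of prefixed string values."""
    flat: dict[str, str] = {}
    for component_name, params in components.items():
        for key, value in params.items():
            # Prefix with the component name to avoid key collisions,
            # and stringify because log_hyperparams takes dict[str, str].
            flat[f"{component_name}.{key}"] = str(value)
    return flat


# Hypothetical ensemble: two base forecasters and a combiner.
params = prefix_hyperparams(
    {
        "base_a": {"learning_rate": 0.1, "max_depth": 4},
        "base_b": {"alpha": 1.0},
        "combiner": {"method": "mean"},
    }
)
# With an existing storage and run_id (both assumed here):
# storage.log_hyperparams(run_id=run_id, params=params)
```

The flattened dict can then be passed as `params` in a single call, or each component can be logged separately with its own call.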

finalize_run(model_id: ModelIdentifier, run_id: str, metrics: dict[str, float] | None = None, status: str = 'FINISHED') → None

Complete an MLflow run by uploading artifacts and logging final metrics.

Uploads all locally stored artifacts to MLflow, logs performance metrics, and ends the run with the specified status.

Parameters:
  • model_id (ModelIdentifier) – Model identifier used to locate the artifact path.

  • run_id (str) – MLflow run ID to finalize.

  • metrics (dict[str, float] | None) – Training/validation metrics to log (e.g., MAE, RMSE).

  • status (str) – Final run status: one of 'FINISHED', 'FAILED', or 'KILLED'.

Return type:

None
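
A sketch of how the run lifecycle might fit together, using only the methods documented on this page. The function `train_and_track` and the `train_fn` callable are hypothetical; `storage` stands for any object exposing these methods, `model_id` is assumed to be string-like, and the returned Run is assumed to expose its ID as `info.run_id`, as MLflow Run entities do.

```python
from typing import Any, Callable


def train_and_track(
    storage: Any,
    model_id: str,
    train_fn: Callable[[], tuple[object, dict[str, float]]],
) -> str:
    """Create a run, stage the trained model, and finalize the run.

    Returns the run ID. The run is finalized as FAILED if training or saving raises.
    """
    run = storage.create_run(model_id=model_id)
    run_id = run.info.run_id  # assumption: MLflow Run entity layout
    try:
        model, metrics = train_fn()  # hypothetical training callable
        # Stage the model locally; it is uploaded by finalize_run.
        storage.save_run_model(model_id=model_id, run_id=run_id, model=model)
    except Exception:
        # Record the failure so the run does not stay in a running state.
        storage.finalize_run(model_id=model_id, run_id=run_id, status="FAILED")
        raise
    storage.finalize_run(
        model_id=model_id, run_id=run_id, metrics=metrics, status="FINISHED"
    )
    return run_id
```

Finalizing in the error path is a design choice of this sketch, so that a failed training session is still visible in the tracking server with an explicit status.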

search_latest_runs(model_id: ModelIdentifier, limit: int = 1, filter_string: str = "attribute.status = 'FINISHED'", order_by: Sequence[str] = ['start_time DESC']) → list[Run]

Search for recent runs of a specific model in MLflow.

Queries MLflow for runs matching the filter criteria, ordered by most recent by default. Returns an empty list if no experiment exists for the model.

Parameters:
  • model_id (ModelIdentifier) – Model identifier to search runs for.

  • limit (int) – Maximum number of runs to return.

  • filter_string (str) – MLflow filter expression (e.g., on status, metrics, or tags).

  • order_by (Sequence[str]) – Sort order for results (e.g., ['start_time DESC']).

Returns:

List of matching Run objects, newest first, up to limit count.

Return type:

list[Run]
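
As an illustration of what a `filter_string` can look like, the sketch below composes one from a status, tag equalities, and metric upper bounds. The helper `build_run_filter` is hypothetical, and it assumes simple tag and metric names that need no quoting; the clause forms (`attribute.status`, `tags.<key>`, `metrics.<name>`) follow MLflow's run search syntax.

```python
from typing import Optional


def build_run_filter(
    status: str = "FINISHED",
    tags: Optional[dict[str, str]] = None,
    max_metrics: Optional[dict[str, float]] = None,
) -> str:
    """Compose an MLflow search filter from equality and upper-bound clauses."""
    clauses = [f"attribute.status = '{status}'"]
    for key, value in (tags or {}).items():
        clauses.append(f"tags.{key} = '{value}'")
    for name, bound in (max_metrics or {}).items():
        clauses.append(f"metrics.{name} < {bound}")
    # Clauses are joined with AND.
    return " and ".join(clauses)


filter_string = build_run_filter(tags={"stage": "prod"}, max_metrics={"rmse": 0.5})
# With an existing storage and model_id (both assumed here):
# runs = storage.search_latest_runs(model_id, limit=5, filter_string=filter_string)
```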

search_run(model_id: ModelIdentifier, run_name: str) → Run | None

Search for a specific run of a model by its name in MLflow.

Queries MLflow for a run matching the provided run name. Returns None if no experiment or run exists for the model.

Parameters:
  • model_id (ModelIdentifier) – Model identifier to search runs for.

  • run_name (str) – Name of the run to search for.

Returns:

The matching Run object if found, otherwise None.

Return type:

Run | None
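
Because the method returns None rather than raising, it lends itself to a get-or-create pattern. The helper below is a hypothetical sketch, not part of openstef_models; `storage` stands for any object exposing `search_run` and `create_run` as documented here.

```python
from typing import Any


def get_or_create_run(storage: Any, model_id: str, run_name: str) -> Any:
    """Return the run with this name if it exists, otherwise create a new one."""
    existing = storage.search_run(model_id=model_id, run_name=run_name)
    if existing is not None:
        return existing
    # No experiment or no run with that name yet: start a fresh run.
    return storage.create_run(model_id=model_id, run_name=run_name)
```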

save_run_model(model_id: ModelIdentifier, run_id: str, model: object) → None

Save a trained model to the local artifacts directory for the run.

Serializes the model using the configured serializer and stores it in the run’s artifact directory. The model is uploaded to MLflow when finalize_run is called.

Parameters:
  • model_id (ModelIdentifier) – Model identifier for organizing artifact paths.

  • run_id (str) – MLflow run ID to associate artifacts with.

  • model (object) – Trained model instance with state to serialize.

Return type:

None

load_run_model(run_id: str, model_id: ModelIdentifier) → object

Load a trained model from MLflow artifacts.

Downloads model artifacts from MLflow and deserializes them with the configured serializer, returning a model instance with its trained state restored.

Parameters:
  • run_id (str) – MLflow run ID containing the model artifacts.

  • model_id (ModelIdentifier) – Model identifier for locating artifact paths.

Returns:

Model instance with restored state from the run.

Raises:

ModelNotFoundError – If the model artifacts cannot be found in MLflow.

Return type:

object
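
A common use is to load the model from the most recent finished run. The sketch below combines `search_latest_runs` and `load_run_model` for that purpose; the helper `load_latest_model` is hypothetical, `model_id` is assumed to be string-like, and the Run is assumed to expose its ID as `info.run_id`, as MLflow Run entities do.

```python
from typing import Any, Optional


def load_latest_model(storage: Any, model_id: str) -> Optional[object]:
    """Return the model from the most recent finished run, or None if there is none."""
    # The default filter of search_latest_runs selects FINISHED runs, newest first.
    runs = storage.search_latest_runs(model_id=model_id, limit=1)
    if not runs:
        # No experiment or no finished run exists for this model yet.
        return None
    run_id = runs[0].info.run_id  # assumption: MLflow Run entity layout
    return storage.load_run_model(run_id=run_id, model_id=model_id)
```

In this sketch a ModelNotFoundError raised by `load_run_model` is not caught and propagates to the caller.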

get_artifacts_path(model_id: ModelIdentifier, run_id: str | None = None) → Path

Get the local file system path for storing run artifacts.

Constructs the directory path where artifacts are staged before uploading to MLflow. Path structure: local_artifacts_path/model_id/run_id.

Parameters:
  • model_id (ModelIdentifier) – Model identifier for organizing artifacts.

  • run_id (str | None) – Optional run ID to include in path. If None, returns model directory.

Returns:

Absolute path to the artifacts directory.

Return type:

Path
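
The documented layout can be mirrored with pathlib as below. This is only an illustration of the path structure, not the library's implementation: the function `artifacts_path` is hypothetical, `model_id` is assumed to be string-like, and the sketch does not make the path absolute.

```python
from pathlib import Path
from typing import Optional


def artifacts_path(
    local_artifacts_path: Path, model_id: str, run_id: Optional[str] = None
) -> Path:
    """Mirror the documented layout: local_artifacts_path/model_id[/run_id]."""
    model_dir = local_artifacts_path / str(model_id)
    # Without a run ID, the model-level directory is returned.
    return model_dir if run_id is None else model_dir / run_id


root = Path("artifacts")  # hypothetical staging root
model_dir = artifacts_path(root, "my_model")
run_dir = artifacts_path(root, "my_model", "abc123")
```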

model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': False, 'extra': 'ignore', 'protected_namespaces': ()}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.