Merge pull request #74 from ihmeuw-msca/bugfix/ensemble
Bugfix: ensemble model configuration
kels271828 authored Apr 4, 2024
2 parents 61b1dff + bebf646 commit 3937168
Showing 8 changed files with 121 additions and 111 deletions.
12 changes: 3 additions & 9 deletions src/onemod/actions/data/initialize_results.py
@@ -1,15 +1,16 @@
"""Initialize onemod stage results."""

import shutil

import fire
from pplkit.data.interface import DataInterface

from onemod.utils import (
get_ensemble_submodels,
get_handle,
get_rover_covsel_submodels,
get_swimr_submodels,
get_weave_submodels,
Subsets,
)


@@ -47,7 +48,6 @@ def initialize_results(experiment_dir: str, stages: list[str]) -> None:
stage_init_map[stage](dataif)



def _initialize_rover_covsel_results(dataif: DataInterface) -> None:
"""Initialize rover results."""

@@ -101,13 +101,7 @@ def _initialize_ensemble_results(dataif: DataInterface) -> None:
dataif.ensemble.mkdir(parents=True)

# Create ensemble subsets
settings = dataif.load_settings()
if "groupby" in settings["ensemble"]:
Subsets(
"ensemble",
settings["ensemble"],
dataif.load_data(),
).subsets.to_csv(dataif.ensemble / "subsets.csv", index=False)
get_ensemble_submodels(dataif.experiment, save_file=True)


def main() -> None:
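The ensemble stage now initializes its subsets through the shared `get_ensemble_submodels` helper, like the other stages, instead of building a `Subsets` object inline (and only when a `groupby` key was present). The helper's internals are not part of this diff; the sketch below is a guess at what it presumably does, inferred from the code it replaces, and the real implementation in `onemod.utils` may differ.

```python
# Hypothetical sketch of get_ensemble_submodels, inferred from the replaced code;
# the real helper lives in onemod.utils and may differ.
from onemod.utils import Subsets, get_handle

def get_ensemble_submodels(experiment_dir, save_file: bool = False):
    dataif, global_config = get_handle(experiment_dir)
    # groupby now defaults to [], so subsets.csv can always be written rather
    # than only when a "groupby" key appears in the settings.
    subsets = Subsets("ensemble", global_config.ensemble, dataif.load_data())
    if save_file:
        subsets.subsets.to_csv(dataif.ensemble / "subsets.csv", index=False)
    return subsets.get_subset_ids()
```
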
156 changes: 64 additions & 92 deletions src/onemod/actions/models/ensemble_model.py
@@ -1,15 +1,16 @@
"""Run ensemble model."""

import warnings
from functools import reduce
from pathlib import Path
from typing import Any, Optional
import warnings

import fire
import numpy as np
import pandas as pd

from onemod.modeling.metric import Metric
from onemod.utils import as_list, get_handle, Subsets
from onemod.utils import Subsets, as_list, get_handle


def get_predictions(
@@ -51,15 +52,15 @@ def get_predictions(
try:
df_smoother = dataif.load_weave(weave_file)
# Use concat to add a level to the column multi-index
df_smoother = pd.concat([df_smoother[col_pred]], axis=1, keys="weave")
df_smoother = pd.concat([df_smoother[col_pred]], axis=1, keys=["weave"])
except FileNotFoundError:
# No weave smoother results, initialize empty df
warnings.warn("No weave predictions found for ensemble stage.")
df_smoother = pd.DataFrame()

try:
swimr_df = dataif.load_swimr(swimr_file)
swimr_df = pd.concat([swimr_df[col_pred]], axis=1, keys="swimr")
swimr_df = pd.concat([swimr_df[col_pred]], axis=1, keys=["swimr"])
if df_smoother.empty:
df_smoother = swimr_df
else:
@@ -82,7 +83,7 @@ def get_performance(
row: pd.Series,
df_holdout: pd.DataFrame,
subsets: Optional[Subsets],
metric: str,
metric_name: str,
col_obs: str,
**kwargs,
) -> float:
@@ -96,7 +97,7 @@
Dataframe containing the holdout data.
subsets : Optional[Subsets]
Subsets object containing the subset data.
metric : str
metric_name : str
Performance metric to be used (e.g., "rmse").
col_obs : str
Column name for the observed values.
@@ -112,15 +113,12 @@
If an invalid performance metric is provided.
"""
df_holdout = df_holdout[
(df_holdout[row["holdout_id"]] == 1)
& (df_holdout["param_id"] == row["param_id"])
]
df_holdout = df_holdout[df_holdout[row["holdout_id"]] == 1]
if subsets is not None:
df_holdout = subsets.filter_subset(df_holdout, row["subset_id"])

metric = Metric(metric)
return metric(df_holdout, col_obs, row.model_id, **kwargs)
metric = Metric(metric_name)
return metric(df_holdout, col_obs, tuple(row[:3]), **kwargs)


def get_weights(
@@ -253,129 +251,103 @@ def ensemble_model(experiment_dir: str, *args: Any, **kwargs: Any) -> None:
Parameters
----------
experiment_dir : Union[Path, str]
experiment_dir : str
Path to the experiment directory.
"""
experiment_dir = Path(experiment_dir)
dataif, _ = get_handle(experiment_dir)
settings = dataif.load_settings()
subsets_df = dataif.load_ensemble("subsets.csv")
dataif, global_config = get_handle(experiment_dir)

ensemble_config = global_config.ensemble

subsets = Subsets(
"ensemble",
settings["ensemble"],
subsets=subsets_df,
ensemble_config,
subsets=dataif.load_ensemble("subsets.csv"),
)

# Load input data and smoother predictions
df_input = dataif.load_data()
df_full = get_predictions(experiment_dir, "full", settings["col_pred"])
df_full = get_predictions(experiment_dir, "full", global_config.col_pred)

# Get smoother out-of-sample performance by holdout set
df_performance = pd.merge(
left=df_full.columns.to_frame(index=False),
right=pd.Series(as_list(settings["col_holdout"]), name="holdout_id"),
right=pd.Series(as_list(global_config.col_holdout), name="holdout_id"),
how="cross",
)
df_performance = df_performance.merge(
right=pd.Series(subsets.get_subset_ids(), name="subset_id"),
how="cross",
)
if "groupby" in settings["ensemble"]:
df_performance = df_performance.merge(
right=pd.Series(subsets.get_subset_ids(), name="subset_id"),
how="cross",
)
df_list = []
id_cols = settings["col_id"]
id_cols = as_list(global_config.col_id)

for holdout_id, df in df_performance.groupby("holdout_id"):
predictions = get_predictions(experiment_dir, holdout_id, settings["col_pred"])
# Iteratively merge on prediction columns
df_holdout = reduce(
lambda df, smoother: pd.merge(
left=df,
right=predictions.loc[:, smoother].stack().reset_index(),
how="right",
on=id_cols + ["param_id"],
),
predictions.columns.get_level_values("smoother_id"),
pd.DataFrame(columns=id_cols + ["param_id"]),
predictions = get_predictions(
experiment_dir, holdout_id, global_config.col_pred
)
df_holdout = pd.merge(df_holdout, df_input, on=id_cols)

# Select holdout ids, and calc rmse by subset
df[settings["ensemble"]["metric"]] = df.apply(
predictions.columns = predictions.columns.to_flat_index()
df_holdout = pd.merge(
left=df_input[id_cols + [global_config.col_obs, holdout_id]],
right=predictions,
on=id_cols,
)
df[ensemble_config.metric] = df.apply(
lambda row: get_performance(
row,
df_holdout,
subsets,
settings["ensemble"]["metric"],
settings["col_obs"],
ensemble_config.metric,
global_config.col_obs,
**kwargs,
),
axis=1,
)
df_list.append(df)

# Get smoother weights
columns = ["smoother_id", "model_id", "param_id"]
if "groupby" in settings["ensemble"]:
columns += ["subset_id"]

metric = settings["ensemble"]["metric"]
full_df = pd.concat(df_list)
columns = ["smoother_id", "model_id", "param_id", "subset_id"]
metric_name = ensemble_config.metric
groups = pd.concat(df_list).groupby(columns)
mean = (
full_df.groupby(columns)
groups
.mean(numeric_only=True)
.rename({metric: f"{metric}_mean"}, axis=1)
.rename({metric_name: f"{metric_name}_mean"}, axis=1)
)
std = (
full_df.groupby(columns)
groups
.std(numeric_only=True)
.rename({metric: f"{metric}_std"}, axis=1)
.rename({metric_name: f"{metric_name}_std"}, axis=1)
)
df_performance = pd.concat([mean, std], axis=1)
if settings["ensemble"]["score"] == "avg":
df_performance["weight"] = get_weights(
df_performance,
subsets,
settings["ensemble"]["metric"],
settings["ensemble"]["score"],
)
else:
df_performance["weight"] = get_weights(
df_performance,
subsets,
metric,
settings["ensemble"]["score"],
settings["ensemble"]["top_pct_score"],
settings["ensemble"]["top_pct_model"],
)
dataif.dump_ensemble(df_performance.T, "performance.csv")
df_performance["weight"] = get_weights(
df_performance,
subsets,
ensemble_config.metric,
ensemble_config.score,
ensemble_config.top_pct_score,
ensemble_config.top_pct_model,
)
dataif.dump_ensemble(df_performance, "performance.csv")

# Get ensemble predictions
if "groupby" in settings["ensemble"]:
df_list = []
for subset_id in subsets.get_subset_ids():
indices = [
tuple(index)
for index in subsets.filter_subset(df_input, subset_id)[
as_list(settings["col_id"])
].values
]
df_subset = df_full.loc[indices]
df_list.append(
(df_subset * df_performance["weight"][:, :, :, subset_id])
.T.sum()
.reset_index()
.rename(columns={0: settings["col_pred"]})
)
df_pred = pd.concat(df_list)
else:
df_pred = (
(df_full * df_performance.loc["weight"])
df_list = []
for subset_id in subsets.get_subset_ids():
indices = [
tuple(index)
for index in subsets.filter_subset(df_input, subset_id)[
as_list(global_config.col_id)
].values
]
df_subset = df_full.loc[indices]
df_list.append(
(df_subset * df_performance["weight"][:, :, :, subset_id])
.T.sum()
.reset_index()
.rename(columns={0: settings["col_pred"]})
.rename(columns={0: global_config.col_pred})
)
df_pred = pd.concat(df_list)

dataif.dump_ensemble(df_pred, "predictions.parquet")


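The core fix in `get_predictions` is the `keys` argument to `pd.concat`: pandas zips `keys` against the list of objects, and a bare string is iterated character by character, so the single smoother frame would most likely be keyed by "w" (or "s") rather than "weave" (or "swimr"). A small standalone illustration, not taken from the repository:

```python
import pandas as pd

df = pd.DataFrame({"pred": [0.1, 0.2]})

# Old call: the string is zipped character by character against the objects,
# so the single frame most likely ends up under "w" instead of "weave"
# (newer pandas versions warn or raise about the length mismatch instead).
old = pd.concat([df], axis=1, keys="weave")

# Fixed call: a one-element list gives the intended outer column level.
new = pd.concat([df], axis=1, keys=["weave"])
print(new.columns.tolist())  # [('weave', 'pred')]
```

Relatedly, `ensemble_model` now flattens the prediction columns with `to_flat_index()`, so each column is a `(smoother_id, model_id, param_id)` tuple and `get_performance` selects it with `tuple(row[:3])` instead of `row.model_id`; the performance table is also dumped untransposed, and the `"groupby" in settings["ensemble"]` branches disappear because the new config always provides a `groupby` list.
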
2 changes: 2 additions & 0 deletions src/onemod/application/api.py
@@ -1,6 +1,7 @@
from onemod.application.regmod_smooth_application import RegmodSmoothApplication
from onemod.application.rover_covsel_application import RoverCovselApplication
from onemod.application.weave_application import WeaveApplication
from onemod.application.ensemble_application import EnsembleApplication


def get_application_class(stage_name: str) -> type:
@@ -9,5 +10,6 @@ def get_application_class(stage_name: str) -> type:
"rover_covsel": RoverCovselApplication,
"regmod_smooth": RegmodSmoothApplication,
"weave": WeaveApplication,
"ensemble": EnsembleApplication,
}
return application_map[stage_name]
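With the ensemble application registered, the stage can be resolved by name like the others. A minimal usage sketch (the experiment path is illustrative):

```python
from onemod.application.api import get_application_class

app_class = get_application_class("ensemble")           # EnsembleApplication
app = app_class(experiment_dir="/path/to/experiment")   # illustrative path
```
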
20 changes: 20 additions & 0 deletions src/onemod/application/ensemble_application.py
@@ -0,0 +1,20 @@
from pathlib import Path
from typing import Generator

from onemod.actions.action import Action
from onemod.actions.models.ensemble_model import ensemble_model
from onemod.application.base import Application


class EnsembleApplication(Application):
"""An application to run the ensemble stage."""

def __init__(self, experiment_dir: str | Path):
self.experiment_dir = experiment_dir

def action_generator(self) -> Generator[Action, None, None]:
"""A generator to return actions to be run."""
yield Action(
ensemble_model,
experiment_dir=self.experiment_dir,
)
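The new application yields exactly one action wrapping `ensemble_model`, so the whole stage runs as a single task. A sketch of consuming the generator directly (in practice the scheduler drives this; the path is illustrative):

```python
from onemod.application.ensemble_application import EnsembleApplication

app = EnsembleApplication(experiment_dir="/path/to/experiment")  # illustrative path
actions = list(app.action_generator())
assert len(actions) == 1  # a single Action wrapping ensemble_model
```
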
1 change: 1 addition & 0 deletions src/onemod/scheduler/scheduling_utils.py
@@ -111,6 +111,7 @@ def upstream_task_callback(action: Action) -> list["Task"]:
"rover_covsel_model": ["initialize_results"],
"regmod_smooth_model": ["collect_results", "initialize_results"],
"weave_model": ["collect_results", "collect_results", "initialize_results"],
"ensemble_model": 3 * ["collect_results"] + ["initialize_results"],
# Logic for collect results: set all modeling tasks as dependencies.
# Due to traversal order of the generator, the rover collection task must be created
# prior to weave modeling tasks being instantiated, therefore this is
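The new entry gives the ensemble task its upstream dependencies: three `collect_results` tasks, presumably one per upstream modeling stage, plus the shared `initialize_results` task. The `3 * [...]` idiom is plain list repetition:

```python
# List repetition; the callback presumably matches these names to the three
# upstream collection tasks plus the initialization task.
deps = 3 * ["collect_results"] + ["initialize_results"]
# ["collect_results", "collect_results", "collect_results", "initialize_results"]
```
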
4 changes: 1 addition & 3 deletions src/onemod/scheduler/templates.py
@@ -134,9 +134,7 @@ def create_modeling_template(

# Tasks can be parallelized by an internal concept called submodels
node_args = []

# Assumption: only regmod_smooth is not parallel
if "regmod_smooth" not in task_template_name:
if task_template_name in ["rover_covsel", "weave", "swimr"]:
node_args.append("submodel_id")

template = _create_task_template(
12 changes: 10 additions & 2 deletions src/onemod/schema/models/ensemble_config.py
@@ -2,5 +2,13 @@


class EnsembleConfiguration(ParametrizedBaseModel):
# TODO
pass
groupby: list[str] = []
max_attempts: int | None = None
max_batch: int = -1
metric: str = "rmse"
score: str = "neg_exp"
top_pct_score: float = 1.0
top_pct_model: float = 1.0

def inherit(self) -> None:
super().inherit(keys=["groupby", "max_attempts"])
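The previously empty configuration now carries the fields the ensemble stage reads via `global_config.ensemble`, all with defaults; `groupby` defaulting to an empty list is what lets the stage drop its `"groupby" in settings["ensemble"]` checks. A construction sketch with illustrative (not recommended) values:

```python
from onemod.schema.models.ensemble_config import EnsembleConfiguration

# Illustrative values; every field has a default, so EnsembleConfiguration()
# is also valid.
config = EnsembleConfiguration(
    groupby=["sex_id"],   # subset the ensemble by these ID columns (illustrative)
    metric="rmse",        # out-of-sample performance metric
    score="neg_exp",      # rule for turning metric values into weights
    top_pct_score=0.1,    # illustrative trimming thresholds
    top_pct_model=0.5,
)
```

As with the other stage configs, the `inherit` override presumably fills `groupby` and `max_attempts` from the parent settings when they are not set locally.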