
Commit

added optuna parallelization for ML models and combined the OptunaTuner and DLOptunaTuner classes into one OptunaTuner class
screengreen committed Sep 2, 2024
1 parent 38fdfa9 commit c9e1379
Showing 3 changed files with 71 additions and 213 deletions.
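Editorial note: the net effect in both presets is that the nn-specific DLOptunaTuner is gone and the single OptunaTuner class is used for every model type. A minimal sketch of the new wiring, not part of the commit; the tuning values are placeholders, only the keyword arguments and import path mirror the diff below:

from lightautoml.ml_algo.tuning.optuna import OptunaTuner

# placeholder values; only the key names mirror the preset code in the diff
tuning_params = {"max_tuning_iter": 25, "max_tuning_time": 300, "fit_on_holdout": True}

nn_tuner = OptunaTuner(
    n_trials=tuning_params["max_tuning_iter"],
    timeout=tuning_params["max_tuning_time"],
    fit_on_holdout=tuning_params["fit_on_holdout"],
)

The same tuner now serves both GBM-style and neural models: inside fit() and the objective it checks isinstance(ml_algo, TorchModel) to pick the appropriate search space.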
3 changes: 1 addition & 2 deletions lightautoml/automl/presets/tabular_presets.py
@@ -30,7 +30,6 @@
from ...ml_algo.dl_model import TorchModel
from ...ml_algo.linear_sklearn import LinearLBFGS
from ...ml_algo.random_forest import RandomForestSklearn
from ...ml_algo.tuning.optuna import DLOptunaTuner
from ...ml_algo.tuning.optuna import OptunaTuner
from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline
from ...pipelines.features.lgb_pipeline import LGBSeqSimpleFeatures
@@ -444,7 +443,7 @@ def get_nn(

if tuned:
nn_model.set_prefix("Tuned")
nn_tuner = DLOptunaTuner(
nn_tuner = OptunaTuner(
n_trials=model_params["tuning_params"]["max_tuning_iter"],
timeout=model_params["tuning_params"]["max_tuning_time"],
fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"],
3 changes: 1 addition & 2 deletions lightautoml/automl/presets/text_presets.py
@@ -22,7 +22,6 @@
from ...ml_algo.boost_lgbm import BoostLGBM
from ...ml_algo.dl_model import TorchModel
from ...ml_algo.linear_sklearn import LinearLBFGS
from ...ml_algo.tuning.optuna import DLOptunaTuner
from ...ml_algo.tuning.optuna import OptunaTuner
from ...pipelines.features.base import FeaturesPipeline
from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline
@@ -307,7 +306,7 @@ def get_nn(

if tuned:
nn_model.set_prefix("Tuned")
nn_tuner = DLOptunaTuner(
nn_tuner = OptunaTuner(
n_trials=model_params["tuning_params"]["max_tuning_iter"],
timeout=model_params["tuning_params"]["max_tuning_time"],
fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"],
278 changes: 69 additions & 209 deletions lightautoml/ml_algo/tuning/optuna.py
@@ -11,6 +11,7 @@
from typing import Union

import optuna
from tqdm import tqdm

from ...dataset.base import LAMLDataset
from ..base import MLAlgo
@@ -19,6 +20,7 @@
from .base import Uniform
from ...validation.base import HoldoutIterator
from ...validation.base import TrainValidIterator
from ...ml_algo.dl_model import TorchModel


logger = logging.getLogger(__name__)
@@ -135,6 +137,7 @@ def fit(
"""
assert not ml_algo.is_fitted, "Fitted algo cannot be tuned."
self._params_scores = []
# optuna.logging.set_verbosity(logger.getEffectiveLevel())
# upd timeout according to ml_algo timer
estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator))
@@ -172,6 +175,16 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial
)

try:
is_nn = isinstance(ml_algo, TorchModel)
rows_num = train_valid_iterator.train.shape[0]

# get the number of CPUs per process and the number of parallel Optuna jobs
num_cpu_per_process, n_jobs = self.get_num_cpu_n_jobs_for_optuna(
overall_num_cpu=ml_algo.params["num_threads"], rows_num=rows_num, is_nn=is_nn
)
ml_algo.default_params[
"thread_count"
] = num_cpu_per_process  # the per-process CPU count is set here so it is picked up when params are built for the Optuna optimization

sampler = optuna.samplers.TPESampler(seed=self.random_state)
self.study = optuna.create_study(direction=self.direction, sampler=sampler)
@@ -186,10 +199,17 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial
timeout=self.timeout,
callbacks=[update_trial_time],
# show_progress_bar=True,
n_jobs=n_jobs,
)

# need to update best params here
self._best_params = self.study.best_params
# self._best_params = self.study.best_params
if self.direction == "maximize":
self._best_params = max(self._params_scores, key=lambda x: x[1])[0]

else:
self._best_params = min(self._params_scores, key=lambda x: x[1])[0]

ml_algo.params = self._best_params

logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed")
@@ -198,13 +218,20 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial
)

if flg_new_iterator:
# set default_params back to normal
ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"]
del self._params_scores
# if the tuner was fitted on a holdout set, we don't need to save train results
return None, None

preds_ds = ml_algo.fit_predict(train_valid_iterator)

# set default_params back to normal
ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"]

return ml_algo, preds_ds
except optuna.exceptions.OptunaError:
del self._params_scores
return None, None

def _get_objective(
@@ -229,19 +256,24 @@ def _get_objective(

def objective(trial: optuna.trial.Trial) -> float:
_ml_algo = deepcopy(ml_algo)
is_dl_model = isinstance(_ml_algo, TorchModel)

optimization_search_space = _ml_algo.optimization_search_space

if not optimization_search_space:
optimization_search_space = _ml_algo._get_default_search_spaces(
suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
estimated_n_trials=estimated_n_trials,
)
if not is_dl_model:
optimization_search_space = _ml_algo._get_default_search_spaces(
suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
estimated_n_trials=estimated_n_trials,
)
else:
optimization_search_space = _ml_algo._default_sample

if callable(optimization_search_space):
_ml_algo.params = optimization_search_space(
trial=trial,
optimization_search_space=optimization_search_space,
estimated_n_trials=estimated_n_trials,
suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
)
else:
@@ -253,7 +285,9 @@ def objective(trial: optuna.trial.Trial) -> float:

output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator)

return _ml_algo.score(output_dataset)
score = _ml_algo.score(output_dataset)
self._params_scores.append((_ml_algo.params, score))
return score

return objective

@@ -286,213 +320,39 @@ def plot(self):
"""Plot optimization history of all trials in a study."""
return optuna.visualization.plot_optimization_history(self.study)
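Editorial aside, not commit code: the (params, score) bookkeeping introduced in fit() and objective() above can be illustrated in isolation. Each trial appends the full merged parameter dict together with its score, and the best set is recovered with max/min according to the optimization direction, rather than from study.best_params, which holds only the values Optuna itself sampled. The values below are hypothetical:

# hypothetical trial history: (full merged params dict, score)
params_scores = [
    ({"lr": 0.10, "depth": 6}, 0.81),
    ({"lr": 0.05, "depth": 8}, 0.84),
    ({"lr": 0.20, "depth": 4}, 0.79),
]
direction = "maximize"
best_params = (max if direction == "maximize" else min)(params_scores, key=lambda x: x[1])[0]
print(best_params)  # {'lr': 0.05, 'depth': 8}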


class DLOptunaTuner(ParamsTuner):
"""Wrapper for optuna tuner.
Args:
timeout: Maximum learning time.
n_trials: Maximum number of trials.
direction: Direction of optimization.
Set ``minimize`` for minimization
and ``maximize`` for maximization.
fit_on_holdout: Whether to use a holdout cv-iterator.
random_state: Seed for optuna sampler.
"""

_name: str = "OptunaTuner"

study: optuna.study.Study = None
estimated_n_trials: int = None
mean_trial_time: Optional[int] = None

def __init__(
# TODO: For now, metric is designed to be greater is better. Change maximize param after metric refactor if needed
self,
timeout: Optional[int] = 1000,
n_trials: Optional[int] = 100,
direction: Optional[str] = "maximize",
fit_on_holdout: bool = True,
random_state: int = 42,
):
self.timeout = timeout
self.n_trials = n_trials
self.estimated_n_trials = n_trials
self.direction = direction
self._fit_on_holdout = fit_on_holdout
self.random_state = random_state

def _upd_timeout(self, timeout):
self.timeout = min(self.timeout, timeout)

def fit(
self,
ml_algo: TunableAlgo,
train_valid_iterator: Optional[TrainValidIterator] = None,
) -> Tuple[Optional[TunableAlgo], Optional[LAMLDataset]]:
"""Tune model.
def get_num_cpu_n_jobs_for_optuna(self, overall_num_cpu: int, rows_num: int, is_nn: bool = False):
"""Get the number of CPU needed per process and the number of processes,
taking into account the length of the dataset.
Args:
ml_algo: Algo that is tuned.
train_valid_iterator: Classic cv-iterator.
overall_num_cpu (int): Maximum number of CPUs available.
rows_num (int): Length of the dataset.
is_nn (bool, optional): Whether the task is a neural network task. Defaults to False.
Returns:
Tuple (None, None) if an optuna exception raised
or ``fit_on_holdout=True`` and ``train_valid_iterator`` is
not :class:`~lightautoml.validation.base.HoldoutIterator`.
Tuple (MlALgo, preds_ds) otherwise.
tuple: An empirically chosen number of CPUs per process that works well for the given dataset length,
and the number of processes.
"""
assert not ml_algo.is_fitted, "Fitted algo cannot be tuned."
self._params_scores = []

# optuna.logging.set_verbosity(get_stdout_level())
# upd timeout according to ml_algo timer
estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator))
if estimated_tuning_time:
# TODO: Check for minimal runtime!
estimated_tuning_time = max(estimated_tuning_time, 1)
self._upd_timeout(estimated_tuning_time)

logger.info(
f"Start hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m ... Time budget is {self.timeout:.2f} secs"
)

metric_name = train_valid_iterator.train.task.get_dataset_metric().name
ml_algo = deepcopy(ml_algo)

flg_new_iterator = False
if self._fit_on_holdout and type(train_valid_iterator) != HoldoutIterator:
train_valid_iterator = train_valid_iterator.convert_to_holdout_iterator()
flg_new_iterator = True

# TODO: Check if time estimation will be ok with multiprocessing
def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
"""Callback for number of iteration with time cut-off.
Args:
study: Optuna study object.
trial: Optuna trial object.
"""
ml_algo.mean_trial_time = study.trials_dataframe()["duration"].mean().total_seconds()
self.estimated_n_trials = min(self.n_trials, self.timeout // ml_algo.mean_trial_time)

logger.info3(
f"\x1b[1mTrial {len(study.trials)}\x1b[0m with hyperparameters {trial.params} scored {trial.value} in {trial.duration}"
)

try:
sampler = optuna.samplers.TPESampler(seed=self.random_state)
self.study = optuna.create_study(direction=self.direction, sampler=sampler)

self.study.optimize(
func=self._get_objective(
ml_algo=ml_algo,
estimated_n_trials=self.estimated_n_trials,
train_valid_iterator=train_valid_iterator,
),
n_trials=self.n_trials,
timeout=self.timeout,
callbacks=[update_trial_time],
# show_progress_bar=True,
)

# need to update best params here
if self.direction == "maximize":
self._best_params = max(self._params_scores, key=lambda x: x[1])[0]
if is_nn:
return overall_num_cpu, 1 # TODO: test optuna parallelisation for nn

def helper_function(impericaly_needed_num_of_cpu):
# if the number of CPUs we have is less than twice the number needed per process, just use one job
if overall_num_cpu <= impericaly_needed_num_of_cpu * 2 - 1:
num_cpu_per_process = overall_num_cpu
n_jobs = 1
else:
self._best_params = min(self._params_scores, key=lambda x: x[1])[0]

ml_algo.params = self._best_params
del self._params_scores

logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed")
logger.info2(
f"The set of hyperparameters \x1b[1m{self._best_params}\x1b[0m\n achieve {self.study.best_value:.4f} {metric_name}"
)

if flg_new_iterator:
# if the tuner was fitted on a holdout set, we don't need to save train results
return None, None

preds_ds = ml_algo.fit_predict(train_valid_iterator)

return ml_algo, preds_ds
except optuna.exceptions.OptunaError:
del self._params_scores
return None, None

def _get_objective(
self,
ml_algo: TunableAlgo,
estimated_n_trials: int,
train_valid_iterator: TrainValidIterator,
) -> Callable[[optuna.trial.Trial], Union[float, int]]:
"""Get objective.
Args:
ml_algo: Tunable algorithm.
estimated_n_trials: Maximum number of hyperparameter estimations.
train_valid_iterator: Used for getting parameters
depending on dataset.
Returns:
Callable objective.
"""
assert isinstance(ml_algo, MLAlgo)

def objective(trial: optuna.trial.Trial) -> float:
_ml_algo = deepcopy(ml_algo)

optimization_search_space = _ml_algo.optimization_search_space
if not optimization_search_space:
optimization_search_space = _ml_algo._default_sample

if callable(optimization_search_space):
sampled_params = optimization_search_space(
trial=trial,
estimated_n_trials=estimated_n_trials,
suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
)
else:
sampled_params = self._sample(
trial=trial,
optimization_search_space=optimization_search_space,
suggested_params=_ml_algo.init_params_on_input(train_valid_iterator),
)

_ml_algo.params = sampled_params
output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator)
score = _ml_algo.score(output_dataset)
self._params_scores.append((sampled_params, score))
return score

return objective

def _sample(
self,
optimization_search_space,
trial: optuna.trial.Trial,
suggested_params: dict,
) -> dict:
# logger.info3(f'Suggested parameters: {suggested_params}')
trial_values = copy(suggested_params)
for parameter_name, search_space in optimization_search_space.items():
not_supported = True
for key_class in OPTUNA_DISTRIBUTIONS_MAP:
if isinstance(search_space, key_class):
wrapped_search_space = OPTUNA_DISTRIBUTIONS_MAP[key_class](search_space)
trial_values[parameter_name] = wrapped_search_space(
name=parameter_name,
trial=trial,
)
not_supported = False
if not_supported:
raise ValueError(f"Optuna does not support distribution {search_space}")
num_cpu_per_process = impericaly_needed_num_of_cpu
n_jobs = overall_num_cpu // num_cpu_per_process
return num_cpu_per_process, n_jobs

if rows_num <= 50_000:
num_cpu_per_process, n_jobs = helper_function(2)
elif rows_num <= 1_000_000:
num_cpu_per_process, n_jobs = helper_function(4)
elif rows_num <= 5_000_000:
num_cpu_per_process, n_jobs = helper_function(8)
else:
num_cpu_per_process, n_jobs = helper_function(16)

def plot(self):
"""Plot optimization history of all trials in a study."""
return optuna.visualization.plot_optimization_history(self.study)
return num_cpu_per_process, n_jobs
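
To make the heuristic above easier to follow, here is a standalone editorial restatement of get_num_cpu_n_jobs_for_optuna. The thresholds and the split rule are taken from the diff; the function name split_cpus_for_optuna and the example inputs are hypothetical:

def split_cpus_for_optuna(overall_num_cpu: int, rows_num: int, is_nn: bool = False):
    # neural models are not tuned in parallel yet: one job gets all CPUs
    if is_nn:
        return overall_num_cpu, 1

    # empirically chosen CPUs-per-trial budget grows with dataset size
    if rows_num <= 50_000:
        needed = 2
    elif rows_num <= 1_000_000:
        needed = 4
    elif rows_num <= 5_000_000:
        needed = 8
    else:
        needed = 16

    # with fewer than two full budgets available, run a single job on all CPUs
    if overall_num_cpu <= needed * 2 - 1:
        return overall_num_cpu, 1
    return needed, overall_num_cpu // needed

print(split_cpus_for_optuna(16, 200_000))   # (4, 4): four parallel Optuna jobs, four CPUs each
print(split_cpus_for_optuna(4, 5_000_000))  # (4, 1): too few CPUs to parallelize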
