From c9e13792462c03cb4858fef9a7a1a2d08b7f9ec3 Mon Sep 17 00:00:00 2001 From: screengreen Date: Mon, 2 Sep 2024 15:37:06 +0300 Subject: [PATCH] added optuna paralleation for ml models and combined optunatuner and dloptunatuner classes into one optunatuner class --- lightautoml/automl/presets/tabular_presets.py | 3 +- lightautoml/automl/presets/text_presets.py | 3 +- lightautoml/ml_algo/tuning/optuna.py | 278 +++++------------- 3 files changed, 71 insertions(+), 213 deletions(-) diff --git a/lightautoml/automl/presets/tabular_presets.py b/lightautoml/automl/presets/tabular_presets.py index 092b870e..a001434f 100755 --- a/lightautoml/automl/presets/tabular_presets.py +++ b/lightautoml/automl/presets/tabular_presets.py @@ -30,7 +30,6 @@ from ...ml_algo.dl_model import TorchModel from ...ml_algo.linear_sklearn import LinearLBFGS from ...ml_algo.random_forest import RandomForestSklearn -from ...ml_algo.tuning.optuna import DLOptunaTuner from ...ml_algo.tuning.optuna import OptunaTuner from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline from ...pipelines.features.lgb_pipeline import LGBSeqSimpleFeatures @@ -444,7 +443,7 @@ def get_nn( if tuned: nn_model.set_prefix("Tuned") - nn_tuner = DLOptunaTuner( + nn_tuner = OptunaTuner( n_trials=model_params["tuning_params"]["max_tuning_iter"], timeout=model_params["tuning_params"]["max_tuning_time"], fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"], diff --git a/lightautoml/automl/presets/text_presets.py b/lightautoml/automl/presets/text_presets.py index d79fdbb4..4458ab1d 100755 --- a/lightautoml/automl/presets/text_presets.py +++ b/lightautoml/automl/presets/text_presets.py @@ -22,7 +22,6 @@ from ...ml_algo.boost_lgbm import BoostLGBM from ...ml_algo.dl_model import TorchModel from ...ml_algo.linear_sklearn import LinearLBFGS -from ...ml_algo.tuning.optuna import DLOptunaTuner from ...ml_algo.tuning.optuna import OptunaTuner from ...pipelines.features.base import FeaturesPipeline from ...pipelines.features.lgb_pipeline import LGBAdvancedPipeline @@ -307,7 +306,7 @@ def get_nn( if tuned: nn_model.set_prefix("Tuned") - nn_tuner = DLOptunaTuner( + nn_tuner = OptunaTuner( n_trials=model_params["tuning_params"]["max_tuning_iter"], timeout=model_params["tuning_params"]["max_tuning_time"], fit_on_holdout=model_params["tuning_params"]["fit_on_holdout"], diff --git a/lightautoml/ml_algo/tuning/optuna.py b/lightautoml/ml_algo/tuning/optuna.py index eade5d12..1584f386 100644 --- a/lightautoml/ml_algo/tuning/optuna.py +++ b/lightautoml/ml_algo/tuning/optuna.py @@ -11,6 +11,7 @@ from typing import Union import optuna +from tqdm import tqdm from ...dataset.base import LAMLDataset from ..base import MLAlgo @@ -19,6 +20,7 @@ from .base import Uniform from ...validation.base import HoldoutIterator from ...validation.base import TrainValidIterator +from ...ml_algo.dl_model import TorchModel logger = logging.getLogger(__name__) @@ -135,6 +137,7 @@ def fit( """ assert not ml_algo.is_fitted, "Fitted algo cannot be tuned." + self._params_scores = [] # optuna.logging.set_verbosity(logger.getEffectiveLevel()) # upd timeout according to ml_algo timer estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator)) @@ -172,6 +175,16 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial ) try: + is_nn = isinstance(ml_algo, TorchModel) + rows_num = train_valid_iterator.train.shape[0] + + # get num of cpu for a process + num_cpu_per_process, n_jobs = self.get_num_cpu_n_jobs_for_optuna( + overall_num_cpu=ml_algo.params["num_threads"], rows_num=rows_num, is_nn=is_nn + ) + ml_algo.default_params[ + "thread_count" + ] = num_cpu_per_process # get's num of cpu here when makes params for optuna optimisation sampler = optuna.samplers.TPESampler(seed=self.random_state) self.study = optuna.create_study(direction=self.direction, sampler=sampler) @@ -186,10 +199,17 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial timeout=self.timeout, callbacks=[update_trial_time], # show_progress_bar=True, + n_jobs=n_jobs, ) # need to update best params here - self._best_params = self.study.best_params + # self._best_params = self.study.best_params + if self.direction == "maximize": + self._best_params = max(self._params_scores, key=lambda x: x[1])[0] + + else: + self._best_params = min(self._params_scores, key=lambda x: x[1])[0] + ml_algo.params = self._best_params logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed") @@ -198,13 +218,20 @@ def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial ) if flg_new_iterator: + # set defatult_params back to normal + ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"] + del self._params_scores # if tuner was fitted on holdout set we dont need to save train results return None, None preds_ds = ml_algo.fit_predict(train_valid_iterator) + # set defatult_params back to normal + ml_algo.default_params["thread_count"] = ml_algo.params["thread_count"] + return ml_algo, preds_ds except optuna.exceptions.OptunaError: + del self._params_scores return None, None def _get_objective( @@ -229,19 +256,24 @@ def _get_objective( def objective(trial: optuna.trial.Trial) -> float: _ml_algo = deepcopy(ml_algo) + is_dl_model = isinstance(_ml_algo, TorchModel) optimization_search_space = _ml_algo.optimization_search_space if not optimization_search_space: - optimization_search_space = _ml_algo._get_default_search_spaces( - suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), - estimated_n_trials=estimated_n_trials, - ) + if not is_dl_model: + optimization_search_space = _ml_algo._get_default_search_spaces( + suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), + estimated_n_trials=estimated_n_trials, + ) + else: + optimization_search_space = _ml_algo._default_sample if callable(optimization_search_space): _ml_algo.params = optimization_search_space( trial=trial, optimization_search_space=optimization_search_space, + estimated_n_trials=estimated_n_trials, suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), ) else: @@ -253,7 +285,9 @@ def objective(trial: optuna.trial.Trial) -> float: output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator) - return _ml_algo.score(output_dataset) + score = _ml_algo.score(output_dataset) + self._params_scores.append((_ml_algo.params, score)) + return score return objective @@ -286,213 +320,39 @@ def plot(self): """Plot optimization history of all trials in a study.""" return optuna.visualization.plot_optimization_history(self.study) - -class DLOptunaTuner(ParamsTuner): - """Wrapper for optuna tuner. - - Args: - timeout: Maximum learning time. - n_trials: Maximum number of trials. - direction: Direction of optimization. - Set ``minimize`` for minimization - and ``maximize`` for maximization. - fit_on_holdout: Will be used holdout cv-iterator. - random_state: Seed for optuna sampler. - - """ - - _name: str = "OptunaTuner" - - study: optuna.study.Study = None - estimated_n_trials: int = None - mean_trial_time: Optional[int] = None - - def __init__( - # TODO: For now, metric is designed to be greater is better. Change maximize param after metric refactor if needed - self, - timeout: Optional[int] = 1000, - n_trials: Optional[int] = 100, - direction: Optional[str] = "maximize", - fit_on_holdout: bool = True, - random_state: int = 42, - ): - self.timeout = timeout - self.n_trials = n_trials - self.estimated_n_trials = n_trials - self.direction = direction - self._fit_on_holdout = fit_on_holdout - self.random_state = random_state - - def _upd_timeout(self, timeout): - self.timeout = min(self.timeout, timeout) - - def fit( - self, - ml_algo: TunableAlgo, - train_valid_iterator: Optional[TrainValidIterator] = None, - ) -> Tuple[Optional[TunableAlgo], Optional[LAMLDataset]]: - """Tune model. + def get_num_cpu_n_jobs_for_optuna(self, overall_num_cpu: int, rows_num: int, is_nn: bool = False): + """Get the number of CPU needed per process and the number of processes, + taking into account the length of the dataset. Args: - ml_algo: Algo that is tuned. - train_valid_iterator: Classic cv-iterator. + overall_num_cpu (int): Maximum number of CPUs available. + rows_num (int): Length of the dataset. + is_nn (bool, optional): Whether the task is a neural network task. Defaults to False. Returns: - Tuple (None, None) if an optuna exception raised - or ``fit_on_holdout=True`` and ``train_valid_iterator`` is - not :class:`~lightautoml.validation.base.HoldoutIterator`. - Tuple (MlALgo, preds_ds) otherwise. - + tuple: An empirical number of CPU for a process that works better for a specific dataset length, + and the number of processes. """ - assert not ml_algo.is_fitted, "Fitted algo cannot be tuned." - self._params_scores = [] - - # optuna.logging.set_verbosity(get_stdout_level()) - # upd timeout according to ml_algo timer - estimated_tuning_time = ml_algo.timer.estimate_tuner_time(len(train_valid_iterator)) - if estimated_tuning_time: - # TODO: Check for minimal runtime! - estimated_tuning_time = max(estimated_tuning_time, 1) - self._upd_timeout(estimated_tuning_time) - - logger.info( - f"Start hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m ... Time budget is {self.timeout:.2f} secs" - ) - - metric_name = train_valid_iterator.train.task.get_dataset_metric().name - ml_algo = deepcopy(ml_algo) - - flg_new_iterator = False - if self._fit_on_holdout and type(train_valid_iterator) != HoldoutIterator: - train_valid_iterator = train_valid_iterator.convert_to_holdout_iterator() - flg_new_iterator = True - - # TODO: Check if time estimation will be ok with multiprocessing - def update_trial_time(study: optuna.study.Study, trial: optuna.trial.FrozenTrial): - """Callback for number of iteration with time cut-off. - - Args: - study: Optuna study object. - trial: Optuna trial object. - - """ - ml_algo.mean_trial_time = study.trials_dataframe()["duration"].mean().total_seconds() - self.estimated_n_trials = min(self.n_trials, self.timeout // ml_algo.mean_trial_time) - - logger.info3( - f"\x1b[1mTrial {len(study.trials)}\x1b[0m with hyperparameters {trial.params} scored {trial.value} in {trial.duration}" - ) - - try: - sampler = optuna.samplers.TPESampler(seed=self.random_state) - self.study = optuna.create_study(direction=self.direction, sampler=sampler) - - self.study.optimize( - func=self._get_objective( - ml_algo=ml_algo, - estimated_n_trials=self.estimated_n_trials, - train_valid_iterator=train_valid_iterator, - ), - n_trials=self.n_trials, - timeout=self.timeout, - callbacks=[update_trial_time], - # show_progress_bar=True, - ) - - # need to update best params here - if self.direction == "maximize": - self._best_params = max(self._params_scores, key=lambda x: x[1])[0] + if is_nn: + return overall_num_cpu, 1 # TODO: test optuna parallelisation for nn + + def helper_function(impericaly_needed_num_of_cpu): + # if num of cpu we have is less then 2*num_cpu needed for a proces then just use one job + if overall_num_cpu <= impericaly_needed_num_of_cpu * 2 - 1: + num_cpu_per_process = overall_num_cpu + n_jobs = 1 else: - self._best_params = min(self._params_scores, key=lambda x: x[1])[0] - - ml_algo.params = self._best_params - del self._params_scores - - logger.info(f"Hyperparameters optimization for \x1b[1m{ml_algo._name}\x1b[0m completed") - logger.info2( - f"The set of hyperparameters \x1b[1m{self._best_params}\x1b[0m\n achieve {self.study.best_value:.4f} {metric_name}" - ) - - if flg_new_iterator: - # if tuner was fitted on holdout set we dont need to save train results - return None, None - - preds_ds = ml_algo.fit_predict(train_valid_iterator) - - return ml_algo, preds_ds - except optuna.exceptions.OptunaError: - del self._params_scores - return None, None - - def _get_objective( - self, - ml_algo: TunableAlgo, - estimated_n_trials: int, - train_valid_iterator: TrainValidIterator, - ) -> Callable[[optuna.trial.Trial], Union[float, int]]: - """Get objective. - - Args: - ml_algo: Tunable algorithm. - estimated_n_trials: Maximum number of hyperparameter estimations. - train_valid_iterator: Used for getting parameters - depending on dataset. - - Returns: - Callable objective. - - """ - assert isinstance(ml_algo, MLAlgo) - - def objective(trial: optuna.trial.Trial) -> float: - _ml_algo = deepcopy(ml_algo) - - optimization_search_space = _ml_algo.optimization_search_space - if not optimization_search_space: - optimization_search_space = _ml_algo._default_sample - - if callable(optimization_search_space): - sampled_params = optimization_search_space( - trial=trial, - estimated_n_trials=estimated_n_trials, - suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), - ) - else: - sampled_params = self._sample( - trial=trial, - optimization_search_space=optimization_search_space, - suggested_params=_ml_algo.init_params_on_input(train_valid_iterator), - ) - - _ml_algo.params = sampled_params - output_dataset = _ml_algo.fit_predict(train_valid_iterator=train_valid_iterator) - score = _ml_algo.score(output_dataset) - self._params_scores.append((sampled_params, score)) - return score - - return objective - - def _sample( - self, - optimization_search_space, - trial: optuna.trial.Trial, - suggested_params: dict, - ) -> dict: - # logger.info3(f'Suggested parameters: {suggested_params}') - trial_values = copy(suggested_params) - for parameter_name, search_space in optimization_search_space.items(): - not_supported = True - for key_class in OPTUNA_DISTRIBUTIONS_MAP: - if isinstance(search_space, key_class): - wrapped_search_space = OPTUNA_DISTRIBUTIONS_MAP[key_class](search_space) - trial_values[parameter_name] = wrapped_search_space( - name=parameter_name, - trial=trial, - ) - not_supported = False - if not_supported: - raise ValueError(f"Optuna does not support distribution {search_space}") + num_cpu_per_process = impericaly_needed_num_of_cpu + n_jobs = overall_num_cpu // num_cpu_per_process + return num_cpu_per_process, n_jobs + + if rows_num <= 50_000: + num_cpu_per_process, n_jobs = helper_function(2) + elif rows_num <= 1_000_000: + num_cpu_per_process, n_jobs = helper_function(4) + elif rows_num <= 5_000_000: + num_cpu_per_process, n_jobs = helper_function(8) + else: + num_cpu_per_process, n_jobs = helper_function(16) - def plot(self): - """Plot optimization history of all trials in a study.""" - return optuna.visualization.plot_optimization_history(self.study) + return num_cpu_per_process, n_jobs