[tune] Limit maximum number of pending trials. Add convergence test. #14835

Merged (8 commits) on Mar 24, 2021
6 changes: 5 additions & 1 deletion doc/source/tune/user-guide.rst
@@ -759,14 +759,18 @@ These are the environment variables Ray Tune currently considers:
* **TUNE_MAX_LEN_IDENTIFIER**: Maximum length of trial subdirectory names (those
with the parameter values in them)
* **TUNE_MAX_PENDING_TRIALS_PG**: Maximum number of pending trials when placement groups are used. Defaults
to ``1000``.
to ``-1``, which is automatically resolved to ``1000`` for random/grid search and ``1`` for any other search algorithm.
* **TUNE_PLACEMENT_GROUP_AUTO_DISABLED**: Ray Tune automatically uses placement groups
instead of the legacy resource requests. Setting this to 1 enables legacy placement.
* **TUNE_PLACEMENT_GROUP_CLEANUP_DISABLED**: Ray Tune cleans up existing placement groups
with the ``_tune__`` prefix in their name before starting a run. This is used to make sure
that scheduled placement groups are removed when multiple calls to ``tune.run()`` are
done in the same script. You might want to disable this if you run multiple Tune runs in
parallel from different scripts. Set to 1 to disable.
* **TUNE_PLACEMENT_GROUP_PREFIX**: Prefix for placement groups created by Ray Tune. This prefix is used
e.g. to identify placement groups that should be cleaned up on start/stop of the tuning run. If you want to
run more than one Ray Tune run in parallel, each of the parallel runs should set this to a different,
unique value.
* **TUNE_PLACEMENT_GROUP_WAIT_S**: Default time the trial executor waits for placement
groups to be placed before continuing the tuning loop. Setting this to a float
will block for that many seconds. This is mostly used for testing purposes. Defaults
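A brief, hedged usage sketch for the variables documented above; the values and the toy trainable below are purely illustrative, not recommendations:

# Illustrative only: configure the placement-group-related environment
# variables before calling tune.run().
import os
from ray import tune

os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "8"  # cap the number of pending trials
os.environ["TUNE_PLACEMENT_GROUP_PREFIX"] = "__tune_run_a__"  # prefix for this run's placement groups
os.environ["TUNE_PLACEMENT_GROUP_CLEANUP_DISABLED"] = "1"  # don't remove other runs' groups

def train_fn(config):
    # Toy objective: report a single metric so the run completes quickly.
    tune.report(loss=config["x"] ** 2)

tune.run(train_fn, config={"x": tune.uniform(0, 20)}, num_samples=4)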
4 changes: 2 additions & 2 deletions python/ray/tune/BUILD
@@ -50,9 +50,9 @@ py_test(
)

py_test(
name = "test_convergence_gaussian_process",
name = "test_convergence",
size = "small",
srcs = ["tests/test_convergence_gaussian_process.py"],
srcs = ["tests/test_convergence.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
)
6 changes: 5 additions & 1 deletion python/ray/tune/ray_trial_executor.py
@@ -160,7 +160,8 @@ def __init__(self,

self._avail_resources = Resources(cpu=0, gpu=0)
self._committed_resources = Resources(cpu=0, gpu=0)
self._pg_manager = PlacementGroupManager()
self._pg_manager = PlacementGroupManager(
prefix=os.getenv("TUNE_PLACEMENT_GROUP_PREFIX", "__tune__"))
Contributor:
should we add a hex to differentiate among different Tune runs?

Contributor Author:
This is just the prefix here - and it's used to identify leftover placement groups to remove before starting a new Tune run. It thus has to be constant across runs, otherwise removal wouldn't make sense. The trial PGs are actually using unique hex identifiers.

I guess one possibility would be to create a hex, store it in a global variable, and re-use this for sequential runs. Effectively this would mean the auto-removal process will only be triggered for sequential runs (such as in our tests). Parallel trials in different remote functions would work out of the box. Parallel trials using shared global state (threads?) would still interfere, but they do this right now, too.

Hm, this might be a good idea, I'll think about it a bit more.

Contributor Author:
I ended up implementing this - I'll see if the tests pass, but the examples in the issues run for me without problems (other than setting a separate logdir)
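A hypothetical sketch of the per-run hex idea described above, i.e. generate one prefix per process and reuse it for sequential runs; the function and variable names here are illustrative assumptions, not necessarily the merged code:

import os
import uuid

# Cache one prefix per process: sequential tune.run() calls in the same script
# share it (so leftover placement groups from an earlier run can be cleaned up),
# while separate processes get distinct prefixes and do not interfere.
_tune_pg_prefix = None

def get_tune_pg_prefix() -> str:
    global _tune_pg_prefix
    if _tune_pg_prefix is None:
        # An explicitly set TUNE_PLACEMENT_GROUP_PREFIX always takes precedence.
        _tune_pg_prefix = os.getenv(
            "TUNE_PLACEMENT_GROUP_PREFIX",
            "__tune_" + uuid.uuid4().hex[:8] + "__")
    return _tune_pg_prefix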

self._staged_trials = set()
self._just_staged_trials = set()
self._trial_just_finished = False
@@ -197,6 +198,9 @@ def in_staging_grace_period(self) -> bool:
"""Returns True if trials have recently been staged."""
return self._pg_manager.in_staging_grace_period()

def set_max_pending_trials(self, max_pending: int):
self._pg_manager.set_max_staging(max_pending)

def stage_and_update_status(self, trials: List[Trial]):
"""Check and update statuses of scheduled placement groups.

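A hedged sketch of how the new set_max_pending_trials() hook could be fed from TUNE_MAX_PENDING_TRIALS_PG; the helper name and the resolution logic below are assumptions for illustration, mirroring the documented default of -1:

import os

def resolve_max_pending_trials(uses_random_or_grid_search: bool) -> int:
    # -1 means "auto": many pending trials are fine for cheap random/grid
    # search, but model-based searchers should only keep one trial pending.
    value = int(os.getenv("TUNE_MAX_PENDING_TRIALS_PG", "-1"))
    if value != -1:
        return value  # explicit user override
    return 1000 if uses_random_or_grid_search else 1

# Conceptually, the trial runner would then call:
# trial_executor.set_max_pending_trials(resolve_max_pending_trials(is_random))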
1 change: 1 addition & 0 deletions python/ray/tune/stopper.py
@@ -213,6 +213,7 @@ def stop_all(self):
return self.has_plateaued() and self._iterations >= self._patience


# Deprecate: 1.4
class EarlyStopping(ExperimentPlateauStopper):
def __init__(self, *args, **kwargs):
warnings.warn(
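A small usage note on the deprecation above: EarlyStopping keeps working but warns and forwards to ExperimentPlateauStopper, which is the preferred class (the constructor arguments match the convergence test further down this page):

from ray.tune.stopper import EarlyStopping, ExperimentPlateauStopper

# Preferred: stop the experiment once the top results have plateaued for
# `patience` consecutive iterations.
stopper = ExperimentPlateauStopper(metric="loss", top=3, patience=20)

# Still accepted for now, but emits a deprecation warning.
legacy_stopper = EarlyStopping(metric="loss", top=3, patience=20)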
3 changes: 2 additions & 1 deletion python/ray/tune/suggest/ax.py
@@ -1,6 +1,7 @@
import copy
from typing import Dict, List, Optional, Union

from ax.exceptions.core import DataRequiredError
from ray.tune.result import DEFAULT_METRIC
from ray.tune.sample import Categorical, Float, Integer, LogUniform, \
Quantized, Uniform
@@ -262,7 +263,7 @@ def suggest(self, trial_id: str) -> Optional[Dict]:
else:
try:
parameters, trial_index = self._ax.get_next_trial()
except MaxParallelismReachedException:
except (MaxParallelismReachedException, DataRequiredError):
return None

self._live_trial_mapping[trial_id] = trial_index
12 changes: 0 additions & 12 deletions python/ray/tune/suggest/dragonfly.py
@@ -4,7 +4,6 @@

import inspect
import logging
import pickle
from typing import Dict, List, Optional, Union

from ray.tune.result import DEFAULT_METRIC
@@ -331,17 +330,6 @@ def on_trial_complete(self,
self._opt.tell([(trial_info,
self._metric_op * result[self._metric])])

def save(self, checkpoint_path: str):
trials_object = (self._initial_points, self._opt)
with open(checkpoint_path, "wb") as outputFile:
pickle.dump(trials_object, outputFile)

def restore(self, checkpoint_dir: str):
with open(checkpoint_dir, "rb") as inputFile:
trials_object = pickle.load(inputFile)
self._initial_points = trials_object[0]
self._opt = trials_object[1]

@staticmethod
def convert_search_space(spec: Dict) -> List[Dict]:
resolved_vars, domain_vars, grid_vars = parse_spec_vars(spec)
10 changes: 7 additions & 3 deletions python/ray/tune/suggest/nevergrad.py
@@ -1,6 +1,7 @@
import inspect
import logging
import pickle
from typing import Dict, Optional, Union, List, Sequence
from typing import Dict, Optional, Type, Union, List, Sequence

from ray.tune.result import DEFAULT_METRIC
from ray.tune.sample import Categorical, Domain, Float, Integer, LogUniform, \
@@ -108,7 +109,8 @@ class NevergradSearch(Searcher):
"""

def __init__(self,
optimizer: Union[None, Optimizer, ConfiguredOptimizer] = None,
optimizer: Union[None, Optimizer, Type[Optimizer],
ConfiguredOptimizer] = None,
space: Optional[Union[Dict, Parameter]] = None,
metric: Optional[str] = None,
mode: Optional[str] = None,
@@ -154,7 +156,9 @@ def __init__(self,
"parameter.")
self._parameters = space
self._nevergrad_opt = optimizer
elif isinstance(optimizer, ConfiguredOptimizer):
elif (inspect.isclass(optimizer)
and issubclass(optimizer, Optimizer)) or isinstance(
optimizer, ConfiguredOptimizer):
self._opt_factory = optimizer
self._parameters = None
self._space = space
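With the widened optimizer type above, NevergradSearch should accept a bare optimizer class (or one of nevergrad's registered optimizers) in addition to an instance or a ConfiguredOptimizer; a short sketch that mirrors testConvergenceNevergrad below:

import nevergrad as ng
from ray.tune.suggest.nevergrad import NevergradSearch

# The optimizer is passed uninstantiated; NevergradSearch builds it from the
# search space that Tune resolves at run time.
searcher = NevergradSearch(optimizer=ng.optimizers.PSO, metric="loss", mode="min")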
4 changes: 2 additions & 2 deletions python/ray/tune/tests/test_cluster.py
@@ -364,11 +364,11 @@ def test_trial_migration(start_connected_emptyhead_cluster, trainable_id):

@pytest.mark.parametrize("trainable_id", ["__fake", "__fake_durable"])
@pytest.mark.parametrize("with_pg", [True, False])
@patch("ray.tune.trial_runner.TUNE_MAX_PENDING_TRIALS_PG", 1)
@patch("ray.tune.utils.placement_groups.TUNE_MAX_PENDING_TRIALS_PG", 1)
def test_trial_requeue(start_connected_emptyhead_cluster, trainable_id,
with_pg):
"""Removing a node in full cluster causes Trial to be requeued."""
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"

if not with_pg:
os.environ["TUNE_PLACEMENT_GROUP_AUTO_DISABLED"] = "1"

156 changes: 156 additions & 0 deletions python/ray/tune/tests/test_convergence.py
@@ -0,0 +1,156 @@
import math
import numpy as np

import ray
from ray import tune
from ray.tune.stopper import ExperimentPlateauStopper
from ray.tune.suggest import ConcurrencyLimiter
import unittest


def loss(config, reporter):
x = config.get("x")
reporter(loss=x**2) # A simple function to optimize


class ConvergenceTest(unittest.TestCase):
"""Test convergence in gaussian process."""

@classmethod
def setUpClass(cls) -> None:
ray.init(local_mode=False, num_cpus=1, num_gpus=0)

@classmethod
def tearDownClass(cls) -> None:
ray.shutdown()

def _testConvergence(self, searcher, top=3, patience=20):
# This is the space of parameters to explore
space = {"x": tune.uniform(0, 20)}

resources_per_trial = {"cpu": 1, "gpu": 0}

analysis = tune.run(
loss,
metric="loss",
mode="min",
stop=ExperimentPlateauStopper(
metric="loss", top=top, patience=patience),
search_alg=searcher,
config=space,
num_samples=100, # Number of iterations
resources_per_trial=resources_per_trial,
raise_on_failed_trial=False,
fail_fast=True,
reuse_actors=True,
verbose=1)
print(f"Num trials: {len(analysis.trials)}. "
f"Best result: {analysis.best_config['x']}")

return analysis

def testConvergenceAx(self):
from ray.tune.suggest.ax import AxSearch

np.random.seed(0)

searcher = AxSearch()
analysis = self._testConvergence(searcher, patience=10)

assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-5)

def testConvergenceBayesOpt(self):
from ray.tune.suggest.bayesopt import BayesOptSearch

np.random.seed(0)

# Following bayesian optimization
searcher = BayesOptSearch(random_search_steps=10)
searcher.repeat_float_precision = 5
searcher = ConcurrencyLimiter(searcher, 1)

analysis = self._testConvergence(searcher)

assert len(analysis.trials) < 50
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-5)

def testConvergenceDragonfly(self):
from ray.tune.suggest.dragonfly import DragonflySearch

np.random.seed(0)
searcher = DragonflySearch(domain="euclidean", optimizer="bandit")
analysis = self._testConvergence(searcher)

assert len(analysis.trials) < 100
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-5)

def testConvergenceHEBO(self):
from ray.tune.suggest.hebo import HEBOSearch

np.random.seed(0)
searcher = HEBOSearch()
analysis = self._testConvergence(searcher)

assert len(analysis.trials) < 100
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-2)

def testConvergenceHyperopt(self):
from ray.tune.suggest.hyperopt import HyperOptSearch

np.random.seed(0)
searcher = HyperOptSearch(random_state_seed=1234)
analysis = self._testConvergence(searcher, patience=50, top=5)

assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-2)

def testConvergenceNevergrad(self):
from ray.tune.suggest.nevergrad import NevergradSearch
import nevergrad as ng

np.random.seed(0)
searcher = NevergradSearch(optimizer=ng.optimizers.PSO)
analysis = self._testConvergence(searcher, patience=50, top=5)

assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-3)

def testConvergenceOptuna(self):
from ray.tune.suggest.optuna import OptunaSearch

np.random.seed(0)
searcher = OptunaSearch()
analysis = self._testConvergence(
searcher,
top=3,
)

# This assertion is much weaker than in the BO case, but TPE
# doesn't converge as closely. It is still unlikely to reach this
# tolerance with random search (~0.01% chance).
assert len(analysis.trials) < 100
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-2)

def testConvergenceSkOpt(self):
from ray.tune.suggest.skopt import SkOptSearch

np.random.seed(0)
searcher = SkOptSearch()
analysis = self._testConvergence(searcher)

assert len(analysis.trials) < 100
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-3)

def testConvergenceZoopt(self):
from ray.tune.suggest.zoopt import ZOOptSearch

np.random.seed(0)
searcher = ZOOptSearch(budget=100)
analysis = self._testConvergence(searcher)

assert len(analysis.trials) < 100
assert math.isclose(analysis.best_config["x"], 0, abs_tol=1e-3)


if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
56 changes: 0 additions & 56 deletions python/ray/tune/tests/test_convergence_gaussian_process.py

This file was deleted.

5 changes: 2 additions & 3 deletions python/ray/tune/tests/test_trial_runner.py
@@ -1,7 +1,6 @@
import os
import sys
import unittest
from unittest.mock import patch

import ray
from ray.rllib import _register_all
@@ -294,9 +293,9 @@ def on_trial_result(self, trial_runner, trial, result):
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(runner.trial_executor._committed_resources.cpu, 2)

@patch("ray.tune.trial_runner.TUNE_MAX_PENDING_TRIALS_PG", 1)
@patch("ray.tune.utils.placement_groups.TUNE_MAX_PENDING_TRIALS_PG", 1)
def testQueueFilling(self):
os.environ["TUNE_MAX_PENDING_TRIALS_PG"] = "1"

ray.init(num_cpus=4)

def f1(config):