[tune] Fast Node Recovery (#5053)
richardliaw authored Jul 12, 2019
1 parent 0ec3a16 commit 1530389
Showing 7 changed files with 102 additions and 29 deletions.
26 changes: 18 additions & 8 deletions python/ray/autoscaler/autoscaler.py
@@ -196,6 +196,7 @@ def prune(mapping):
"LoadMetrics: "
"Removed {} stale ip mappings: {} not in {}".format(
len(unwanted), unwanted, active_ips))
assert not (unwanted & set(mapping))

prune(self.last_used_time_by_ip)
prune(self.static_resources_by_ip)
@@ -266,16 +267,18 @@ def _info(self):


class NodeLauncher(threading.Thread):
def __init__(self, provider, queue, pending, *args, **kwargs):
def __init__(self, provider, queue, pending, index=None, *args, **kwargs):
self.queue = queue
self.pending = pending
self.provider = provider
self.index = str(index) if index is not None else ""
super(NodeLauncher, self).__init__(*args, **kwargs)

def _launch_node(self, config, count):
tag_filters = {TAG_RAY_NODE_TYPE: "worker"}
before = self.provider.non_terminated_nodes(tag_filters=tag_filters)
worker_filter = {TAG_RAY_NODE_TYPE: "worker"}
before = self.provider.non_terminated_nodes(tag_filters=worker_filter)
launch_hash = hash_launch_conf(config["worker_nodes"], config["auth"])
self.log("Launching {} nodes.".format(count))
self.provider.create_node(
config["worker_nodes"], {
TAG_RAY_NODE_NAME: "ray-{}-worker".format(
@@ -284,19 +287,25 @@ def _launch_node(self, config, count):
TAG_RAY_NODE_STATUS: "uninitialized",
TAG_RAY_LAUNCH_CONFIG: launch_hash,
}, count)
after = self.provider.non_terminated_nodes(tag_filters=tag_filters)
after = self.provider.non_terminated_nodes(tag_filters=worker_filter)
if set(after).issubset(before):
logger.error("NodeLauncher: "
"No new nodes reported after node creation")
self.log("No new nodes reported after node creation.")

def run(self):
while True:
config, count = self.queue.get()
self.log("Got {} nodes to launch.".format(count))
try:
self._launch_node(config, count)
except Exception:
logger.exception("Launch failed")
finally:
self.pending.dec(count)

def log(self, statement):
prefix = "NodeLauncher{}:".format(self.index)
logger.info(prefix + " {}".format(statement))


class ConcurrentCounter():
def __init__(self):
@@ -375,6 +384,7 @@ def __init__(self,
node_launcher = NodeLauncher(
provider=self.provider,
queue=self.launch_queue,
index=i,
pending=self.num_launches_pending)
node_launcher.daemon = True
node_launcher.start()
@@ -633,8 +643,8 @@ def can_update(self, node_id):
return True

def launch_new_node(self, count):
logger.info("StandardAutoscaler: "
"Launching {} new nodes".format(count))
logger.info(
"StandardAutoscaler: Queue {} new nodes for launch".format(count))
self.num_launches_pending.inc(count)
config = copy.deepcopy(self.config)
self.launch_queue.put((config, count))
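For context, the launcher changes above follow a producer/consumer pattern: StandardAutoscaler.launch_new_node() bumps a pending counter and enqueues a (config, count) request, and a pool of indexed NodeLauncher threads drains the queue. A minimal standalone sketch of that shape, using hypothetical FakeLauncher and PendingCounter stand-ins rather than the real autoscaler classes:

import logging
import threading
import time
from queue import Queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class PendingCounter:
    """Simplified stand-in for the autoscaler's ConcurrentCounter."""

    def __init__(self):
        self._lock = threading.Lock()
        self.value = 0

    def inc(self, count):
        with self._lock:
            self.value += count

    def dec(self, count):
        with self._lock:
            self.value -= count


class FakeLauncher(threading.Thread):
    """Drains (config, count) requests, tagging logs with its index."""

    def __init__(self, queue, pending, index):
        super().__init__(daemon=True)
        self.queue = queue
        self.pending = pending
        self.index = str(index)

    def run(self):
        while True:
            config, count = self.queue.get()
            logger.info("FakeLauncher%s: Got %d nodes to launch.",
                        self.index, count)
            try:
                pass  # provider.create_node(config, tags, count) would go here
            finally:
                self.pending.dec(count)


if __name__ == "__main__":
    launch_queue = Queue()
    pending = PendingCounter()
    for i in range(2):
        FakeLauncher(launch_queue, pending, index=i).start()

    # This mirrors what launch_new_node() does on the producer side.
    pending.inc(3)
    launch_queue.put(({"worker_nodes": {}}, 3))
    time.sleep(0.5)  # give a daemon launcher thread time to log

The index only feeds the log prefix, which is what makes concurrent launch logs attributable to a specific launcher thread.
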
50 changes: 32 additions & 18 deletions python/ray/autoscaler/aws/node_provider.py
@@ -5,16 +5,17 @@
import random
import threading
from collections import defaultdict
import logging

import boto3
import botocore
from botocore.config import Config

from ray.autoscaler.node_provider import NodeProvider
from ray.autoscaler.tags import TAG_RAY_CLUSTER_NAME, TAG_RAY_NODE_NAME
from ray.ray_constants import BOTO_MAX_RETRIES
from ray.autoscaler.log_timer import LogTimer

import logging
logger = logging.getLogger(__name__)


@@ -207,23 +208,36 @@ def create_node(self, node_config, tags, count):
# SubnetIds is not a real config key: we must resolve to a
# single SubnetId before invoking the AWS API.
subnet_ids = conf.pop("SubnetIds")
subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)]
self.subnet_idx += 1
conf.update({
"MinCount": 1,
"MaxCount": count,
"SubnetId": subnet_id,
"TagSpecifications": tag_specs
})

logger.info(
"NodeProvider: Calling create_instances (count={}).".format(count))
L = self.ec2.create_instances(**conf)
for x in L:
logger.info("NodeProvider: Created instance "
"[id={}, name={}, info={}]".format(
x.instance_id, x.state["Name"],
x.state_reason["Message"]))

max_retries = 5
for attempt in range(1, max_retries + 1):
try:
subnet_id = subnet_ids[self.subnet_idx % len(subnet_ids)]
logger.info("NodeProvider: calling create_instances "
"with {} (count={}).".format(subnet_id, count))
self.subnet_idx += 1
conf.update({
"MinCount": 1,
"MaxCount": count,
"SubnetId": subnet_id,
"TagSpecifications": tag_specs
})
created = self.ec2.create_instances(**conf)
for instance in created:
logger.info("NodeProvider: Created instance "
"[id={}, name={}, info={}]".format(
instance.instance_id,
instance.state["Name"],
instance.state_reason["Message"]))
break
except botocore.exceptions.ClientError as exc:
if attempt == max_retries:
logger.error(
"create_instances: Max attempts ({}) exceeded.".format(
max_retries))
raise exc
else:
logger.error(exc)

def terminate_node(self, node_id):
node = self._get_cached_node(node_id)
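The new create_node() body retries a failed EC2 launch up to five times, advancing the round-robin subnet index on each attempt so a capacity or permission failure in one subnet falls through to the next. A rough sketch of that retry shape under simplified assumptions (a placeholder create_fn and a generic exception instead of the real boto3 call and botocore.exceptions.ClientError):

MAX_SUBNET_RETRIES = 5  # mirrors max_retries in the diff above


def create_with_subnet_fallback(create_fn, subnet_ids, count, start_idx=0):
    """Round-robin over subnets, retrying a failed launch in the next one."""
    idx = start_idx
    for attempt in range(1, MAX_SUBNET_RETRIES + 1):
        subnet_id = subnet_ids[idx % len(subnet_ids)]
        idx += 1
        try:
            # In the real provider this is self.ec2.create_instances(**conf).
            return create_fn(subnet_id=subnet_id, count=count)
        except Exception as exc:
            if attempt == MAX_SUBNET_RETRIES:
                raise
            print("Launch in {} failed ({}); trying next subnet.".format(
                subnet_id, exc))


# Example: the first subnet always fails, the second succeeds.
def flaky_create(subnet_id, count):
    if subnet_id == "subnet-aaa":
        raise RuntimeError("InsufficientInstanceCapacity")
    return ["i-{}-{}".format(subnet_id, i) for i in range(count)]


print(create_with_subnet_fallback(flaky_create, ["subnet-aaa", "subnet-bbb"], 2))
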
6 changes: 5 additions & 1 deletion python/ray/state.py
@@ -397,8 +397,12 @@ def client_table(self):
Information about the Ray clients in the cluster.
"""
self._check_connected()
client_table = _parse_client_table(self.redis_client)

return _parse_client_table(self.redis_client)
for client in client_table:
            # "alive" is equivalent to "IsInsertion" but is clearer
            # for application developers.
client["alive"] = client["IsInsertion"]
return client_table

def _job_table(self, job_id):
"""Fetch and parse the job table information for a single job ID.
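With the client_table() change above, callers can filter on the friendlier "alive" key instead of the GCS-internal "IsInsertion" flag. A small hypothetical consumer, mirroring the get_alive_node_ips() helper added to the trial executor below:

import ray

ray.init()

# "alive" mirrors "IsInsertion" but reads more naturally in application code.
live_ips = {n["NodeManagerAddress"] for n in ray.state.nodes() if n["alive"]}
print(live_ips)
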
33 changes: 31 additions & 2 deletions python/ray/tune/ray_trial_executor.py
@@ -288,7 +288,29 @@ def get_running_trials(self):

return list(self._running.values())

def get_alive_node_ips(self):
nodes = ray.state.nodes()
ip_addresses = set()
for node in nodes:
if node["alive"]:
ip_addresses.add(node["NodeManagerAddress"])
return ip_addresses

def get_current_trial_ips(self):
return {t.node_ip for t in self.get_running_trials()}

def get_next_available_trial(self):
if ray.worker._mode() != ray.worker.LOCAL_MODE:
live_cluster_ips = self.get_alive_node_ips()
if live_cluster_ips - self.get_current_trial_ips():
for trial in self.get_running_trials():
if trial.node_ip and trial.node_ip not in live_cluster_ips:
logger.warning(
"{} (ip: {}) detected as stale. This is likely "
"because the node was lost. Processing this "
"trial first.".format(trial, trial.node_ip))
return trial

shuffled_results = list(self._running.keys())
random.shuffle(shuffled_results)
# Note: We shuffle the results because `ray.wait` by default returns
@@ -541,8 +563,15 @@ def restore(self, trial, checkpoint=None):
assert type(value) != Checkpoint, type(value)
trial.runner.restore_from_object.remote(value)
else:
worker_ip = ray.get(trial.runner.current_ip.remote())
trial.sync_logger_to_new_location(worker_ip)
# TODO: Somehow, the call to get the current IP on the
# remote actor can be very slow - a better fix would
# be to use an actor table to detect the IP of the Trainable
# and rsync the files there.
# See https://github.com/ray-project/ray/issues/5168
with warn_if_slow("get_current_ip"):
worker_ip = ray.get(trial.runner.current_ip.remote())
with warn_if_slow("sync_to_new_location"):
trial.sync_logger_to_new_location(worker_ip)
with warn_if_slow("restore_from_disk"):
ray.get(trial.runner.restore.remote(value))
trial.last_result = checkpoint.last_result
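To make the stale-trial check in get_next_available_trial() concrete, here is a small worked example with made-up IPs: the executor only scans for stale trials when some alive node has no trial running on it, and it then prioritizes trials whose recorded node_ip is no longer alive so they can be recovered first.

# Hypothetical cluster state after a node is lost and a replacement joins.
live_cluster_ips = {"10.0.0.1", "10.0.0.3"}   # from get_alive_node_ips()
current_trial_ips = {"10.0.0.1", "10.0.0.2"}  # from get_current_trial_ips()

# Spare live capacity exists, so the executor looks for stale trials:
assert live_cluster_ips - current_trial_ips == {"10.0.0.3"}

# The trial recorded on 10.0.0.2 is stale (its node is gone) and is
# returned first by get_next_available_trial() so it can be recovered.
stale = [ip for ip in current_trial_ips if ip not in live_cluster_ips]
assert stale == ["10.0.0.2"]
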
1 change: 1 addition & 0 deletions python/ray/tune/trainable.py
@@ -114,6 +114,7 @@ def resource_help(cls, config):
return ""

def current_ip(self):
logger.warning("Getting current IP.")
self._local_ip = ray.services.get_node_ip_address()
return self._local_ip

4 changes: 4 additions & 0 deletions python/ray/tune/trial.py
@@ -559,6 +559,10 @@ def set_verbose(self, verbose):
def is_finished(self):
return self.status in [Trial.TERMINATED, Trial.ERROR]

@property
def node_ip(self):
return self.last_result.get("node_ip")

def __repr__(self):
return str(self)

11 changes: 11 additions & 0 deletions python/ray/tune/trial_runner.py
@@ -599,9 +599,20 @@ def _requeue_trial(self, trial):
This does not notify the SearchAlgorithm because the function
evaluation is still in progress.
"""
self._scheduler_alg.on_trial_error(self, trial)
self.trial_executor.set_status(trial, Trial.PENDING)

        # TODO(rliaw): Right now, this pushes the trial to the end of the queue
# because restoration can be expensive. However, this is not
# ideal since it just hides the issue - a better fix would
# be to use an actor table to detect the IP of the Trainable
# and rsync the files there.
# See https://github.com/ray-project/ray/issues/5168
self._trials.pop(self._trials.index(trial))
self._trials.append(trial)

with warn_if_slow("scheduler.on_trial_add"):
self._scheduler_alg.on_trial_add(self, trial)

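A tiny illustration of the requeue-to-end behavior added above, with placeholder trial names: moving the failed trial to the back of self._trials keeps a potentially expensive restoration from blocking trials that are ready to run.

trials = ["trial_a", "trial_b", "trial_c"]
failed = "trial_b"

# _requeue_trial() moves the failed trial to the back of the list:
trials.pop(trials.index(failed))
trials.append(failed)
assert trials == ["trial_a", "trial_c", "trial_b"]
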
