Take into account queue length in autoscaling (#5684)
ericl authored Sep 11, 2019
1 parent 9ce6dd9 commit 2fdefe1
Showing 6 changed files with 63 additions and 46 deletions.
10 changes: 5 additions & 5 deletions doc/source/autoscaling.rst
@@ -14,7 +14,7 @@ as described in `the boto docs <http://boto3.readthedocs.io/en/latest/guide/conf
Then you're ready to go. The provided `ray/python/ray/autoscaler/aws/example-full.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/aws/example-full.yaml>`__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers <https://aws.amazon.com/ec2/spot/>`__.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash
@@ -37,7 +37,7 @@ First, install the Google API client (``pip install google-api-python-client``),
Then you're ready to go. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml <https://github.com/ray-project/ray/tree/master/python/ray/autoscaler/gcp/example-full.yaml>`__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers <https://cloud.google.com/preemptible-vms/>`__. Note that you'll need to fill in your project id in those templates.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash
@@ -59,7 +59,7 @@ This is used when you have a list of machine IP addresses to connect in a Ray cl
Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field.

Try it out by running these commands from your personal computer. Once the cluster is started, you can then
SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``.
SSH into the head node and then run Ray programs with ``ray.init(address="auto")``.

.. code-block:: bash
@@ -77,7 +77,7 @@ SSH into the head node and then run Ray programs with ``ray.init(address="localh
Running commands on new and existing clusters
---------------------------------------------

You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="localhost:6379")``.
You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="auto")``.

.. code-block:: bash
@@ -261,7 +261,7 @@ with GPU worker nodes instead.

.. code-block:: yaml
min_workers: 1 # must have at least 1 GPU worker (issue #2106)
min_workers: 0 # NOTE: older Ray versions may need 1+ GPU workers (#2106)
max_workers: 10
head_node:
InstanceType: m4.large
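The documentation hunks above all replace the hard-coded ``ray.init(address="localhost:6379")`` with ``ray.init(address="auto")``, which looks up the running cluster's address instead of assuming the default Redis port. A minimal sketch of a driver script using the updated form (the task and its workload below are illustrative, not part of this commit):

```python
# Sketch of a driver run on the head node (e.g. via `ray exec` or after SSH).
# Assumes a cluster launched with `ray up`; the function below is made up.
import ray

ray.init(address="auto")  # attach to the existing cluster, do not start a new one

@ray.remote
def square(x):
    return x * x

# Tasks queued beyond current capacity contribute to the reported resource
# load, which (after this commit) the autoscaler uses to add workers.
print(sum(ray.get([square.remote(i) for i in range(100)])))
```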
28 changes: 13 additions & 15 deletions python/ray/autoscaler/autoscaler.py
@@ -10,7 +10,6 @@
import os
import subprocess
import threading
import traceback
import time
from collections import defaultdict

@@ -157,9 +156,11 @@ def __init__(self):
self.last_heartbeat_time_by_ip = {}
self.static_resources_by_ip = {}
self.dynamic_resources_by_ip = {}
self.resource_load_by_ip = {}
self.local_ip = services.get_node_ip_address()

def update(self, ip, static_resources, dynamic_resources):
def update(self, ip, static_resources, dynamic_resources, resource_load):
self.resource_load_by_ip[ip] = resource_load
self.static_resources_by_ip[ip] = static_resources

# We are not guaranteed to have a corresponding dynamic resource for
@@ -204,6 +205,7 @@ def prune(mapping):
prune(self.last_used_time_by_ip)
prune(self.static_resources_by_ip)
prune(self.dynamic_resources_by_ip)
prune(self.resource_load_by_ip)
prune(self.last_heartbeat_time_by_ip)

def approx_workers_used(self):
Expand All @@ -218,7 +220,11 @@ def get_resource_usage(self):
resources_total = {}
for ip, max_resources in self.static_resources_by_ip.items():
avail_resources = self.dynamic_resources_by_ip[ip]
resource_load = self.resource_load_by_ip[ip]
max_frac = 0.0
for resource_id, amount in resource_load.items():
if amount > 0:
max_frac = 1.0 # the resource is saturated
for resource_id, amount in max_resources.items():
used = amount - avail_resources[resource_id]
if resource_id not in resources_used:
@@ -722,19 +728,11 @@ def request_resources(self, resources):

def kill_workers(self):
logger.error("StandardAutoscaler: kill_workers triggered")

while True:
try:
nodes = self.workers()
if nodes:
self.provider.terminate_nodes(nodes)
logger.error(
"StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))
except Exception:
traceback.print_exc()

time.sleep(10)
nodes = self.workers()
if nodes:
self.provider.terminate_nodes(nodes)
logger.error("StandardAutoscaler: terminated {} node(s)".format(
len(nodes)))


def typename(v):
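The new ``resource_load`` argument threads the raylet's pending demand into ``LoadMetrics``; in ``get_resource_usage``, any positive load saturates the node regardless of how busy it currently is. A rough, illustrative restatement of that rule (not the actual implementation):

```python
# Illustrative helper approximating the per-node utilization rule above.
def node_utilization(static_resources, dynamic_resources, resource_load):
    # Any queued demand counts the node as fully used, so the autoscaler
    # scales up even if the node is currently idle.
    if any(amount > 0 for amount in resource_load.values()):
        return 1.0
    max_frac = 0.0
    for resource_id, total in static_resources.items():
        if total > 0:
            used = total - dynamic_resources.get(resource_id, total)
            max_frac = max(max_frac, used / total)
    return max_frac

assert node_utilization({"CPU": 2}, {"CPU": 1}, {}) == 0.5          # half busy
assert node_utilization({"CPU": 2}, {"CPU": 2}, {"CPU": 1}) == 1.0  # idle but queued
```

``kill_workers`` also drops its retry loop: the monitor now logs the failure (see ``logger.exception`` in monitor.py below) and re-raises instead of looping forever.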
3 changes: 2 additions & 1 deletion python/ray/log_monitor.py
@@ -115,7 +115,8 @@ def update_log_filenames(self):
log_file_paths = glob.glob("{}/worker*[.out|.err]".format(
self.logs_dir))
# segfaults and other serious errors are logged here
raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir))
raylet_err_paths = (glob.glob("{}/raylet*.err".format(self.logs_dir)) +
glob.glob("{}/monitor*.err".format(self.logs_dir)))
for file_path in log_file_paths + raylet_err_paths:
if os.path.isfile(
file_path) and file_path not in self.log_filenames:
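With this change the log monitor also tails ``monitor*.err``, so autoscaler/monitor crashes surface in driver output alongside raylet errors. A small sketch of the widened discovery (the directory below is illustrative):

```python
import glob
import os

logs_dir = "/tmp/ray/session_latest/logs"  # illustrative location of Ray logs
# Collect error logs from both the raylet and the monitor process.
err_paths = (glob.glob(os.path.join(logs_dir, "raylet*.err")) +
             glob.glob(os.path.join(logs_dir, "monitor*.err")))
print(err_paths)
```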
6 changes: 5 additions & 1 deletion python/ray/monitor.py
@@ -108,6 +108,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)
for heartbeat_message in message.batch:
resource_load = dict(
zip(heartbeat_message.resource_load_label,
heartbeat_message.resource_load_capacity))
total_resources = dict(
zip(heartbeat_message.resources_total_label,
heartbeat_message.resources_total_capacity))
@@ -122,7 +125,7 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
ip = self.raylet_id_to_ip_map.get(client_id)
if ip:
self.load_metrics.update(ip, total_resources,
available_resources)
available_resources, resource_load)
else:
logger.warning(
"Monitor: "
@@ -357,6 +360,7 @@ def run(self):
try:
self._run()
except Exception:
logger.exception("Error in monitor loop")
if self.autoscaler:
self.autoscaler.kill_workers()
raise
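Each heartbeat carries resource labels and amounts as parallel lists; the monitor zips them into dictionaries before calling ``LoadMetrics.update``. A hedged sketch with a stand-in message object (the real one comes from the GCS protobuf):

```python
# Stand-in for one entry of a heartbeat batch; values are illustrative.
class FakeHeartbeat:
    resource_load_label = ["CPU", "GPU"]
    resource_load_capacity = [4.0, 0.0]

hb = FakeHeartbeat()
# Parallel label/amount lists fold into {"CPU": 4.0, "GPU": 0.0}.
resource_load = dict(zip(hb.resource_load_label, hb.resource_load_capacity))
print(resource_load)
```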
55 changes: 33 additions & 22 deletions python/ray/tests/test_autoscaler.py
@@ -142,45 +142,56 @@ def terminate_node(self, node_id):
class LoadMetricsTest(unittest.TestCase):
def testUpdate(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
assert lm.approx_workers_used() == 0.5
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
assert lm.approx_workers_used() == 1.0
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
assert lm.approx_workers_used() == 2.0

def testLoadMessages(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
assert lm.approx_workers_used() == 0.5
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})
assert lm.approx_workers_used() == 1.0
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {})
assert lm.approx_workers_used() == 1.5
lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1})
assert lm.approx_workers_used() == 2.0

def testPruneByNodeIp(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0})
lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {})
lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
assert lm.approx_workers_used() == 1.0

def testBottleneckResource(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
assert lm.approx_workers_used() == 1.88

def testHeartbeat(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
lm.mark_active("2.2.2.2")
assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
assert "3.3.3.3" not in lm.last_heartbeat_time_by_ip

def testDebugString(self):
lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
lm.update("3.3.3.3", {
"memory": 20,
"object_store_memory": 40
}, {
"memory": 0,
"object_store_memory": 20
})
}, {})
debug = lm.info_string()
assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, "
"1.05 GiB/1.05 GiB memory, "
@@ -418,8 +429,8 @@ def testAggressiveAutoscaling(self):
tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
addrs += head_ip
for addr in addrs:
lm.update(addr, {"CPU": 2}, {"CPU": 0})
lm.update(addr, {"CPU": 2}, {"CPU": 2})
lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
lm.update(addr, {"CPU": 2}, {"CPU": 2}, {})
assert autoscaler.bringup
autoscaler.update()

@@ -428,7 +439,7 @@ def testAggressiveAutoscaling(self):
self.waitForNodes(1)

# All of the nodes are down. Simulate some load on the head node
lm.update(head_ip, {"CPU": 2}, {"CPU": 0})
lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {})

autoscaler.update()
self.waitForNodes(6) # expected due to batch sizes and concurrency
@@ -702,17 +713,17 @@ def testScaleUpBasedOnLoad(self):

# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1
autoscaler.update()
self.waitForNodes(3)
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {})
autoscaler.update()
self.waitForNodes(5)

# Holds steady when load is removed
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
autoscaler.update()
assert autoscaler.num_launches_pending.value == 0
assert len(self.provider.non_terminated_nodes({})) == 5
@@ -746,20 +757,20 @@ def testDontScaleBelowTarget(self):

# Scales up as nodes are reported as used
local_ip = services.get_node_ip_address()
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head
lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head
# 1.0 nodes used => target nodes = 2 => target workers = 1
autoscaler.update()
self.waitForNodes(1)

# Make new node idle, and never used.
# Should hold steady as target is still 2.
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
lm.last_used_time_by_ip["172.0.0.0"] = 0
autoscaler.update()
assert len(self.provider.non_terminated_nodes({})) == 1

# Reduce load on head => target nodes = 1 => target workers = 0
lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {})
autoscaler.update()
assert len(self.provider.non_terminated_nodes({})) == 0

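Every ``lm.update`` call in the tests gains a fourth argument for the pending load, and the new ``testLoadMessages`` case pins down its effect. Condensed from that test, the two updates below show the behavioral difference:

```python
from ray.autoscaler.autoscaler import LoadMetrics

lm = LoadMetrics()
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})          # half used, nothing queued
assert lm.approx_workers_used() == 0.5
lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})  # queued work saturates the node
assert lm.approx_workers_used() == 1.0
```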
7 changes: 5 additions & 2 deletions src/ray/raylet/scheduling_queue.cc
@@ -131,8 +131,11 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id,
}

ResourceSet SchedulingQueue::GetResourceLoad() const {
// TODO(atumanov): consider other types of tasks as part of load.
return ready_queue_->GetCurrentResourceLoad();
auto load = ready_queue_->GetCurrentResourceLoad();
// Also take into account infeasible tasks so they show up for autoscaling.
load.AddResources(
task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetCurrentResourceLoad());
return load;
}

const std::unordered_set<TaskID> &SchedulingQueue::GetBlockedTaskIds() const {
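On the raylet side, the reported load is now the ready queue's demand plus the infeasible queue's demand, so tasks that cannot currently be placed anywhere still trigger scale-up. A language-agnostic sketch of that aggregation (``ResourceSet::AddResources`` does the real work in C++; the values below are illustrative):

```python
# Illustrative dictionaries standing in for the raylet's ResourceSet objects.
ready_load = {"CPU": 3.0}
infeasible_load = {"GPU": 2.0}  # e.g. GPU tasks with no GPU node yet

# Merge the infeasible demand into the reported load so the autoscaler can
# launch node types able to satisfy it.
reported_load = dict(ready_load)
for resource, amount in infeasible_load.items():
    reported_load[resource] = reported_load.get(resource, 0.0) + amount

print(reported_load)  # {'CPU': 3.0, 'GPU': 2.0}
```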
