From f6ac68724bb918dc2ce195374cf4454fbaaee6da Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 10 Sep 2019 17:15:55 -0700 Subject: [PATCH 1/5] fix it --- doc/source/autoscaling.rst | 2 +- python/ray/autoscaler/autoscaler.py | 9 ++++- python/ray/log_monitor.py | 3 +- python/ray/monitor.py | 6 +++- python/ray/tests/test_autoscaler.py | 55 +++++++++++++++++------------ src/ray/raylet/scheduling_queue.cc | 7 ++-- 6 files changed, 54 insertions(+), 28 deletions(-) diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index f1b41440784b2..b04176e2a29b3 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -261,7 +261,7 @@ with GPU worker nodes instead. .. code-block:: yaml - min_workers: 1 # must have at least 1 GPU worker (issue #2106) + min_workers: 0 # NOTE: older Ray versions may need 1+ GPU workers (#2106) max_workers: 10 head_node: InstanceType: m4.large diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 8873fec717296..40ffbddd913af 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -157,9 +157,11 @@ def __init__(self): self.last_heartbeat_time_by_ip = {} self.static_resources_by_ip = {} self.dynamic_resources_by_ip = {} + self.resource_load_by_ip = {} self.local_ip = services.get_node_ip_address() - def update(self, ip, static_resources, dynamic_resources): + def update(self, ip, static_resources, dynamic_resources, resource_load): + self.resource_load_by_ip[ip] = resource_load self.static_resources_by_ip[ip] = static_resources # We are not guaranteed to have a corresponding dynamic resource for @@ -204,6 +206,7 @@ def prune(mapping): prune(self.last_used_time_by_ip) prune(self.static_resources_by_ip) prune(self.dynamic_resources_by_ip) + prune(self.resource_load_by_ip) prune(self.last_heartbeat_time_by_ip) def approx_workers_used(self): @@ -218,7 +221,11 @@ def get_resource_usage(self): resources_total = {} for ip, max_resources in self.static_resources_by_ip.items(): avail_resources = self.dynamic_resources_by_ip[ip] + resource_load = self.resource_load_by_ip[ip] max_frac = 0.0 + for resource_id, amount in resource_load.items(): + if amount > 0: + max_frac = 1.0 # the resource is saturated for resource_id, amount in max_resources.items(): used = amount - avail_resources[resource_id] if resource_id not in resources_used: diff --git a/python/ray/log_monitor.py b/python/ray/log_monitor.py index 37239950e1101..b0427b1d4e10e 100644 --- a/python/ray/log_monitor.py +++ b/python/ray/log_monitor.py @@ -115,7 +115,8 @@ def update_log_filenames(self): log_file_paths = glob.glob("{}/worker*[.out|.err]".format( self.logs_dir)) # segfaults and other serious errors are logged here - raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir)) + raylet_err_paths = (glob.glob("{}/raylet*.err".format(self.logs_dir)) + + glob.glob("{}/monitor*.err".format(self.logs_dir))) for file_path in log_file_paths + raylet_err_paths: if os.path.isfile( file_path) and file_path not in self.log_filenames: diff --git a/python/ray/monitor.py b/python/ray/monitor.py index c4531d8b3610e..1cd7da8f471c1 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -108,6 +108,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data): message = ray.gcs_utils.HeartbeatBatchTableData.FromString( heartbeat_data) for heartbeat_message in message.batch: + resource_load = dict( + zip(heartbeat_message.resource_load_label, + heartbeat_message.resource_load_capacity)) total_resources = dict( zip(heartbeat_message.resources_total_label, heartbeat_message.resources_total_capacity)) @@ -120,9 +123,10 @@ def xray_heartbeat_batch_handler(self, unused_channel, data): # Update the load metrics for this raylet. client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) + logger.error("RESOURCE LOAD {}".format(resource_load)) if ip: self.load_metrics.update(ip, total_resources, - available_resources) + available_resources, resource_load) else: logger.warning( "Monitor: " diff --git a/python/ray/tests/test_autoscaler.py b/python/ray/tests/test_autoscaler.py index 36d55e940f26d..52dc87f183271 100644 --- a/python/ray/tests/test_autoscaler.py +++ b/python/ray/tests/test_autoscaler.py @@ -142,29 +142,40 @@ def terminate_node(self, node_id): class LoadMetricsTest(unittest.TestCase): def testUpdate(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) assert lm.approx_workers_used() == 0.5 - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) assert lm.approx_workers_used() == 1.0 - lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}) + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {}) + assert lm.approx_workers_used() == 2.0 + + def testLoadMessages(self): + lm = LoadMetrics() + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) + assert lm.approx_workers_used() == 0.5 + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1}) + assert lm.approx_workers_used() == 1.0 + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {}) + assert lm.approx_workers_used() == 1.5 + lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1}) assert lm.approx_workers_used() == 2.0 def testPruneByNodeIp(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}) + lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {}) lm.prune_active_ips({"1.1.1.1", "4.4.4.4"}) assert lm.approx_workers_used() == 1.0 def testBottleneckResource(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) assert lm.approx_workers_used() == 1.88 def testHeartbeat(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {}) lm.mark_active("2.2.2.2") assert "1.1.1.1" in lm.last_heartbeat_time_by_ip assert "2.2.2.2" in lm.last_heartbeat_time_by_ip @@ -172,15 +183,15 @@ def testHeartbeat(self): def testDebugString(self): lm = LoadMetrics() - lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}) - lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}) + lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {}) + lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {}) lm.update("3.3.3.3", { "memory": 20, "object_store_memory": 40 }, { "memory": 0, "object_store_memory": 20 - }) + }, {}) debug = lm.info_string() assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, " "1.05 GiB/1.05 GiB memory, " @@ -418,8 +429,8 @@ def testAggressiveAutoscaling(self): tag_filters={TAG_RAY_NODE_TYPE: "worker"}, ) addrs += head_ip for addr in addrs: - lm.update(addr, {"CPU": 2}, {"CPU": 0}) - lm.update(addr, {"CPU": 2}, {"CPU": 2}) + lm.update(addr, {"CPU": 2}, {"CPU": 0}, {}) + lm.update(addr, {"CPU": 2}, {"CPU": 2}, {}) assert autoscaler.bringup autoscaler.update() @@ -428,7 +439,7 @@ def testAggressiveAutoscaling(self): self.waitForNodes(1) # All of the nodes are down. Simulate some load on the head node - lm.update(head_ip, {"CPU": 2}, {"CPU": 0}) + lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {}) autoscaler.update() self.waitForNodes(6) # expected due to batch sizes and concurrency @@ -702,17 +713,17 @@ def testScaleUpBasedOnLoad(self): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}) # worker 1 + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {}) # worker 1 autoscaler.update() self.waitForNodes(3) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {}) autoscaler.update() self.waitForNodes(5) # Holds steady when load is removed - lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}) - lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}) + lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {}) + lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {}) autoscaler.update() assert autoscaler.num_launches_pending.value == 0 assert len(self.provider.non_terminated_nodes({})) == 5 @@ -746,20 +757,20 @@ def testDontScaleBelowTarget(self): # Scales up as nodes are reported as used local_ip = services.get_node_ip_address() - lm.update(local_ip, {"CPU": 2}, {"CPU": 0}) # head + lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {}) # head # 1.0 nodes used => target nodes = 2 => target workers = 1 autoscaler.update() self.waitForNodes(1) # Make new node idle, and never used. # Should hold steady as target is still 2. - lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}) + lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {}) lm.last_used_time_by_ip["172.0.0.0"] = 0 autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 1 # Reduce load on head => target nodes = 1 => target workers = 0 - lm.update(local_ip, {"CPU": 2}, {"CPU": 1}) + lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {}) autoscaler.update() assert len(self.provider.non_terminated_nodes({})) == 0 diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 5a0f91dd7c0e2..c20c07405185b 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -131,8 +131,11 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id, } ResourceSet SchedulingQueue::GetResourceLoad() const { - // TODO(atumanov): consider other types of tasks as part of load. - return ready_queue_->GetCurrentResourceLoad(); + auto load = ready_queue_->GetCurrentResourceLoad(); + // Also take into account infeasible tasks so they show up for autoscaling + load.AddResources( + task_queues_[static_cast(TaskState::INFEASIBLE)]->GetCurrentResourceLoad()); + return load; } const std::unordered_set &SchedulingQueue::GetBlockedTaskIds() const { From 354e10b215b77056b50f86dd7e1ff5c795b28fc0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 10 Sep 2019 18:03:43 -0700 Subject: [PATCH 2/5] fix errors --- doc/source/autoscaling.rst | 8 ++++---- python/ray/monitor.py | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/source/autoscaling.rst b/doc/source/autoscaling.rst index b04176e2a29b3..d68fa1659da9c 100644 --- a/doc/source/autoscaling.rst +++ b/doc/source/autoscaling.rst @@ -14,7 +14,7 @@ as described in `the boto docs `__ cluster config file will create a small cluster with a m5.large head node (on-demand) configured to autoscale up to two m5.large `spot workers `__. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node, ``source activate tensorflow_p36``, and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -37,7 +37,7 @@ First, install the Google API client (``pip install google-api-python-client``), Then you're ready to go. The provided `ray/python/ray/autoscaler/gcp/example-full.yaml `__ cluster config file will create a small cluster with a n1-standard-2 head node (on-demand) configured to autoscale up to two n1-standard-2 `preemptible workers `__. Note that you'll need to fill in your project id in those templates. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -59,7 +59,7 @@ This is used when you have a list of machine IP addresses to connect in a Ray cl Be sure to specify the proper ``head_ip``, list of ``worker_ips``, and the ``ssh_user`` field. Try it out by running these commands from your personal computer. Once the cluster is started, you can then -SSH into the head node and then run Ray programs with ``ray.init(address="localhost:6379")``. +SSH into the head node and then run Ray programs with ``ray.init(address="auto")``. .. code-block:: bash @@ -77,7 +77,7 @@ SSH into the head node and then run Ray programs with ``ray.init(address="localh Running commands on new and existing clusters --------------------------------------------- -You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="localhost:6379")``. +You can use ``ray exec`` to conveniently run commands on clusters. Note that scripts you run should connect to Ray via ``ray.init(address="auto")``. .. code-block:: bash diff --git a/python/ray/monitor.py b/python/ray/monitor.py index 1cd7da8f471c1..12c30614f7eca 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -123,7 +123,6 @@ def xray_heartbeat_batch_handler(self, unused_channel, data): # Update the load metrics for this raylet. client_id = ray.utils.binary_to_hex(heartbeat_message.client_id) ip = self.raylet_id_to_ip_map.get(client_id) - logger.error("RESOURCE LOAD {}".format(resource_load)) if ip: self.load_metrics.update(ip, total_resources, available_resources, resource_load) @@ -361,6 +360,7 @@ def run(self): try: self._run() except Exception: + logger.exception("Error in monitor loop") if self.autoscaler: self.autoscaler.kill_workers() raise From 512d70ce7c71ddb677e69c916b3e1408f7eb2e6d Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 10 Sep 2019 18:05:38 -0700 Subject: [PATCH 3/5] remove inf loop --- python/ray/autoscaler/autoscaler.py | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 40ffbddd913af..1d3d76565ac55 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -729,19 +729,11 @@ def request_resources(self, resources): def kill_workers(self): logger.error("StandardAutoscaler: kill_workers triggered") - - while True: - try: - nodes = self.workers() - if nodes: - self.provider.terminate_nodes(nodes) - logger.error( - "StandardAutoscaler: terminated {} node(s)".format( - len(nodes))) - except Exception: - traceback.print_exc() - - time.sleep(10) + nodes = self.workers() + if nodes: + self.provider.terminate_nodes(nodes) + logger.error("StandardAutoscaler: terminated {} node(s)".format( + len(nodes))) def typename(v): From 823beb9e2f03843876283575a856aa63192f4c89 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 11 Sep 2019 11:27:50 -0700 Subject: [PATCH 4/5] period --- src/ray/raylet/scheduling_queue.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index c20c07405185b..8e20aacfa69c9 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -132,7 +132,7 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id, ResourceSet SchedulingQueue::GetResourceLoad() const { auto load = ready_queue_->GetCurrentResourceLoad(); - // Also take into account infeasible tasks so they show up for autoscaling + // Also take into account infeasible tasks so they show up for autoscaling. load.AddResources( task_queues_[static_cast(TaskState::INFEASIBLE)]->GetCurrentResourceLoad()); return load; From 962922972c298ad1df91aa2d2b412309b11ce8d0 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 11 Sep 2019 11:29:28 -0700 Subject: [PATCH 5/5] fix lint --- python/ray/autoscaler/autoscaler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/autoscaler/autoscaler.py b/python/ray/autoscaler/autoscaler.py index 1d3d76565ac55..6e1e8c00c2c33 100644 --- a/python/ray/autoscaler/autoscaler.py +++ b/python/ray/autoscaler/autoscaler.py @@ -10,7 +10,6 @@ import os import subprocess import threading -import traceback import time from collections import defaultdict