Commit

fix it

ericl committed Sep 11, 2019
1 parent 336aef1 commit f6ac687
Showing 6 changed files with 54 additions and 28 deletions.

doc/source/autoscaling.rst (1 addition & 1 deletion)

@@ -261,7 +261,7 @@ with GPU worker nodes instead.

 .. code-block:: yaml

-    min_workers: 1 # must have at least 1 GPU worker (issue #2106)
+    min_workers: 0 # NOTE: older Ray versions may need 1+ GPU workers (#2106)
     max_workers: 10
     head_node:
         InstanceType: m4.large

python/ray/autoscaler/autoscaler.py (8 additions & 1 deletion)

@@ -157,9 +157,11 @@ def __init__(self):
         self.last_heartbeat_time_by_ip = {}
         self.static_resources_by_ip = {}
         self.dynamic_resources_by_ip = {}
+        self.resource_load_by_ip = {}
         self.local_ip = services.get_node_ip_address()

-    def update(self, ip, static_resources, dynamic_resources):
+    def update(self, ip, static_resources, dynamic_resources, resource_load):
+        self.resource_load_by_ip[ip] = resource_load
         self.static_resources_by_ip[ip] = static_resources

         # We are not guaranteed to have a corresponding dynamic resource for

@@ -204,6 +206,7 @@ def prune(mapping):
         prune(self.last_used_time_by_ip)
         prune(self.static_resources_by_ip)
         prune(self.dynamic_resources_by_ip)
+        prune(self.resource_load_by_ip)
         prune(self.last_heartbeat_time_by_ip)

     def approx_workers_used(self):

@@ -218,7 +221,11 @@ def get_resource_usage(self):
         resources_total = {}
         for ip, max_resources in self.static_resources_by_ip.items():
             avail_resources = self.dynamic_resources_by_ip[ip]
+            resource_load = self.resource_load_by_ip[ip]
+            max_frac = 0.0
+            for resource_id, amount in resource_load.items():
+                if amount > 0:
+                    max_frac = 1.0  # the resource is saturated
             for resource_id, amount in max_resources.items():
                 used = amount - avail_resources[resource_id]
                 if resource_id not in resources_used:
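
A minimal sketch of the saturation heuristic added to get_resource_usage above: if a node's reported resource_load shows any queued demand, the node counts as fully used, so the autoscaler will not scale down (and may scale up) while work is pending. node_usage_fraction is a hypothetical standalone helper for illustration, not the actual LoadMetrics code.

    def node_usage_fraction(static_resources, dynamic_resources, resource_load):
        """Fraction of a node considered 'used' for autoscaling purposes."""
        # Any queued demand saturates the node outright, mirroring the
        # max_frac = 1.0 branch in the diff above.
        for _resource_id, amount in resource_load.items():
            if amount > 0:
                return 1.0
        # Otherwise usage is the largest used fraction across resources.
        max_frac = 0.0
        for resource_id, amount in static_resources.items():
            if amount > 0:
                used = amount - dynamic_resources[resource_id]
                max_frac = max(max_frac, used / amount)
        return max_frac

    # Matches testLoadMessages below: 1 of 2 CPUs in use is 0.5 without
    # queued load, but jumps to 1.0 once any load is reported.
    assert node_usage_fraction({"CPU": 2}, {"CPU": 1}, {}) == 0.5
    assert node_usage_fraction({"CPU": 2}, {"CPU": 1}, {"CPU": 1}) == 1.0
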
python/ray/log_monitor.py (2 additions & 1 deletion)

@@ -115,7 +115,8 @@ def update_log_filenames(self):
         log_file_paths = glob.glob("{}/worker*[.out|.err]".format(
             self.logs_dir))
         # segfaults and other serious errors are logged here
-        raylet_err_paths = glob.glob("{}/raylet*.err".format(self.logs_dir))
+        raylet_err_paths = (glob.glob("{}/raylet*.err".format(self.logs_dir)) +
+                            glob.glob("{}/monitor*.err".format(self.logs_dir)))
         for file_path in log_file_paths + raylet_err_paths:
             if os.path.isfile(
                     file_path) and file_path not in self.log_filenames:
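
A small illustration of the widened glob above: monitor*.err files are now tailed alongside raylet*.err, so errors from the monitor process (where the autoscaler runs) surface in driver output too. The logs_dir value here is a hypothetical example.

    import glob

    logs_dir = "/tmp/ray/session_latest/logs"  # example location
    err_paths = (glob.glob("{}/raylet*.err".format(logs_dir)) +
                 glob.glob("{}/monitor*.err".format(logs_dir)))
    print(err_paths)  # e.g. ['.../raylet.err', '.../monitor.err']
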
python/ray/monitor.py (5 additions & 1 deletion)

@@ -108,6 +108,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
         message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
             heartbeat_data)
         for heartbeat_message in message.batch:
+            resource_load = dict(
+                zip(heartbeat_message.resource_load_label,
+                    heartbeat_message.resource_load_capacity))
             total_resources = dict(
                 zip(heartbeat_message.resources_total_label,
                     heartbeat_message.resources_total_capacity))

@@ -120,9 +123,10 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
             # Update the load metrics for this raylet.
             client_id = ray.utils.binary_to_hex(heartbeat_message.client_id)
             ip = self.raylet_id_to_ip_map.get(client_id)
+            logger.error("RESOURCE LOAD {}".format(resource_load))
             if ip:
                 self.load_metrics.update(ip, total_resources,
-                                         available_resources)
+                                         available_resources, resource_load)
             else:
                 logger.warning(
                     "Monitor: "
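
For readers unfamiliar with the heartbeat format: resource_load arrives as parallel label/capacity arrays that the handler zips into a dict, the same way total and available resources already were. A toy decoding, with FakeHeartbeat standing in for the real HeartbeatBatchTableData message:

    class FakeHeartbeat:
        resource_load_label = ["CPU", "GPU"]
        resource_load_capacity = [4.0, 1.0]

    hb = FakeHeartbeat()
    resource_load = dict(
        zip(hb.resource_load_label, hb.resource_load_capacity))
    assert resource_load == {"CPU": 4.0, "GPU": 1.0}
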
python/ray/tests/test_autoscaler.py (33 additions & 22 deletions)

@@ -142,45 +142,56 @@ def terminate_node(self, node_id):
 class LoadMetricsTest(unittest.TestCase):
     def testUpdate(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
         assert lm.approx_workers_used() == 0.5
-        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
         assert lm.approx_workers_used() == 1.0
-        lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0})
+        lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 0}, {})
         assert lm.approx_workers_used() == 2.0

+    def testLoadMessages(self):
+        lm = LoadMetrics()
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
+        assert lm.approx_workers_used() == 0.5
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {"CPU": 1})
+        assert lm.approx_workers_used() == 1.0
+        lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {})
+        assert lm.approx_workers_used() == 1.5
+        lm.update("2.2.2.2", {"CPU": 2}, {"CPU": 1}, {"GPU": 1})
+        assert lm.approx_workers_used() == 2.0
+
     def testPruneByNodeIp(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0})
-        lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0})
+        lm.update("1.1.1.1", {"CPU": 1}, {"CPU": 0}, {})
+        lm.update("2.2.2.2", {"CPU": 1}, {"CPU": 0}, {})
         lm.prune_active_ips({"1.1.1.1", "4.4.4.4"})
         assert lm.approx_workers_used() == 1.0

     def testBottleneckResource(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
-        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
+        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
         assert lm.approx_workers_used() == 1.88

     def testHeartbeat(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 1}, {})
         lm.mark_active("2.2.2.2")
         assert "1.1.1.1" in lm.last_heartbeat_time_by_ip
         assert "2.2.2.2" in lm.last_heartbeat_time_by_ip
         assert "3.3.3.3" not in lm.last_heartbeat_time_by_ip

     def testDebugString(self):
         lm = LoadMetrics()
-        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
-        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2})
+        lm.update("1.1.1.1", {"CPU": 2}, {"CPU": 0}, {})
+        lm.update("2.2.2.2", {"CPU": 2, "GPU": 16}, {"CPU": 2, "GPU": 2}, {})
         lm.update("3.3.3.3", {
             "memory": 20,
             "object_store_memory": 40
         }, {
             "memory": 0,
             "object_store_memory": 20
-        })
+        }, {})
         debug = lm.info_string()
         assert ("ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU, "
                 "1.05 GiB/1.05 GiB memory, "

@@ -418,8 +429,8 @@ def testAggressiveAutoscaling(self):
             tag_filters={TAG_RAY_NODE_TYPE: "worker"}, )
         addrs += head_ip
         for addr in addrs:
-            lm.update(addr, {"CPU": 2}, {"CPU": 0})
-            lm.update(addr, {"CPU": 2}, {"CPU": 2})
+            lm.update(addr, {"CPU": 2}, {"CPU": 0}, {})
+            lm.update(addr, {"CPU": 2}, {"CPU": 2}, {})
         assert autoscaler.bringup
         autoscaler.update()

@@ ... @@
         self.waitForNodes(1)

         # All of the nodes are down. Simulate some load on the head node
-        lm.update(head_ip, {"CPU": 2}, {"CPU": 0})
+        lm.update(head_ip, {"CPU": 2}, {"CPU": 0}, {})

         autoscaler.update()
         self.waitForNodes(6)  # expected due to batch sizes and concurrency

@@ -702,17 +713,17 @@ def testScaleUpBasedOnLoad(self):

         # Scales up as nodes are reported as used
         local_ip = services.get_node_ip_address()
-        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
-        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0})  # worker 1
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {})  # head
+        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0}, {})  # worker 1
         autoscaler.update()
         self.waitForNodes(3)
-        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
+        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0}, {})
         autoscaler.update()
         self.waitForNodes(5)

         # Holds steady when load is removed
-        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
-        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
+        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2}, {})
+        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2}, {})
         autoscaler.update()
         assert autoscaler.num_launches_pending.value == 0
         assert len(self.provider.non_terminated_nodes({})) == 5

@@ -746,20 +757,20 @@ def testDontScaleBelowTarget(self):

         # Scales up as nodes are reported as used
         local_ip = services.get_node_ip_address()
-        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 0}, {})  # head
         # 1.0 nodes used => target nodes = 2 => target workers = 1
         autoscaler.update()
         self.waitForNodes(1)

         # Make new node idle, and never used.
         # Should hold steady as target is still 2.
-        lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
+        lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0}, {})
         lm.last_used_time_by_ip["172.0.0.0"] = 0
         autoscaler.update()
         assert len(self.provider.non_terminated_nodes({})) == 1

         # Reduce load on head => target nodes = 1 => target workers = 0
-        lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 1}, {})
         autoscaler.update()
         assert len(self.provider.non_terminated_nodes({})) == 0

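The comments in testDontScaleBelowTarget encode the target arithmetic ("1.0 nodes used => target nodes = 2 => target workers = 1"). A rough sketch of that arithmetic, assuming a target utilization fraction of 0.5; this is an illustration consistent with the test comments, not the autoscaler's actual implementation.

    import math

    def target_num_workers(nodes_used, target_utilization_fraction=0.5):
        # Inflate the used-node estimate so the cluster runs below full
        # utilization, round up to whole nodes, then subtract the head node.
        target_nodes = math.ceil(nodes_used / target_utilization_fraction)
        return max(0, target_nodes - 1)

    assert target_num_workers(1.0) == 1  # "target nodes = 2 => workers = 1"
    assert target_num_workers(0.5) == 0  # "target nodes = 1 => workers = 0"
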
src/ray/raylet/scheduling_queue.cc (5 additions & 2 deletions)

@@ -131,8 +131,11 @@ const Task &SchedulingQueue::GetTaskOfState(const TaskID &task_id,
 }

 ResourceSet SchedulingQueue::GetResourceLoad() const {
-  // TODO(atumanov): consider other types of tasks as part of load.
-  return ready_queue_->GetCurrentResourceLoad();
+  auto load = ready_queue_->GetCurrentResourceLoad();
+  // Also take into account infeasible tasks so they show up for autoscaling
+  load.AddResources(
+      task_queues_[static_cast<int>(TaskState::INFEASIBLE)]->GetCurrentResourceLoad());
+  return load;
 }

 const std::unordered_set<TaskID> &SchedulingQueue::GetBlockedTaskIds() const {
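
A Python illustration of the C++ change above: the reported load now merges the ready queue's demand with the infeasible queue's, so a task that fits on no current node (e.g. a GPU task in a CPU-only cluster) still shows up as pending load and can trigger a scale-up. add_resources is a toy stand-in for ResourceSet::AddResources.

    def add_resources(load, other):
        # Merge resource demands by summing amounts per resource id.
        for resource_id, amount in other.items():
            load[resource_id] = load.get(resource_id, 0) + amount
        return load

    ready_load = {"CPU": 4}
    infeasible_load = {"GPU": 1}  # demands a resource no node has yet

    reported = add_resources(dict(ready_load), infeasible_load)
    assert reported == {"CPU": 4, "GPU": 1}  # GPU demand now visible to autoscaler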
