[Core][Enable gcs scheduler 7/n] Prefer actor owner's node #30789
@@ -0,0 +1,91 @@ (new file: many_nodes_tests/multi_master_test.py, the script invoked by the release test entry below)

import argparse
import os
from time import sleep, perf_counter
import json
import ray


def test_max_actors_launch(cpus_per_actor, total_actors, num_masters):
    # By default, there are 50 groups; each group has 1 master and 99 slaves.
    # Integer division so the result can be passed to range() below.
    num_slaves_per_master = total_actors // num_masters - 1

    @ray.remote(num_cpus=cpus_per_actor)
    class Actor:
        def foo(self):
            pass

        def create(self):
            # Slaves are created by (and therefore owned by) this master actor.
            return [
                Actor.options(max_restarts=-1).remote()
                for _ in range(num_slaves_per_master)
            ]

    print("Start launch actors")
    # The 50 masters are spread across nodes.
    actors = [
        Actor.options(max_restarts=-1, scheduling_strategy="SPREAD").remote()
        for _ in range(num_masters)
    ]
    slaves_per_master = []
    for master in actors:
        slaves_per_master.append(master.create.remote())
    for slaves in slaves_per_master:
        actors.extend(ray.get(slaves))
    return actors


def test_actor_ready(actors):
    remaining = [actor.foo.remote() for actor in actors]
    ray.get(remaining)


def parse_script_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--cpus-per-actor", type=float, default=0.2)
    parser.add_argument("--total-actors", type=int, default=5000)
    parser.add_argument("--num-masters", type=int, default=50)
    parser.add_argument("--no-report", default=False, action="store_true")
    parser.add_argument("--fail", default=False, action="store_true")
    return parser.parse_known_args()


def main():
    args, unknown = parse_script_args()

    ray.init(address="auto")

    actor_launch_start = perf_counter()
    actors = test_max_actors_launch(
        args.cpus_per_actor, args.total_actors, args.num_masters
    )
    actor_launch_end = perf_counter()
    actor_launch_time = actor_launch_end - actor_launch_start
    if args.fail:
        sleep(10)
        return
    actor_ready_start = perf_counter()
    test_actor_ready(actors)
    actor_ready_end = perf_counter()
    actor_ready_time = actor_ready_end - actor_ready_start

    print(f"Actor launch time: {actor_launch_time} ({args.total_actors} actors)")
Review comment: can you tell me a bit more about the success / failure criteria of this test?

Reply: This test is similar to the existing one: https://github.com/ray-project/ray/blob/master/release/nightly_tests/many_nodes_tests/actor_test.py. They both focus on the scheduling latency of a large number of actors. The only difference from the …
    print(f"Actor ready time: {actor_ready_time} ({args.total_actors} actors)")
    print(
        f"Total time: {actor_launch_time + actor_ready_time}"
        f" ({args.total_actors} actors)"
    )

    if "TEST_OUTPUT_JSON" in os.environ and not args.no_report:
        out_file = open(os.environ["TEST_OUTPUT_JSON"], "w")
        results = {
            "actor_launch_time": actor_launch_time,
            "actor_ready_time": actor_ready_time,
            "total_time": actor_launch_time + actor_ready_time,
            "num_actors": args.total_actors,
            "success": "1",
        }
        json.dump(results, out_file)


if __name__ == "__main__":
    main()
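The master/slave layout above is what makes the owner-preference change in this PR observable: the slaves are created inside a master actor's create() method, so each master owns its slaves, and the new GCS scheduling path prefers the owner's node when placing them. The release test entry below runs the script as python many_nodes_tests/multi_master_test.py with the defaults shown in parse_script_args(). A minimal sketch of that ownership pattern in isolation follows; the class names are illustrative (not part of the PR), it assumes a running cluster reachable via address="auto", and the get_node_id() calls only report where each actor landed (actual placement still depends on available resources and on the GCS actor scheduler being enabled).

import ray

ray.init(address="auto")


@ray.remote(num_cpus=0.2)
class Child:
    def where(self):
        # Which node did this actor land on?
        return ray.get_runtime_context().get_node_id()


@ray.remote(num_cpus=0.2)
class Master:
    def create_children(self, n):
        # Children created here are owned by this Master actor, so the
        # owner-node preference introduced by this PR applies to them.
        return [Child.remote() for _ in range(n)]

    def where(self):
        return ray.get_runtime_context().get_node_id()


master = Master.options(scheduling_strategy="SPREAD").remote()
children = ray.get(master.create_children.remote(4))
print("master on:", ray.get(master.where.remote()))
print("children on:", ray.get([c.where.remote() for c in children]))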
@@ -4087,6 +4087,27 @@ (release test configuration)
  type: sdk_command
  file_manager: sdk

- name: many_nodes_multi_master_test
  group: core-daily-test
  working_dir: nightly_tests
  legacy:
    test_name: many_nodes_multi_master_test
    test_suite: nightly_tests

  frequency: nightly-3x
  team: core
  cluster:
    cluster_env: many_nodes_tests/app_config.yaml
    cluster_compute: many_nodes_tests/compute_config.yaml

  run:
    timeout: 7200
    script: python many_nodes_tests/multi_master_test.py
    wait_for_nodes:
      num_nodes: 251
Review comment: can we test this with a smaller number of nodes?

Reply: This config is actually the same as the one of …

Review comment: this is pretty expensive. Can you comment it out for now? We can re-enable it once we enable the GCS actor scheduler by default. (Let's just use it for benchmarking purposes for now.)
  type: sdk_command
  file_manager: sdk

- name: pg_autoscaling_regression_test
  group: core-daily-test
@@ -49,7 +49,8 @@ GcsActorScheduler::GcsActorScheduler(
 void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
   RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());

-  if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
+  if (RayConfig::instance().gcs_actor_scheduling_enabled() &&
+      !actor->GetCreationTaskSpecification().GetRequiredResources().IsEmpty()) {
     ScheduleByGcs(actor);
   } else {
     ScheduleByRaylet(actor);

Review comment: This condition is a little weird here.

Reply: This is just a quick way to make the empty-resource actors go through the legacy logic (randomly select a forward raylet).
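To make the reply above concrete: only actors whose creation task requests some resource take the GCS-based path, while an actor with an empty resource request keeps going through the legacy raylet-based path. A hedged Python illustration of the two cases (the routing itself happens inside the GCS process, not in user code; the class names are made up):

import ray

ray.init(address="auto")


# No resources requested -> the required-resource set is empty -> this actor
# keeps taking the legacy raylet-based path even when GCS scheduling is enabled.
@ray.remote(num_cpus=0)
class ZeroResourceActor:
    def ping(self):
        return "ok"


# Requests CPU -> non-empty required resources -> handled by the GCS scheduler
# when gcs_actor_scheduling_enabled is on.
@ray.remote(num_cpus=0.5)
class ResourcedActor:
    def ping(self):
        return "ok"


a = ZeroResourceActor.remote()
b = ResourcedActor.remote()
print(ray.get([a.ping.remote(), b.ping.remote()]))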
@@ -93,7 +94,10 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr<GcsActor> actor) {
   };

   // Queue and schedule the actor locally (gcs).
-  cluster_task_manager_->QueueAndScheduleTask(actor->GetCreationTaskSpecification(),
+  auto owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
+  RayTask task(actor->GetCreationTaskSpecification(),
+               owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
+  cluster_task_manager_->QueueAndScheduleTask(task,
                                               /*grant_or_reject*/ false,
                                               /*is_selected_based_on_locality*/ false,
                                               /*reply*/ reply.get(),
Review comment: have you run this test? (do you need help from us running this?)

Reply: Please see this Slack post (https://ray-distributed.slack.com/archives/C0334JQHG86/p1669131998615139) for the results. Generally, the GCS scheduler shows ~2.5X better latency compared to the raylet scheduler.
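For readers who want to reproduce a comparison like that on a small cluster: gcs_actor_scheduling_enabled is the flag checked in the Schedule() hunk above, and it defaults to off. Assuming it is also exposed as a Ray system config under the same name (it is an internal, undocumented knob, so treat this purely as a sketch), it can be flipped on the node that bootstraps the cluster:

import ray

# _system_config is applied by the node that starts the cluster (the head); for a
# multi-node benchmark the same setting would go on the head node's `ray start`.
ray.init(_system_config={"gcs_actor_scheduling_enabled": True})


@ray.remote(num_cpus=0.2)
class Probe:
    def ready(self):
        return True


actors = [Probe.remote() for _ in range(20)]
ray.get([a.ready.remote() for a in actors])
ray.shutdown()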