
[Core][Enable gcs scheduler 7/n] Prefer actor owner's node #30789

Merged (10 commits, Jan 23, 2023)
91 changes: 91 additions & 0 deletions release/nightly_tests/many_nodes_tests/multi_master_test.py
@@ -0,0 +1,91 @@
import argparse
import os
from time import sleep, perf_counter
import json
import ray


def test_max_actors_launch(cpus_per_actor, total_actors, num_masters):
    # By default, there are 50 groups; each group has 1 master and 99 slaves.
Contributor:
Have you run this test? (Do you need help from us running it?)

Contributor Author:

Please see this Slack post (https://ray-distributed.slack.com/archives/C0334JQHG86/p1669131998615139) for the results. Generally, the GCS scheduler shows roughly 2.5x better (lower) latency than the raylet scheduler.

    num_slaves_per_master = total_actors // num_masters - 1

    @ray.remote(num_cpus=cpus_per_actor)
    class Actor:
        def foo(self):
            pass

        def create(self):
            return [
                Actor.options(max_restarts=-1).remote()
                for _ in range(num_slaves_per_master)
            ]

    print("Start launch actors")
    # The masters (50 by default) are spread across the cluster.
    actors = [
        Actor.options(max_restarts=-1, scheduling_strategy="SPREAD").remote()
        for _ in range(num_masters)
    ]
    slaves_per_master = []
    for master in actors:
        slaves_per_master.append(master.create.remote())
    for slaves in slaves_per_master:
        actors.extend(ray.get(slaves))
    return actors


def test_actor_ready(actors):
    remaining = [actor.foo.remote() for actor in actors]
    ray.get(remaining)


def parse_script_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--cpus-per-actor", type=float, default=0.2)
    parser.add_argument("--total-actors", type=int, default=5000)
    parser.add_argument("--num-masters", type=int, default=50)
    parser.add_argument("--no-report", default=False, action="store_true")
    parser.add_argument("--fail", default=False, action="store_true")
    return parser.parse_known_args()


def main():
    args, unknown = parse_script_args()

    ray.init(address="auto")

    actor_launch_start = perf_counter()
    actors = test_max_actors_launch(
        args.cpus_per_actor, args.total_actors, args.num_masters
    )
    actor_launch_end = perf_counter()
    actor_launch_time = actor_launch_end - actor_launch_start
    if args.fail:
        sleep(10)
        return
    actor_ready_start = perf_counter()
    test_actor_ready(actors)
    actor_ready_end = perf_counter()
    actor_ready_time = actor_ready_end - actor_ready_start

    print(f"Actor launch time: {actor_launch_time} ({args.total_actors} actors)")
Contributor:
Can you tell me a bit more about the success/failure criteria of this test?

Contributor Author:

This test is similar to the existing one: https://github.com/ray-project/ray/blob/master/release/nightly_tests/many_nodes_tests/actor_test.py. They both focus on the scheduling latency of a large number of actors.

The only difference from actor_test.py is that this test simulates a multi-master (driver) scenario: (distributed) master actors create slave actors concurrently. Please see this discussion (https://ray-distributed.slack.com/archives/C0334JQHG86/p1667468940596309) for details.

    print(f"Actor ready time: {actor_ready_time} ({args.total_actors} actors)")
    print(
        f"Total time: {actor_launch_time + actor_ready_time}"
        f" ({args.total_actors} actors)"
    )

    if "TEST_OUTPUT_JSON" in os.environ and not args.no_report:
        with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
            results = {
                "actor_launch_time": actor_launch_time,
                "actor_ready_time": actor_ready_time,
                "total_time": actor_launch_time + actor_ready_time,
                "num_actors": args.total_actors,
                "success": "1",
            }
            json.dump(results, out_file)


if __name__ == "__main__":
    main()
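The group arithmetic the test relies on can be checked standalone. Below is a small sketch (the function name is illustrative, not part of the test file), assuming `total_actors` is divisible by `num_masters` as in the default configuration:

```python
def group_sizes(total_actors, num_masters):
    """Mirror the test's layout: each master creates
    total_actors // num_masters - 1 slaves, so masters plus slaves
    add back up to total_actors when the division is exact."""
    num_slaves_per_master = total_actors // num_masters - 1
    total = num_masters * (1 + num_slaves_per_master)
    return num_slaves_per_master, total

# Default configuration: 5000 actors in 50 groups -> 99 slaves per master.
print(group_sizes(5000, 50))  # -> (99, 5000)
```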
21 changes: 21 additions & 0 deletions release/release_tests.yaml
@@ -3890,6 +3890,27 @@
num_nodes: 251


#- name: many_nodes_multi_master_test
# group: core-daily-test
# working_dir: nightly_tests
# legacy:
# test_name: many_nodes_multi_master_test
# test_suite: nightly_tests
#
# frequency: nightly-3x
# team: core
# cluster:
# cluster_env: many_nodes_tests/app_config.yaml
# cluster_compute: many_nodes_tests/compute_config.yaml
#
# run:
# timeout: 7200
# script: python many_nodes_tests/multi_master_test.py
# wait_for_nodes:
# num_nodes: 251
#
# type: sdk_command
# file_manager: sdk

- name: pg_autoscaling_regression_test
group: core-daily-test
7 changes: 7 additions & 0 deletions src/ray/common/task/task.cc
@@ -26,12 +26,19 @@ RayTask::RayTask(TaskSpecification task_spec) : task_spec_(std::move(task_spec))
ComputeDependencies();
}

RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id)
: task_spec_(std::move(task_spec)), preferred_node_id_(std::move(preferred_node_id)) {
ComputeDependencies();
}

const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; }

const std::vector<rpc::ObjectReference> &RayTask::GetDependencies() const {
return dependencies_;
}

const std::string &RayTask::GetPreferredNodeID() const { return preferred_node_id_; }

void RayTask::ComputeDependencies() { dependencies_ = task_spec_.GetDependencies(); }

std::string RayTask::DebugString() const {
10 changes: 10 additions & 0 deletions src/ray/common/task/task.h
@@ -43,6 +43,8 @@ class RayTask {
/// Construct a `RayTask` object from a `TaskSpecification`.
RayTask(TaskSpecification task_spec);

RayTask(TaskSpecification task_spec, std::string preferred_node_id);

/// Get the immutable specification for the task.
///
/// \return The immutable specification for the task.
@@ -54,6 +56,12 @@
/// \return The object dependencies.
const std::vector<rpc::ObjectReference> &GetDependencies() const;

/// Get the task's preferred node id for scheduling. If the returned value
/// is empty, then it means the task has no preferred node.
///
/// \return The preferred node id.
const std::string &GetPreferredNodeID() const;

std::string DebugString() const;

private:
@@ -66,6 +74,8 @@
/// A cached copy of the task's object dependencies, including arguments from
/// the TaskSpecification.
std::vector<rpc::ObjectReference> dependencies_;

std::string preferred_node_id_;
};

} // namespace ray
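The new accessor's contract (an empty ID means "no preference") can be mimicked in a few lines; this Python sketch is illustrative only and mirrors the C++ convention, not the actual API:

```python
class RayTaskSketch:
    """Minimal mirror of the C++ RayTask preferred-node contract."""

    def __init__(self, task_spec, preferred_node_id=""):
        self._task_spec = task_spec
        self._preferred_node_id = preferred_node_id

    def get_preferred_node_id(self):
        # An empty string means the task has no preferred node.
        return self._preferred_node_id

t = RayTaskSketch({"name": "f"}, "node-1")
print(t.get_preferred_node_id())  # -> node-1
```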
8 changes: 6 additions & 2 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -49,7 +49,8 @@ GcsActorScheduler::GcsActorScheduler(
void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());

if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
if (RayConfig::instance().gcs_actor_scheduling_enabled() &&
!actor->GetCreationTaskSpecification().GetRequiredResources().IsEmpty()) {
Contributor:

This condition is a little weird here.

Contributor Author:

This is just a quick way to make empty-resource actors go through the legacy logic (randomly selecting a raylet to forward them to).

ScheduleByGcs(actor);
} else {
ScheduleByRaylet(actor);
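The dispatch rule under discussion can be sketched in Python. This is a simplification of the C++ branch above; the function and argument names are illustrative:

```python
def choose_scheduler(gcs_actor_scheduling_enabled, required_resources):
    """Pick the scheduling path for an actor-creation task.

    Empty-resource actors always take the legacy raylet path (a randomly
    selected raylet forwards them), even when GCS scheduling is enabled.
    """
    if gcs_actor_scheduling_enabled and required_resources:
        return "gcs"
    return "raylet"

print(choose_scheduler(True, {"CPU": 0.2}))  # -> gcs
print(choose_scheduler(True, {}))            # -> raylet
print(choose_scheduler(False, {"CPU": 1}))   # -> raylet
```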
@@ -93,7 +94,10 @@ };
};

// Queue and schedule the actor locally (gcs).
cluster_task_manager_->QueueAndScheduleTask(actor->GetCreationTaskSpecification(),
const auto &owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
RayTask task(actor->GetCreationTaskSpecification(),
owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
cluster_task_manager_->QueueAndScheduleTask(task,
/*grant_or_reject*/ false,
/*is_selected_based_on_locality*/ false,
/*reply*/ reply.get(),
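The hunk above derives the preferred node from the actor's owner. A rough Python sketch of that selection (simplified; names are illustrative, and the C++ code works with binary node IDs):

```python
def preferred_node_for(owner_node_id, alive_nodes):
    """Prefer the owner's node if it is still alive; otherwise return
    an empty string, which means 'no preferred node' by convention."""
    return owner_node_id if owner_node_id in alive_nodes else ""

print(preferred_node_for("node-A", {"node-A", "node-B"}))  # -> node-A
print(preferred_node_for("node-C", {"node-A", "node-B"}) == "")  # -> True
```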
11 changes: 8 additions & 3 deletions src/ray/raylet/local_task_manager.cc
@@ -334,7 +334,7 @@ void LocalTaskManager::SpillWaitingTasks() {
if (!task.GetTaskSpecification().IsSpreadSchedulingStrategy()) {
scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
task.GetTaskSpecification(),
/*prioritize_local_node*/ true,
/*preferred_node_id*/ self_node_id_.Binary(),
/*exclude_local_node*/ task_dependencies_blocked,
/*requires_object_store_memory*/ true,
&is_infeasible);
@@ -379,7 +379,7 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
// We should prefer to stay local if possible
// to avoid unnecessary spillback
// since this node is already selected by the cluster scheduler.
/*prioritize_local_node*/ true,
/*preferred_node_id*/ self_node_id_.Binary(),
/*exclude_local_node*/ false,
/*requires_object_store_memory*/ false,
&is_infeasible);
@@ -1022,7 +1022,12 @@ ResourceRequest LocalTaskManager::CalcNormalTaskResources() const {
}

if (auto allocated_instances = worker->GetAllocatedInstances()) {
total_normal_task_resources += allocated_instances->ToResourceRequest();
auto resource_request = allocated_instances->ToResourceRequest();
// Blocked normal-task workers have temporarily released their allocated CPUs.
if (worker->IsBlocked()) {
resource_request.Set(ResourceID::CPU(), 0);
}
total_normal_task_resources += resource_request;
}
}
return total_normal_task_resources;
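The blocked-worker adjustment above can be sketched as follows. This is a simplification: the real code sums C++ `ResourceRequest` instances, while the sketch uses plain dicts with illustrative resource names:

```python
def calc_normal_task_resources(workers):
    """Sum allocated resources over workers, zeroing CPU for blocked
    workers, since a blocked worker has temporarily released its CPU
    back to the node."""
    total = {}
    for allocated, is_blocked in workers:
        for name, amount in allocated.items():
            if is_blocked and name == "CPU":
                amount = 0
            total[name] = total.get(name, 0) + amount
    return total

workers = [({"CPU": 1, "memory": 100}, False),
           ({"CPU": 2, "memory": 50}, True)]  # second worker is blocked
print(calc_normal_task_resources(workers))  # -> {'CPU': 1, 'memory': 150}
```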
1 change: 1 addition & 0 deletions src/ray/raylet/scheduling/cluster_resource_manager.h
@@ -168,6 +168,7 @@ class ClusterResourceManager {
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest,
13 changes: 9 additions & 4 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -123,6 +123,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
const rpc::SchedulingStrategy &scheduling_strategy,
bool actor_creation,
bool force_spillback,
const std::string &preferred_node_id,
int64_t *total_violations,
bool *is_infeasible) {
// The zero cpu actor is a special case that must be handled the same way by all
@@ -168,7 +169,8 @@
scheduling_policy_->Schedule(resource_request,
SchedulingOptions::Hybrid(
/*avoid_local_node*/ force_spillback,
/*require_node_available*/ force_spillback));
/*require_node_available*/ force_spillback,
preferred_node_id));
}

*is_infeasible = best_node_id.IsNil();
@@ -192,6 +194,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
bool requires_object_store_memory,
bool actor_creation,
bool force_spillback,
const std::string &preferred_node_id,
int64_t *total_violations,
bool *is_infeasible) {
ResourceRequest resource_request =
@@ -200,6 +203,7 @@
scheduling_strategy,
actor_creation,
force_spillback,
preferred_node_id,
total_violations,
is_infeasible);
}
@@ -244,13 +248,13 @@ bool ClusterResourceScheduler::IsSchedulableOnNode(

scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
const TaskSpecification &task_spec,
bool prioritize_local_node,
const std::string &preferred_node_id,
bool exclude_local_node,
bool requires_object_store_memory,
bool *is_infeasible) {
// If the local node is available, we should directly return it instead of
// going through the full hybrid policy since we don't want spillback.
if (prioritize_local_node && !exclude_local_node &&
if (preferred_node_id == local_node_id_.Binary() && !exclude_local_node &&
IsSchedulableOnNode(local_node_id_,
task_spec.GetRequiredResources().GetResourceMap(),
requires_object_store_memory)) {
@@ -266,6 +270,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
requires_object_store_memory,
task_spec.IsActorCreationTask(),
exclude_local_node,
preferred_node_id,
&_unused,
is_infeasible);

@@ -276,7 +281,7 @@
requires_object_store_memory)) {
// Prefer waiting on the local node since the local node is chosen for a reason (e.g.
// spread).
if (prioritize_local_node) {
if (preferred_node_id == local_node_id_.Binary()) {
*is_infeasible = false;
return local_node_id_;
}
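A compact sketch of the control flow after this change (illustrative; the real implementation evaluates `ResourceRequest`s and runs the hybrid scheduling policy, and the final fallback also rechecks local feasibility while ignoring current usage):

```python
def get_best_schedulable_node(preferred_node_id, local_node_id,
                              local_feasible, hybrid_pick):
    """Return the chosen node ID, or '' if no node can take the task.

    - If the preferred node is the local node and the task fits locally,
      stay local to avoid spillback.
    - Otherwise consult the hybrid policy, which now also receives the
      preferred node as a hint.
    - If the policy finds nothing, prefer waiting on the local node when
      it is the preferred one.
    """
    if preferred_node_id == local_node_id and local_feasible():
        return local_node_id
    best = hybrid_pick()
    if not best and preferred_node_id == local_node_id:
        return local_node_id  # wait on the preferred/local node
    return best

# Task fits locally and the local node is preferred: stay local.
print(get_best_schedulable_node("n1", "n1", lambda: True, lambda: "n2"))  # -> n1
```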
13 changes: 9 additions & 4 deletions src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -79,7 +79,8 @@ class ClusterResourceScheduler {
/// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
///
/// \param task_spec: Task/Actor to be scheduled.
/// \param prioritize_local_node: true if we want to try out local node first.
/// \param preferred_node_id: the node where the task is preferred to be placed. An
/// empty `preferred_node_id` (string) means no preferred node.
/// \param exclude_local_node: true if we want to avoid the local node. This
/// overrides `preferred_node_id` if set to true.
/// \param requires_object_store_memory: take object store memory usage as part of
@@ -90,7 +91,7 @@
/// \return empty string, if no node can schedule the current request; otherwise,
/// return the string name of a node that can schedule the resource request.
scheduling::NodeID GetBestSchedulableNode(const TaskSpecification &task_spec,
bool prioritize_local_node,
const std::string &preferred_node_id,
bool exclude_local_node,
bool requires_object_store_memory,
bool *is_infeasible);
@@ -159,10 +160,11 @@
/// \param scheduling_strategy: Strategy about how to schedule this task.
/// \param actor_creation: True if this is an actor creation task.
/// \param force_spillback: True if we want to avoid local node.
/// \param violations: The number of soft constraint violations associated
/// \param preferred_node_id: The node where the task is preferred to be placed.
/// \param violations[out]: The number of soft constraint violations associated
/// with the node returned by this function (assuming
/// a node that can schedule resource_request is found).
/// \param is_infeasible[in]: It is set true if the task is not schedulable because it
/// \param is_infeasible[out]: It is set true if the task is not schedulable because it
/// is infeasible.
///
/// \return -1, if no node can schedule the current request; otherwise,
@@ -172,6 +174,7 @@
const rpc::SchedulingStrategy &scheduling_strategy,
bool actor_creation,
bool force_spillback,
const std::string &preferred_node_id,
int64_t *violations,
bool *is_infeasible);

@@ -187,6 +190,7 @@
bool requires_object_store_memory,
bool actor_creation,
bool force_spillback,
const std::string &preferred_node_id,
int64_t *violations,
bool *is_infeasible);

@@ -216,6 +220,7 @@
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
FRIEND_TEST(ClusterResourceSchedulerTest,