diff --git a/release/nightly_tests/many_nodes_tests/multi_master_test.py b/release/nightly_tests/many_nodes_tests/multi_master_test.py
new file mode 100644
index 0000000000000..b3f706424eb5a
--- /dev/null
+++ b/release/nightly_tests/many_nodes_tests/multi_master_test.py
@@ -0,0 +1,91 @@
+import argparse
+import os
+from time import sleep, perf_counter
+import json
+import ray
+
+
+def test_max_actors_launch(cpus_per_actor, total_actors, num_masters):
+    # By default, there are 50 groups; each group has 1 master and 99 slaves.
+    num_slaves_per_master = total_actors // num_masters - 1
+
+    @ray.remote(num_cpus=cpus_per_actor)
+    class Actor:
+        def foo(self):
+            pass
+
+        def create(self):
+            return [
+                Actor.options(max_restarts=-1).remote()
+                for _ in range(num_slaves_per_master)
+            ]
+
+    print("Start launching actors")
+    # The masters are spread across nodes.
+    actors = [
+        Actor.options(max_restarts=-1, scheduling_strategy="SPREAD").remote()
+        for _ in range(num_masters)
+    ]
+    slaves_per_master = []
+    for master in actors:
+        slaves_per_master.append(master.create.remote())
+    for slaves in slaves_per_master:
+        actors.extend(ray.get(slaves))
+    return actors
+
+
+def test_actor_ready(actors):
+    remaining = [actor.foo.remote() for actor in actors]
+    ray.get(remaining)
+
+
+def parse_script_args():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--cpus-per-actor", type=float, default=0.2)
+    parser.add_argument("--total-actors", type=int, default=5000)
+    parser.add_argument("--num-masters", type=int, default=50)
+    parser.add_argument("--no-report", default=False, action="store_true")
+    parser.add_argument("--fail", default=False, action="store_true")
+    return parser.parse_known_args()
+
+
+def main():
+    args, unknown = parse_script_args()
+
+    ray.init(address="auto")
+
+    actor_launch_start = perf_counter()
+    actors = test_max_actors_launch(
+        args.cpus_per_actor, args.total_actors, args.num_masters
+    )
+    actor_launch_end = perf_counter()
+    actor_launch_time = actor_launch_end - actor_launch_start
+    if args.fail:
+        sleep(10)
+        return
+    actor_ready_start = perf_counter()
+    test_actor_ready(actors)
+    actor_ready_end = perf_counter()
+    actor_ready_time = actor_ready_end - actor_ready_start
+
+    print(f"Actor launch time: {actor_launch_time} ({args.total_actors} actors)")
+    print(f"Actor ready time: {actor_ready_time} ({args.total_actors} actors)")
+    print(
+        f"Total time: {actor_launch_time + actor_ready_time}"
+        f" ({args.total_actors} actors)"
+    )
+
+    if "TEST_OUTPUT_JSON" in os.environ and not args.no_report:
+        results = {
+            "actor_launch_time": actor_launch_time,
+            "actor_ready_time": actor_ready_time,
+            "total_time": actor_launch_time + actor_ready_time,
+            "num_actors": args.total_actors,
+            "success": "1",
+        }
+        with open(os.environ["TEST_OUTPUT_JSON"], "w") as out_file:
+            json.dump(results, out_file)
+
+
+if __name__ == "__main__":
+    main()
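For context on the group arithmetic above: `num_slaves_per_master` feeds `range()`, so it must use integer division. A quick standalone check with the script's defaults:

```python
total_actors, num_masters = 5000, 50  # the script's default arguments
num_slaves_per_master = total_actors // num_masters - 1
assert num_slaves_per_master == 99  # 1 master + 99 slaves per group
# Plain "/" would yield 99.0, and range(99.0) raises TypeError.
```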
diff --git a/release/release_tests.yaml b/release/release_tests.yaml
index a13fc6171353a..91c2d5685cab6 100644
--- a/release/release_tests.yaml
+++ b/release/release_tests.yaml
@@ -3890,6 +3890,27 @@
       num_nodes: 251
 
+#- name: many_nodes_multi_master_test
+#  group: core-daily-test
+#  working_dir: nightly_tests
+#  legacy:
+#    test_name: many_nodes_multi_master_test
+#    test_suite: nightly_tests
+#
+#  frequency: nightly-3x
+#  team: core
+#  cluster:
+#    cluster_env: many_nodes_tests/app_config.yaml
+#    cluster_compute: many_nodes_tests/compute_config.yaml
+#
+#  run:
+#    timeout: 7200
+#    script: python many_nodes_tests/multi_master_test.py
+#    wait_for_nodes:
+#      num_nodes: 251
+#
+#    type: sdk_command
+#    file_manager: sdk
 
 - name: pg_autoscaling_regression_test
   group: core-daily-test
diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc
index 734ad527a16d2..e2ac8571c4e5e 100644
--- a/src/ray/common/task/task.cc
+++ b/src/ray/common/task/task.cc
@@ -26,12 +26,19 @@ RayTask::RayTask(TaskSpecification task_spec) : task_spec_(std::move(task_spec))
   ComputeDependencies();
 }
 
+RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id)
+    : task_spec_(std::move(task_spec)), preferred_node_id_(std::move(preferred_node_id)) {
+  ComputeDependencies();
+}
+
 const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; }
 
 const std::vector<rpc::ObjectReference> &RayTask::GetDependencies() const {
   return dependencies_;
 }
 
+const std::string &RayTask::GetPreferredNodeID() const { return preferred_node_id_; }
+
 void RayTask::ComputeDependencies() { dependencies_ = task_spec_.GetDependencies(); }
 
 std::string RayTask::DebugString() const {
diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h
index ca5fb324b765a..5a4a9e323de53 100644
--- a/src/ray/common/task/task.h
+++ b/src/ray/common/task/task.h
@@ -43,6 +43,8 @@ class RayTask {
   /// Construct a `RayTask` object from a `TaskSpecification`.
   RayTask(TaskSpecification task_spec);
 
+  RayTask(TaskSpecification task_spec, std::string preferred_node_id);
+
   /// Get the immutable specification for the task.
   ///
   /// \return The immutable specification for the task.
@@ -54,6 +56,12 @@ class RayTask {
   /// \return The object dependencies.
   const std::vector<rpc::ObjectReference> &GetDependencies() const;
 
+  /// Get the task's preferred node id for scheduling. If the returned value
+  /// is empty, the task has no preferred node.
+  ///
+  /// \return The preferred node id.
+  const std::string &GetPreferredNodeID() const;
+
   std::string DebugString() const;
 
  private:
@@ -66,6 +74,8 @@ class RayTask {
   /// A cached copy of the task's object dependencies, including arguments from
   /// the TaskSpecification.
   std::vector<rpc::ObjectReference> dependencies_;
+
+  std::string preferred_node_id_;
 };
 
 }  // namespace ray
diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
index 6d98d8b02aa29..0d08b337e8d54 100644
--- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
+++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -49,7 +49,8 @@ GcsActorScheduler::GcsActorScheduler(
 void GcsActorScheduler::Schedule(std::shared_ptr<GcsActor> actor) {
   RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil());
 
-  if (RayConfig::instance().gcs_actor_scheduling_enabled()) {
+  if (RayConfig::instance().gcs_actor_scheduling_enabled() &&
+      !actor->GetCreationTaskSpecification().GetRequiredResources().IsEmpty()) {
     ScheduleByGcs(actor);
   } else {
     ScheduleByRaylet(actor);
@@ -93,7 +94,10 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr<GcsActor> actor) {
   };
 
   // Queue and schedule the actor locally (gcs).
-  cluster_task_manager_->QueueAndScheduleTask(actor->GetCreationTaskSpecification(),
+  const auto &owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID());
+  RayTask task(actor->GetCreationTaskSpecification(),
+               owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string());
+  cluster_task_manager_->QueueAndScheduleTask(task,
                                               /*grant_or_reject*/ false,
                                               /*is_selected_based_on_locality*/ false,
                                               /*reply*/ reply.get(),
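The GCS-side change above prefers the actor owner's node when that node is still alive, and falls back to "no preference" otherwise. A minimal Python sketch of that decision (stand-in names, not Ray's API):

```python
def pick_preferred_node(owner_node_id: str, alive_nodes: set) -> str:
    """Return the preferred node for an actor creation task.

    An empty string means "no preference", matching the C++ convention where
    RayTask carries an empty preferred_node_id.
    """
    return owner_node_id if owner_node_id in alive_nodes else ""

# Hypothetical usage:
assert pick_preferred_node("node-a", {"node-a", "node-b"}) == "node-a"
assert pick_preferred_node("node-dead", {"node-a", "node-b"}) == ""
```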
diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc
index 4a2ba1982424c..8ba81b26a94a7 100644
--- a/src/ray/raylet/local_task_manager.cc
+++ b/src/ray/raylet/local_task_manager.cc
@@ -334,7 +334,7 @@ void LocalTaskManager::SpillWaitingTasks() {
     if (!task.GetTaskSpecification().IsSpreadSchedulingStrategy()) {
       scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
           task.GetTaskSpecification(),
-          /*prioritize_local_node*/ true,
+          /*preferred_node_id*/ self_node_id_.Binary(),
          /*exclude_local_node*/ task_dependencies_blocked,
           /*requires_object_store_memory*/ true,
           &is_infeasible);
@@ -379,7 +379,7 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr<internal::Work> &work,
       // We should prefer to stay local if possible
       // to avoid unnecessary spillback
       // since this node is already selected by the cluster scheduler.
-      /*prioritize_local_node*/ true,
+      /*preferred_node_id*/ self_node_id_.Binary(),
       /*exclude_local_node*/ false,
       /*requires_object_store_memory*/ false,
       &is_infeasible);
@@ -1022,7 +1022,12 @@ ResourceRequest LocalTaskManager::CalcNormalTaskResources() const {
     }
 
     if (auto allocated_instances = worker->GetAllocatedInstances()) {
-      total_normal_task_resources += allocated_instances->ToResourceRequest();
+      auto resource_request = allocated_instances->ToResourceRequest();
+      // Blocked normal-task workers have temporarily released their allocated CPUs.
+      if (worker->IsBlocked()) {
+        resource_request.Set(ResourceID::CPU(), 0);
+      }
+      total_normal_task_resources += resource_request;
     }
   }
   return total_normal_task_resources;
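A rough Python sketch of the accounting change in `CalcNormalTaskResources` (stand-in worker dictionaries; Ray's real types are the C++ above):

```python
def calc_normal_task_resources(workers):
    """Sum allocated resources, zeroing the CPUs of blocked workers, since a
    blocked worker has temporarily released its CPU allocation."""
    total = {}
    for worker in workers:
        request = dict(worker["allocated"])
        if worker["is_blocked"]:
            request["CPU"] = 0
        for resource, amount in request.items():
            total[resource] = total.get(resource, 0) + amount
    return total

# Hypothetical usage: one running worker and one blocked worker.
workers = [
    {"allocated": {"CPU": 1, "memory": 100}, "is_blocked": False},
    {"allocated": {"CPU": 1, "memory": 100}, "is_blocked": True},
]
assert calc_normal_task_resources(workers) == {"CPU": 1, "memory": 200}
```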
diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h
index c41b9b31a6a91..1db8fcf0d3256 100644
--- a/src/ray/raylet/scheduling/cluster_resource_manager.h
+++ b/src/ray/raylet/scheduling/cluster_resource_manager.h
@@ -168,6 +168,7 @@ class ClusterResourceManager {
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
   FRIEND_TEST(ClusterResourceSchedulerTest,
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 3fad6456f2222..35afa17f94b13 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -123,6 +123,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     const rpc::SchedulingStrategy &scheduling_strategy,
     bool actor_creation,
     bool force_spillback,
+    const std::string &preferred_node_id,
     int64_t *total_violations,
     bool *is_infeasible) {
   // The zero cpu actor is a special case that must be handled the same way by all
@@ -168,7 +169,8 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
         scheduling_policy_->Schedule(resource_request,
                                      SchedulingOptions::Hybrid(
                                          /*avoid_local_node*/ force_spillback,
-                                         /*require_node_available*/ force_spillback));
+                                         /*require_node_available*/ force_spillback,
+                                         preferred_node_id));
   }
 
   *is_infeasible = best_node_id.IsNil();
@@ -192,6 +194,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     bool requires_object_store_memory,
     bool actor_creation,
     bool force_spillback,
+    const std::string &preferred_node_id,
     int64_t *total_violations,
     bool *is_infeasible) {
   ResourceRequest resource_request =
@@ -200,6 +203,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
       scheduling_strategy,
       actor_creation,
       force_spillback,
+      preferred_node_id,
       total_violations,
       is_infeasible);
 }
@@ -244,13 +248,13 @@ bool ClusterResourceScheduler::IsSchedulableOnNode(
 
 scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     const TaskSpecification &task_spec,
-    bool prioritize_local_node,
+    const std::string &preferred_node_id,
     bool exclude_local_node,
     bool requires_object_store_memory,
     bool *is_infeasible) {
   // If the local node is available, we should directly return it instead of
   // going through the full hybrid policy since we don't want spillback.
-  if (prioritize_local_node && !exclude_local_node &&
+  if (preferred_node_id == local_node_id_.Binary() && !exclude_local_node &&
       IsSchedulableOnNode(local_node_id_,
                           task_spec.GetRequiredResources().GetResourceMap(),
                           requires_object_store_memory)) {
@@ -266,6 +270,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
       requires_object_store_memory,
       task_spec.IsActorCreationTask(),
       exclude_local_node,
+      preferred_node_id,
       &_unused,
       is_infeasible);
 
@@ -276,7 +281,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
                           requires_object_store_memory)) {
     // Prefer waiting on the local node since the local node is chosen for a reason (e.g.
    // spread).
-    if (prioritize_local_node) {
+    if (preferred_node_id == local_node_id_.Binary()) {
      *is_infeasible = false;
       return local_node_id_;
     }
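In Python terms, the reworked task-spec overload behaves roughly like this (an illustrative sketch under stand-in callables; the real code re-checks feasibility via `IsSchedulableOnNode` at both points):

```python
def best_schedulable_node(local_node, preferred_node_id, exclude_local,
                          local_is_schedulable, run_hybrid_policy):
    """Sketch of the fast path that replaces the old prioritize_local_node flag."""
    prefer_local = preferred_node_id == local_node
    # A preference for this very node short-circuits the hybrid policy.
    if prefer_local and not exclude_local and local_is_schedulable():
        return local_node
    best = run_hybrid_policy(preferred_node_id, avoid_local_node=exclude_local)
    # If nothing is available right now but this node was preferred and is
    # feasible, wait locally rather than spilling back.
    if best is None and prefer_local and local_is_schedulable():
        return local_node
    return best

# Hypothetical usage: the local node "n1" is preferred and has capacity.
pick = best_schedulable_node("n1", "n1", False, lambda: True,
                             lambda pref, avoid_local_node: "n2")
assert pick == "n1"
```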
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
index 9c5b2e96ef11a..5e48369b91720 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -79,7 +79,8 @@ class ClusterResourceScheduler {
   /// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
   ///
   /// \param task_spec: Task/Actor to be scheduled.
-  /// \param prioritize_local_node: true if we want to try out local node first.
+  /// \param preferred_node_id: the node where the task is preferred to be placed. An
+  /// empty `preferred_node_id` (string) means no preferred node.
   /// \param exclude_local_node: true if we want to avoid local node. This will cancel
   /// prioritize_local_node if set to true.
   /// \param requires_object_store_memory: take object store memory usage as part of
@@ -90,7 +91,7 @@ class ClusterResourceScheduler {
   /// \return empty string, if no node can schedule the current request; otherwise,
   /// return the string name of a node that can schedule the resource request.
   scheduling::NodeID GetBestSchedulableNode(const TaskSpecification &task_spec,
-                                            bool prioritize_local_node,
+                                            const std::string &preferred_node_id,
                                             bool exclude_local_node,
                                             bool requires_object_store_memory,
                                             bool *is_infeasible);
@@ -159,10 +160,11 @@ class ClusterResourceScheduler {
   /// \param scheduling_strategy: Strategy about how to schedule this task.
   /// \param actor_creation: True if this is an actor creation task.
   /// \param force_spillback: True if we want to avoid local node.
-  /// \param violations: The number of soft constraint violations associated
+  /// \param preferred_node_id: The node where the task is preferred to be placed.
+  /// \param violations[out]: The number of soft constraint violations associated
   ///                    with the node returned by this function (assuming
   ///                    a node that can schedule resource_request is found).
-  /// \param is_infeasible[in]: It is set true if the task is not schedulable because it
+  /// \param is_infeasible[out]: It is set true if the task is not schedulable because it
   /// is infeasible.
   ///
   /// \return -1, if no node can schedule the current request; otherwise,
@@ -172,6 +174,7 @@ class ClusterResourceScheduler {
       const rpc::SchedulingStrategy &scheduling_strategy,
       bool actor_creation,
       bool force_spillback,
+      const std::string &preferred_node_id,
       int64_t *violations,
       bool *is_infeasible);
 
@@ -187,6 +190,7 @@ class ClusterResourceScheduler {
       bool requires_object_store_memory,
       bool actor_creation,
       bool force_spillback,
+      const std::string &preferred_node_id,
       int64_t *violations,
       bool *is_infeasible);
 
@@ -216,6 +220,7 @@ class ClusterResourceScheduler {
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest);
+  FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest);
   FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest);
   FRIEND_TEST(ClusterResourceSchedulerTest,
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
index 3c5ecf450413c..f1e2c49dd54fb 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
@@ -310,6 +310,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                              false,
                                                              false,
                                                              false,
+                                                             std::string(),
                                                              &violations,
                                                              &is_infeasible);
   ASSERT_EQ(node_id_1, remote_node_id);
@@ -322,6 +323,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                             false,
                                                             false,
                                                             false,
+                                                            std::string(),
                                                             &violations,
                                                             &is_infeasible);
   ASSERT_EQ(node_id_2, local_node_id);
@@ -334,6 +336,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                             false,
                                                             false,
                                                             false,
+                                                            std::string(),
                                                             &violations,
                                                             &is_infeasible);
   ASSERT_TRUE(node_id_3.IsNil());
@@ -346,6 +349,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                             false,
                                                             false,
                                                             false,
+                                                            std::string(),
                                                             &violations,
                                                             &is_infeasible);
   ASSERT_EQ(node_id_4, local_node_id);
@@ -371,6 +375,7 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
                                                             false,
                                                             false,
                                                             false,
+                                                            std::string(),
                                                             &violations,
                                                             &is_infeasible);
   absl::flat_hash_map<std::string, double> resource_available({{"CPU", 9}});
@@ -381,12 +386,52 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
                                                             false,
                                                             false,
                                                             false,
+                                                            std::string(),
                                                             &violations,
                                                             &is_infeasible);
   ASSERT_EQ((std::set<scheduling::NodeID>{node_id_1, node_id_2}),
             (std::set<scheduling::NodeID>{local_node_id, remote_node_id}));
 }
 
+TEST_F(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest) {
+  absl::flat_hash_map<std::string, double> resource_total({{"CPU", 10}});
+  auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary());
+  ClusterResourceScheduler resource_scheduler(
+      local_node_id, resource_total, is_node_available_fn_);
+  AssertPredefinedNodeResources();
+  auto remote_node_id = scheduling::NodeID(NodeID::FromRandom().Binary());
+  resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
+      remote_node_id, resource_total, resource_total);
+
+  absl::flat_hash_map<std::string, double> resource_request({{"CPU", 5}});
+  int64_t violations;
+  bool is_infeasible;
+  rpc::SchedulingStrategy scheduling_strategy;
+  scheduling_strategy.mutable_default_scheduling_strategy();
+  // Select node with the remote node preferred.
+  auto node_id_1 = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             false,
+                                                             remote_node_id.Binary(),
+                                                             &violations,
+                                                             &is_infeasible);
+
+  // If no preferred node is specified, still prefer the local one.
+  auto node_id_2 = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
+
+  ASSERT_EQ((std::set<scheduling::NodeID>{node_id_1, node_id_2}),
+            (std::set<scheduling::NodeID>{remote_node_id, local_node_id}));
+}
+
 TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
   // Create cluster resources.
   NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 10},
@@ -408,8 +453,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
   bool is_infeasible;
   rpc::SchedulingStrategy scheduling_strategy;
   scheduling_strategy.mutable_default_scheduling_strategy();
-  auto node_id = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+  auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                           scheduling_strategy,
+                                                           false,
+                                                           false,
+                                                           std::string(),
+                                                           &violations,
+                                                           &is_infeasible);
   ASSERT_EQ(node_id.ToInt(), 1);
   ASSERT_TRUE(violations == 0);
@@ -527,8 +577,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
     ResourceRequest resource_request = CreateResourceRequest({{ResourceID::CPU(), 11}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(node_id.IsNil());
   }
@@ -537,8 +592,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
     ResourceRequest resource_request = CreateResourceRequest({{ResourceID::CPU(), 5}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(!node_id.IsNil());
     ASSERT_TRUE(violations == 0);
   }
@@ -548,8 +608,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
     ResourceRequest resource_request = CreateResourceRequest(
        {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 11}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(node_id.IsNil());
   }
   // Custom resources, no constraint violation.
@@ -558,8 +623,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
        {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 5}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(!node_id.IsNil());
     ASSERT_TRUE(violations == 0);
   }
@@ -571,8 +641,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
                               {ResourceID("custom100"), 5}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(node_id.IsNil());
   }
   // Placement hints, no constraint violation.
@@ -581,8 +656,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
        {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 5}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                             scheduling_strategy,
+                                                             false,
+                                                             false,
+                                                             std::string(),
+                                                             &violations,
+                                                             &is_infeasible);
     ASSERT_TRUE(!node_id.IsNil());
     ASSERT_TRUE(violations == 0);
   }
@@ -888,6 +968,7 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) {
                                                       false,
                                                       false,
                                                       false,
+                                                      std::string(),
                                                       &violations,
                                                       &is_infeasible));
   EXPECT_CALL(*gcs_client_->mock_node_accessor, Get(node_id, ::testing::_))
@@ -899,6 +980,7 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) {
                              false,
                              false,
                              false,
+                             std::string(),
                              &violations,
                              &is_infeasible)
                   .IsNil());
@@ -1093,6 +1175,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
                              false,
                              false,
                              false,
+                             std::string(),
                              &total_violations,
                              &is_infeasible)
                   .IsNil());
@@ -1108,6 +1191,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
                              false,
                              false,
                              false,
+                             std::string(),
                              &total_violations,
                              &is_infeasible));
 
@@ -1122,6 +1206,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) {
                              false,
                              false,
                              false,
+                             std::string(),
                              &total_violations,
                              &is_infeasible));
 }
@@ -1339,19 +1424,29 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) {
     resource_scheduler.GetClusterResourceManager().AddOrUpdateNode(
         remote, {{"CPU", 2.}}, {{"CPU", num_slots_available}});
     for (int j = 0; j < num_slots_available; j++) {
-      ASSERT_EQ(
-          remote,
-          resource_scheduler.GetBestSchedulableNode(
-              task_spec, scheduling_strategy, false, false, true, &t, &is_infeasible));
+      ASSERT_EQ(remote,
+                resource_scheduler.GetBestSchedulableNode(task_spec,
+                                                          scheduling_strategy,
+                                                          false,
+                                                          false,
+                                                          true,
+                                                          std::string(),
+                                                          &t,
+                                                          &is_infeasible));
       // Allocate remote resources.
       ASSERT_TRUE(resource_scheduler.AllocateRemoteTaskResources(remote, task_spec));
     }
     // Our local view says there are not enough resources on the remote node to
     // schedule another task.
-    ASSERT_EQ(
-        resource_scheduler.GetBestSchedulableNode(
-            task_spec, scheduling_strategy, false, false, true, &t, &is_infeasible),
-        scheduling::NodeID::Nil());
+    ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec,
+                                                        scheduling_strategy,
+                                                        false,
+                                                        false,
+                                                        true,
+                                                        std::string(),
+                                                        &t,
+                                                        &is_infeasible),
+              scheduling::NodeID::Nil());
     ASSERT_FALSE(
         resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources(
             task_spec, task_allocation));
@@ -1371,32 +1466,62 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) {
   rpc::SchedulingStrategy scheduling_strategy;
   scheduling_strategy.mutable_default_scheduling_strategy();
 
-  auto result = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
+  auto result = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                          scheduling_strategy,
+                                                          false,
+                                                          false,
+                                                          false,
+                                                          std::string(),
+                                                          &t,
+                                                          &is_infeasible);
   ASSERT_TRUE(result.IsNil());
 
   resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
       scheduling::ResourceID("custom123"), {0., 1.0, 1.0});
 
-  result = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
+  result = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                     scheduling_strategy,
+                                                     false,
+                                                     false,
+                                                     false,
+                                                     std::string(),
+                                                     &t,
+                                                     &is_infeasible);
   ASSERT_FALSE(result.IsNil()) << resource_scheduler.DebugString();
 
   resource_request["custom123"] = 3;
-  result = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
+  result = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                     scheduling_strategy,
+                                                     false,
+                                                     false,
+                                                     false,
+                                                     std::string(),
+                                                     &t,
+                                                     &is_infeasible);
   ASSERT_TRUE(result.IsNil());
 
   resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances(
       scheduling::ResourceID("custom123"), {1.0});
-  result = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
+  result = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                     scheduling_strategy,
+                                                     false,
+                                                     false,
+                                                     false,
+                                                     std::string(),
+                                                     &t,
+                                                     &is_infeasible);
   ASSERT_FALSE(result.IsNil());
 
   resource_scheduler.GetLocalResourceManager().DeleteLocalResource(
       scheduling::ResourceID("custom123"));
-  result = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible);
+  result = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                     scheduling_strategy,
+                                                     false,
+                                                     false,
+                                                     false,
+                                                     std::string(),
+                                                     &t,
+                                                     &is_infeasible);
   ASSERT_TRUE(result.IsNil());
 }
@@ -1436,6 +1561,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
                 false,
                 false,
                 /*force_spillback=*/false,
+                std::string(),
                 &total_violations,
                 &is_infeasible),
             scheduling::NodeID("local"));
@@ -1446,6 +1572,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
                 false,
                 false,
                 /*force_spillback=*/true,
+                std::string(),
                 &total_violations,
                 &is_infeasible),
             scheduling::NodeID::Nil());
@@ -1457,6 +1584,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
                 false,
                 false,
                 /*force_spillback=*/true,
+                std::string(),
                 &total_violations,
                 &is_infeasible),
             scheduling::NodeID::Nil());
@@ -1467,6 +1595,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) {
                 false,
                 false,
                 /*force_spillback=*/true,
+                std::string(),
                 &total_violations,
                 &is_infeasible),
             node_ids[51]);
@@ -1574,6 +1703,7 @@ TEST_F(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest) {
                                                       scheduling_strategy,
                                                       true,
                                                       false,
+                                                      std::string(),
                                                       &violations,
                                                       &is_infeasible),
             except_node_id);
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index 64e25862aeabb..1d87aaaca0e88 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -97,7 +97,8 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
                      << task.GetTaskSpecification().TaskId();
       auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
           task.GetTaskSpecification(),
-          work->PrioritizeLocalNode(),
+          /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
+                                                            : task.GetPreferredNodeID(),
           /*exclude_local_node*/ false,
           /*requires_object_store_memory*/ false,
           &is_infeasible);
@@ -191,7 +192,8 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
     bool is_infeasible;
     cluster_resource_scheduler_->GetBestSchedulableNode(
         task.GetTaskSpecification(),
-        work->PrioritizeLocalNode(),
+        /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
+                                                          : task.GetPreferredNodeID(),
        /*exclude_local_node*/ false,
         /*requires_object_store_memory*/ false,
         &is_infeasible);
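The ternary above encodes a simple precedence, shown here as a hypothetical Python helper (not Ray code):

```python
def resolve_preferred_node(prioritize_local: bool, self_node_id: str,
                           task_preferred_node_id: str) -> str:
    """Work pinned to this raylet keeps preferring the local node; otherwise the
    task's own preference wins (possibly empty, meaning no preference)."""
    return self_node_id if prioritize_local else task_preferred_node_id
```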
diff --git a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc
index e23664d81e26e..0b83a52b242c5 100644
--- a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc
+++ b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc
@@ -28,14 +28,18 @@ scheduling::NodeID HybridSchedulingPolicy::HybridPolicyWithFilter(
     float spread_threshold,
     bool force_spillback,
     bool require_node_available,
+    const std::string &preferred_node,
     NodeFilter node_filter) {
-  // Step 1: Generate the traversal order. We guarantee that the first node is local, to
-  // encourage local scheduling. The rest of the traversal order should be globally
-  // consistent, to encourage using "warm" workers.
+  // Step 1: Generate the traversal order. We guarantee that the first node is local (or
+  // the preferred node, if provided), to encourage local/preferred scheduling. The rest
+  // of the traversal order should be globally consistent, to encourage using "warm"
+  // workers.
   std::vector<scheduling::NodeID> round;
   round.reserve(nodes_.size());
-  const auto local_it = nodes_.find(local_node_id_);
-  RAY_CHECK(local_it != nodes_.end());
+  auto preferred_node_id =
+      preferred_node.empty() ? local_node_id_ : scheduling::NodeID(preferred_node);
+  auto preferred_it = nodes_.find(preferred_node_id);
+  RAY_CHECK(preferred_it != nodes_.end());
   auto predicate = [this, node_filter](scheduling::NodeID node_id,
                                        const NodeResources &node_resources) {
     if (!is_node_available_(node_id)) {
@@ -52,18 +56,18 @@ scheduling::NodeID HybridSchedulingPolicy::HybridPolicyWithFilter(
     return !has_gpu;
   };
 
-  const auto &local_node_view = local_it->second.GetLocalView();
-  // If we should include local node at all, make sure it is at the front of the list
-  // so that
+  const auto &preferred_node_view = preferred_it->second.GetLocalView();
+  // If we should include the local/preferred node at all, make sure it is at the front
+  // of the list so that
   // 1. It's first in traversal order.
   // 2. It's easy to avoid sorting it.
-  if (predicate(local_node_id_, local_node_view) && !force_spillback) {
-    round.push_back(local_node_id_);
+  if (predicate(preferred_node_id, preferred_node_view) && !force_spillback) {
+    round.push_back(preferred_node_id);
   }
 
   const auto start_index = round.size();
   for (const auto &pair : nodes_) {
-    if (pair.first != local_node_id_ &&
+    if (pair.first != preferred_node_id &&
         predicate(pair.first, pair.second.GetLocalView())) {
       round.push_back(pair.first);
     }
@@ -144,7 +148,8 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule(
     return HybridPolicyWithFilter(resource_request,
                                   options.spread_threshold,
                                   options.avoid_local_node,
-                                  options.require_node_available);
+                                  options.require_node_available,
+                                  options.preferred_node_id);
   }
 
   // Try schedule on non-GPU nodes.
@@ -152,6 +157,7 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule(
                                                options.spread_threshold,
                                                options.avoid_local_node,
                                                /*require_node_available*/ true,
+                                               options.preferred_node_id,
                                                NodeFilter::kNonGpu);
   if (!best_node_id.IsNil()) {
     return best_node_id;
@@ -162,7 +168,8 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule(
   return HybridPolicyWithFilter(resource_request,
                                 options.spread_threshold,
                                 options.avoid_local_node,
-                                options.require_node_available);
+                                options.require_node_available,
+                                options.preferred_node_id);
 }
 
 }  // namespace raylet_scheduling_policy
diff --git a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h
index 4dd3a055889d6..773eaf8fcb6b0 100644
--- a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h
+++ b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h
@@ -89,6 +89,7 @@ class HybridSchedulingPolicy : public ISchedulingPolicy {
       float spread_threshold,
       bool force_spillback,
       bool require_available,
+      const std::string &preferred_node,
       NodeFilter node_filter = NodeFilter::kAny);
 };
 }  // namespace raylet_scheduling_policy
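The net effect of `HybridPolicyWithFilter`'s first step can be summarized in a few lines of Python (an illustrative sketch with stand-in names, not Ray's API; "globally consistent" is modeled here as a plain sort):

```python
def traversal_order(nodes, local_node, preferred_node="", force_spillback=False,
                    feasible=lambda node: True):
    """Preferred node (defaulting to the local node) goes first; the remaining
    feasible nodes follow in a globally consistent (sorted) order."""
    head = preferred_node or local_node
    order = [head] if not force_spillback and feasible(head) else []
    order += sorted(n for n in nodes if n != head and feasible(n))
    return order

# With a preferred node, it replaces the local node at the head of the walk:
assert traversal_order({"a", "b", "c"}, "b", preferred_node="c") == ["c", "a", "b"]
assert traversal_order({"a", "b", "c"}, "b") == ["b", "a", "c"]
```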
diff --git a/src/ray/raylet/scheduling/policy/scheduling_options.h b/src/ray/raylet/scheduling/policy/scheduling_options.h
index 42b852e52020f..cd624f9b610c8 100644
--- a/src/ray/raylet/scheduling/policy/scheduling_options.h
+++ b/src/ray/raylet/scheduling/policy/scheduling_options.h
@@ -57,12 +57,17 @@ struct SchedulingOptions {
   }
 
   // construct option for hybrid scheduling policy.
-  static SchedulingOptions Hybrid(bool avoid_local_node, bool require_node_available) {
+  static SchedulingOptions Hybrid(bool avoid_local_node,
+                                  bool require_node_available,
+                                  const std::string &preferred_node_id = std::string()) {
     return SchedulingOptions(SchedulingType::HYBRID,
                              RayConfig::instance().scheduler_spread_threshold(),
                              avoid_local_node,
                              require_node_available,
-                             RayConfig::instance().scheduler_avoid_gpu_nodes());
+                             RayConfig::instance().scheduler_avoid_gpu_nodes(),
+                             /*max_cpu_fraction_per_node*/ 1.0,
+                             /*scheduling_context*/ nullptr,
+                             preferred_node_id);
   }
 
   static SchedulingOptions NodeAffinity(bool avoid_local_node,
@@ -152,6 +157,9 @@ struct SchedulingOptions {
   std::shared_ptr<SchedulingContext> scheduling_context;
   std::string node_affinity_node_id;
   bool node_affinity_soft = false;
+  // The node where the task is preferred to be placed. By default, this node id
+  // is empty, which means no preferred node.
+  std::string preferred_node_id;
 
  private:
   SchedulingOptions(SchedulingType type,
@@ -160,14 +168,16 @@ struct SchedulingOptions {
                     bool require_node_available,
                     bool avoid_gpu_nodes,
                     double max_cpu_fraction_per_node = 1.0,
-                    std::shared_ptr<SchedulingContext> scheduling_context = nullptr)
+                    std::shared_ptr<SchedulingContext> scheduling_context = nullptr,
+                    const std::string &preferred_node_id = std::string())
       : scheduling_type(type),
         spread_threshold(spread_threshold),
         avoid_local_node(avoid_local_node),
         require_node_available(require_node_available),
         avoid_gpu_nodes(avoid_gpu_nodes),
         max_cpu_fraction_per_node(max_cpu_fraction_per_node),
-        scheduling_context(std::move(scheduling_context)) {}
+        scheduling_context(std::move(scheduling_context)),
+        preferred_node_id(preferred_node_id) {}
 
   friend class ::ray::raylet::SchedulingPolicyTest;
 };
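To make the defaulting behavior concrete, the same idea as a Python dataclass (an analogy only; the real struct is the C++ above, and the node id below is hypothetical):

```python
from dataclasses import dataclass

@dataclass
class HybridOptions:
    avoid_local_node: bool
    require_node_available: bool
    preferred_node_id: str = ""  # empty string means "no preferred node"

# Existing call sites need no changes, mirroring Hybrid(false, false) in C++:
old_style = HybridOptions(False, False)
assert old_style.preferred_node_id == ""
# New call sites opt in with an explicit node id:
new_style = HybridOptions(False, False, preferred_node_id="binary-node-id")
```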