From 30b0199537f6380fe8e8d2fcfa131755976efd1f Mon Sep 17 00:00:00 2001
From: Chong-Li
Date: Wed, 30 Nov 2022 22:22:56 +0800
Subject: [PATCH 1/7] Prefer actor owner's node

Signed-off-by: Chong-Li
---
 python/ray/_private/ray_constants.py          |  2 +-
 .../many_nodes_tests/multi_master_test.py     | 89 +++++++++++++++++++
 release/release_tests.yaml                    | 21 +++++
 src/ray/common/ray_config_def.h               |  2 +-
 src/ray/common/task/task.cc                   |  7 ++
 src/ray/common/task/task.h                    |  9 ++
 src/ray/gcs/gcs_server/gcs_actor_scheduler.cc |  6 +-
 src/ray/raylet/local_task_manager.cc          |  9 +-
 .../scheduling/cluster_resource_scheduler.cc  |  8 +-
 .../scheduling/cluster_resource_scheduler.h   |  9 +-
 .../raylet/scheduling/cluster_task_manager.cc |  2 +
 .../policy/hybrid_scheduling_policy.cc        | 33 ++++---
 .../policy/hybrid_scheduling_policy.h         |  1 +
 .../scheduling/policy/scheduling_options.h    | 16 +++-
 14 files changed, 189 insertions(+), 25 deletions(-)
 create mode 100644 release/nightly_tests/many_nodes_tests/multi_master_test.py

diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py
index 383a2900bc8d3..33dab470a1a6c 100644
--- a/python/ray/_private/ray_constants.py
+++ b/python/ray/_private/ray_constants.py
@@ -359,7 +359,7 @@ def env_bool(key, default):
 
 
 def gcs_actor_scheduling_enabled():
-    return os.environ.get("RAY_gcs_actor_scheduling_enabled") == "true"
+    return os.environ.get("RAY_gcs_actor_scheduling_enabled") != "false"
 
 
 DEFAULT_RESOURCES = {"CPU", "GPU", "memory", "object_store_memory"}
diff --git a/release/nightly_tests/many_nodes_tests/multi_master_test.py b/release/nightly_tests/many_nodes_tests/multi_master_test.py
new file mode 100644
index 0000000000000..b9148a4bd50a6
--- /dev/null
+++ b/release/nightly_tests/many_nodes_tests/multi_master_test.py
@@ -0,0 +1,89 @@
+import argparse
+import os
+from time import sleep, perf_counter
+import json
+import ray
+
+
+def test_max_actors_launch(cpus_per_actor, total_actors, num_masters):
+    # By default, there are 50 groups; each group has 1 master and 99 slaves.
+    num_slaves_per_master = total_actors//num_masters - 1
+
+    @ray.remote(num_cpus=cpus_per_actor)
+    class Actor:
+        def foo(self):
+            pass
+
+        def create(self):
+            return [
+                Actor.options(max_restarts=-1).remote()
+                for _ in range(num_slaves_per_master)
+            ]
+
+    print("Start launching actors")
+    # The 50 masters are spread.
+ actors = [ + Actor.options(max_restarts=-1, scheduling_strategy="SPREAD").remote() + for _ in range(num_masters) + ] + slaves_per_master = [] + for master in actors: + slaves_per_master.append(master.create.remote()) + for slaves in slaves_per_master: + actors.extend(ray.get(slaves)) + return actors + + +def test_actor_ready(actors): + remaining = [actor.foo.remote() for actor in actors] + ray.get(remaining) + + +def parse_script_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--cpus-per-actor", type=float, default=0.2) + parser.add_argument("--total-actors", type=int, default=5000) + parser.add_argument("--num-masters", type=int, default=50) + parser.add_argument("--no-report", default=False, action="store_true") + parser.add_argument("--fail", default=False, action="store_true") + return parser.parse_known_args() + + +def main(): + args, unknown = parse_script_args() + + ray.init(address="auto") + + actor_launch_start = perf_counter() + actors = test_max_actors_launch(args.cpus_per_actor, args.total_actors, args.num_masters) + actor_launch_end = perf_counter() + actor_launch_time = actor_launch_end - actor_launch_start + if args.fail: + sleep(10) + return + actor_ready_start = perf_counter() + test_actor_ready(actors) + actor_ready_end = perf_counter() + actor_ready_time = actor_ready_end - actor_ready_start + + print(f"Actor launch time: {actor_launch_time} ({args.total_actors} actors)") + print(f"Actor ready time: {actor_ready_time} ({args.total_actors} actors)") + print( + f"Total time: {actor_launch_time + actor_ready_time}" + f" ({args.total_actors} actors)" + ) + + if "TEST_OUTPUT_JSON" in os.environ and not args.no_report: + out_file = open(os.environ["TEST_OUTPUT_JSON"], "w") + results = { + "actor_launch_time": actor_launch_time, + "actor_ready_time": actor_ready_time, + "total_time": actor_launch_time + actor_ready_time, + "num_actors": args.total_actors, + "success": "1", + } + json.dump(results, out_file) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/release/release_tests.yaml b/release/release_tests.yaml index 868060185ad37..2d110da4093bd 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -3986,6 +3986,27 @@ type: sdk_command file_manager: sdk +- name: many_nodes_multi_master_test + group: core-daily-test + working_dir: nightly_tests + legacy: + test_name: many_nodes_multi_master_test + test_suite: nightly_tests + + frequency: nightly-3x + team: core + cluster: + cluster_env: many_nodes_tests/app_config.yaml + cluster_compute: many_nodes_tests/compute_config.yaml + + run: + timeout: 7200 + script: python many_nodes_tests/multi_master_test.py + wait_for_nodes: + num_nodes: 251 + + type: sdk_command + file_manager: sdk - name: pg_autoscaling_regression_test group: core-daily-test diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index 21d59e914a888..a0ef7396f346f 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -602,7 +602,7 @@ RAY_CONFIG(uint64_t, subscriber_timeout_ms, 300 * 1000) RAY_CONFIG(uint64_t, gcs_actor_table_min_duration_ms, /* 5 min */ 60 * 1000 * 5) /// Whether to enable GCS-based actor scheduling. 
-RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false); +RAY_CONFIG(bool, gcs_actor_scheduling_enabled, true); RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024) diff --git a/src/ray/common/task/task.cc b/src/ray/common/task/task.cc index 734ad527a16d2..e2ac8571c4e5e 100644 --- a/src/ray/common/task/task.cc +++ b/src/ray/common/task/task.cc @@ -26,12 +26,19 @@ RayTask::RayTask(TaskSpecification task_spec) : task_spec_(std::move(task_spec)) ComputeDependencies(); } +RayTask::RayTask(TaskSpecification task_spec, std::string preferred_node_id) + : task_spec_(std::move(task_spec)), preferred_node_id_(std::move(preferred_node_id)) { + ComputeDependencies(); +} + const TaskSpecification &RayTask::GetTaskSpecification() const { return task_spec_; } const std::vector &RayTask::GetDependencies() const { return dependencies_; } +const std::string &RayTask::GetPreferredNodeID() const { return preferred_node_id_; } + void RayTask::ComputeDependencies() { dependencies_ = task_spec_.GetDependencies(); } std::string RayTask::DebugString() const { diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index ca5fb324b765a..fce9cde177572 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -43,6 +43,8 @@ class RayTask { /// Construct a `RayTask` object from a `TaskSpecification`. RayTask(TaskSpecification task_spec); + RayTask(TaskSpecification task_spec, std::string preferred_node_id); + /// Get the immutable specification for the task. /// /// \return The immutable specification for the task. @@ -54,6 +56,11 @@ class RayTask { /// \return The object dependencies. const std::vector &GetDependencies() const; + /// Get the task's preferred node id for scheduling. + /// + /// \return The preferred node id. + const std::string &GetPreferredNodeID() const; + std::string DebugString() const; private: @@ -66,6 +73,8 @@ class RayTask { /// A cached copy of the task's object dependencies, including arguments from /// the TaskSpecification. std::vector dependencies_; + + std::string preferred_node_id_; }; } // namespace ray diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 6d98d8b02aa29..2e4b2efd541c6 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -49,7 +49,8 @@ GcsActorScheduler::GcsActorScheduler( void GcsActorScheduler::Schedule(std::shared_ptr actor) { RAY_CHECK(actor->GetNodeID().IsNil() && actor->GetWorkerID().IsNil()); - if (RayConfig::instance().gcs_actor_scheduling_enabled()) { + if (RayConfig::instance().gcs_actor_scheduling_enabled() && + !actor->GetCreationTaskSpecification().GetRequiredResources().IsEmpty()) { ScheduleByGcs(actor); } else { ScheduleByRaylet(actor); @@ -93,7 +94,8 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr actor) { }; // Queue and schedule the actor locally (gcs). 
-  cluster_task_manager_->QueueAndScheduleTask(actor->GetCreationTaskSpecification(),
+  RayTask task(actor->GetCreationTaskSpecification(), actor->GetOwnerNodeID().Binary());
+  cluster_task_manager_->QueueAndScheduleTask(task,
                                               /*grant_or_reject*/ false,
                                               /*is_selected_based_on_locality*/ false,
                                               /*reply*/ reply.get(),
diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc
index 22ce849064538..5cab5351fa3e1 100644
--- a/src/ray/raylet/local_task_manager.cc
+++ b/src/ray/raylet/local_task_manager.cc
@@ -335,6 +335,7 @@ void LocalTaskManager::SpillWaitingTasks() {
               /*prioritize_local_node*/ true,
               /*exclude_local_node*/ task_dependencies_blocked,
               /*requires_object_store_memory*/ true,
+              /*preferred_node_id*/ task.GetPreferredNodeID(),
              &is_infeasible);
     } else {
       // If scheduling strategy is spread, we prefer honoring spread decision
@@ -380,6 +381,7 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr &work,
           /*prioritize_local_node*/ true,
           /*exclude_local_node*/ false,
           /*requires_object_store_memory*/ false,
+          /*preferred_node_id*/ work->task.GetPreferredNodeID(),
           &is_infeasible);
 
   if (is_infeasible || scheduling_node_id.IsNil() ||
@@ -1020,7 +1022,12 @@ ResourceRequest LocalTaskManager::CalcNormalTaskResources() const {
     }
 
     if (auto allocated_instances = worker->GetAllocatedInstances()) {
-      total_normal_task_resources += allocated_instances->ToResourceRequest();
+      auto resource_request = allocated_instances->ToResourceRequest();
+      // Blocked normal task workers have temporarily released their allocated CPU.
+      if (worker->IsBlocked()) {
+        resource_request.Set(ResourceID::CPU(), 0);
+      }
+      total_normal_task_resources += resource_request;
     }
   }
   return total_normal_task_resources;
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
index 3fad6456f2222..efe3e89b1ed36 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc
@@ -123,6 +123,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     const rpc::SchedulingStrategy &scheduling_strategy,
     bool actor_creation,
     bool force_spillback,
+    const std::string &preferred_node_id,
     int64_t *total_violations,
     bool *is_infeasible) {
   // The zero cpu actor is a special case that must be handled the same way by all
@@ -168,7 +169,8 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
         scheduling_policy_->Schedule(resource_request,
                                      SchedulingOptions::Hybrid(
                                          /*avoid_local_node*/ force_spillback,
-                                         /*require_node_available*/ force_spillback));
+                                         /*require_node_available*/ force_spillback,
+                                         preferred_node_id));
   }
 
   *is_infeasible = best_node_id.IsNil();
@@ -192,6 +194,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     bool requires_object_store_memory,
     bool actor_creation,
     bool force_spillback,
+    const std::string &preferred_node_id,
     int64_t *total_violations,
     bool *is_infeasible) {
   ResourceRequest resource_request =
@@ -200,6 +203,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
                                scheduling_strategy,
                                actor_creation,
                                force_spillback,
+                               preferred_node_id,
                                total_violations,
                                is_infeasible);
 }
@@ -247,6 +251,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode(
     bool prioritize_local_node,
     bool exclude_local_node,
     bool requires_object_store_memory,
+    const std::string &preferred_node_id,
     bool *is_infeasible) {
   // If the local node is available, we should directly return it instead of
   // going 
through the full hybrid policy since we don't want spillback. @@ -266,6 +271,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( requires_object_store_memory, task_spec.IsActorCreationTask(), exclude_local_node, + preferred_node_id, &_unused, is_infeasible); diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index ff576e59ae8cd..da6d43138efdf 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -85,6 +85,7 @@ class ClusterResourceScheduler { /// prioritize_local_node if set to true. /// \param requires_object_store_memory: take object store memory usage as part of /// scheduling decision. + /// \param preferred_node_id: the node where the task is preferred to be placed. /// \param is_infeasible[out]: It is set /// true if the task is not schedulable because it is infeasible. /// @@ -94,6 +95,7 @@ class ClusterResourceScheduler { bool prioritize_local_node, bool exclude_local_node, bool requires_object_store_memory, + const std::string &preferred_node_id, bool *is_infeasible); /// Subtract the resources required by a given resource request (resource_request) from @@ -160,10 +162,11 @@ class ClusterResourceScheduler { /// \param scheduling_strategy: Strategy about how to schedule this task. /// \param actor_creation: True if this is an actor creation task. /// \param force_spillback: True if we want to avoid local node. - /// \param violations: The number of soft constraint violations associated + /// \param preferred_node_id: The node where the task is preferred to be placed. + /// \param violations[out]: The number of soft constraint violations associated /// with the node returned by this function (assuming /// a node that can schedule resource_request is found). - /// \param is_infeasible[in]: It is set true if the task is not schedulable because it + /// \param is_infeasible[out]: It is set true if the task is not schedulable because it /// is infeasible. /// /// \return -1, if no node can schedule the current request; otherwise, @@ -173,6 +176,7 @@ class ClusterResourceScheduler { const rpc::SchedulingStrategy &scheduling_strategy, bool actor_creation, bool force_spillback, + const std::string &preferred_node_id, int64_t *violations, bool *is_infeasible); @@ -188,6 +192,7 @@ class ClusterResourceScheduler { bool requires_object_store_memory, bool actor_creation, bool force_spillback, + const std::string &preferred_node_id, int64_t *violations, bool *is_infeasible); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 64e25862aeabb..a5178daa715ad 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -100,6 +100,7 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() { work->PrioritizeLocalNode(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, + /*preferred_node_id*/ task.GetPreferredNodeID(), &is_infeasible); // There is no node that has available resources to run the request. @@ -194,6 +195,7 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() { work->PrioritizeLocalNode(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, + /*preferred_node_id*/ task.GetPreferredNodeID(), &is_infeasible); // There is no node that has available resources to run the request. 
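
The raylet-side hunks above reduce to one new rule in the hybrid policy's traversal order: the preferred node, when one is supplied, takes the front-of-list slot that previously always belonged to the local node. Below is a minimal standalone sketch of that ordering step, using simplified stand-in types rather than Ray's actual headers (the real policy additionally sorts and scores the remaining candidates against the spread threshold):

    // traversal_order_sketch.cc -- illustrative only; build: g++ -std=c++17
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    std::vector<std::string> TraversalOrder(
        const std::map<std::string, bool> &nodes,  // node id -> is available
        const std::string &local_node_id,
        const std::string &preferred_node,  // empty string means "no preference"
        bool force_spillback) {
      // The preferred node falls back to the local node, matching the
      // empty-string handling in HybridPolicyWithFilter.
      const std::string front =
          preferred_node.empty() ? local_node_id : preferred_node;
      std::vector<std::string> round;
      // The front node is tried before any remote candidate, unless the
      // caller forces spillback.
      auto it = nodes.find(front);
      if (it != nodes.end() && it->second && !force_spillback) {
        round.push_back(front);
      }
      for (const auto &[node_id, available] : nodes) {
        if (node_id != front && available) {
          round.push_back(node_id);
        }
      }
      return round;
    }

    int main() {
      const std::map<std::string, bool> nodes{
          {"local", true}, {"other", true}, {"owner", true}};
      // With a preferred node (e.g. the actor owner's node), it goes first:
      for (const auto &id : TraversalOrder(nodes, "local", "owner", false)) {
        std::cout << id << " ";  // owner local other
      }
      std::cout << "\n";
      // Without a preference, the local node keeps its old front position:
      for (const auto &id : TraversalOrder(nodes, "local", "", false)) {
        std::cout << id << " ";  // local other owner
      }
      std::cout << "\n";
      return 0;
    }
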
diff --git a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc index e23664d81e26e..6742cd78dddda 100644 --- a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc +++ b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc @@ -28,14 +28,18 @@ scheduling::NodeID HybridSchedulingPolicy::HybridPolicyWithFilter( float spread_threshold, bool force_spillback, bool require_node_available, + const std::string &preferred_node, NodeFilter node_filter) { - // Step 1: Generate the traversal order. We guarantee that the first node is local, to - // encourage local scheduling. The rest of the traversal order should be globally - // consistent, to encourage using "warm" workers. + // Step 1: Generate the traversal order. We guarantee that the first node is local (or + // the preferred node, if provided), to encourage local/preferred scheduling. The rest + // of the traversal order should be globally consistent, to encourage using "warm" + // workers. std::vector round; round.reserve(nodes_.size()); - const auto local_it = nodes_.find(local_node_id_); - RAY_CHECK(local_it != nodes_.end()); + auto preferred_node_id = + preferred_node.empty() ? local_node_id_ : scheduling::NodeID(preferred_node); + const auto preferred_it = nodes_.find(preferred_node_id); + RAY_CHECK(preferred_it != nodes_.end()); auto predicate = [this, node_filter](scheduling::NodeID node_id, const NodeResources &node_resources) { if (!is_node_available_(node_id)) { @@ -52,18 +56,18 @@ scheduling::NodeID HybridSchedulingPolicy::HybridPolicyWithFilter( return !has_gpu; }; - const auto &local_node_view = local_it->second.GetLocalView(); - // If we should include local node at all, make sure it is at the front of the list - // so that + const auto &preferred_node_view = preferred_it->second.GetLocalView(); + // If we should include local/preferred node at all, make sure it is at the front of the + // list so that // 1. It's first in traversal order. // 2. It's easy to avoid sorting it. - if (predicate(local_node_id_, local_node_view) && !force_spillback) { - round.push_back(local_node_id_); + if (predicate(preferred_node_id, preferred_node_view) && !force_spillback) { + round.push_back(preferred_node_id); } const auto start_index = round.size(); for (const auto &pair : nodes_) { - if (pair.first != local_node_id_ && + if (pair.first != preferred_node_id && predicate(pair.first, pair.second.GetLocalView())) { round.push_back(pair.first); } @@ -144,7 +148,8 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule( return HybridPolicyWithFilter(resource_request, options.spread_threshold, options.avoid_local_node, - options.require_node_available); + options.require_node_available, + options.preferred_node_id); } // Try schedule on non-GPU nodes. 
@@ -152,6 +157,7 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule( options.spread_threshold, options.avoid_local_node, /*require_node_available*/ true, + options.preferred_node_id, NodeFilter::kNonGpu); if (!best_node_id.IsNil()) { return best_node_id; @@ -162,7 +168,8 @@ scheduling::NodeID HybridSchedulingPolicy::Schedule( return HybridPolicyWithFilter(resource_request, options.spread_threshold, options.avoid_local_node, - options.require_node_available); + options.require_node_available, + options.preferred_node_id); } } // namespace raylet_scheduling_policy diff --git a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h index 4dd3a055889d6..773eaf8fcb6b0 100644 --- a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h +++ b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.h @@ -89,6 +89,7 @@ class HybridSchedulingPolicy : public ISchedulingPolicy { float spread_threshold, bool force_spillback, bool require_available, + const std::string &preferred_node, NodeFilter node_filter = NodeFilter::kAny); }; } // namespace raylet_scheduling_policy diff --git a/src/ray/raylet/scheduling/policy/scheduling_options.h b/src/ray/raylet/scheduling/policy/scheduling_options.h index 42b852e52020f..3709f0265f79f 100644 --- a/src/ray/raylet/scheduling/policy/scheduling_options.h +++ b/src/ray/raylet/scheduling/policy/scheduling_options.h @@ -57,12 +57,17 @@ struct SchedulingOptions { } // construct option for hybrid scheduling policy. - static SchedulingOptions Hybrid(bool avoid_local_node, bool require_node_available) { + static SchedulingOptions Hybrid(bool avoid_local_node, + bool require_node_available, + const std::string &preferred_node_id = std::string()) { return SchedulingOptions(SchedulingType::HYBRID, RayConfig::instance().scheduler_spread_threshold(), avoid_local_node, require_node_available, - RayConfig::instance().scheduler_avoid_gpu_nodes()); + RayConfig::instance().scheduler_avoid_gpu_nodes(), + /*max_cpu_fraction_per_node*/ 1.0, + /*scheduling_context*/ nullptr, + preferred_node_id); } static SchedulingOptions NodeAffinity(bool avoid_local_node, @@ -152,6 +157,7 @@ struct SchedulingOptions { std::shared_ptr scheduling_context; std::string node_affinity_node_id; bool node_affinity_soft = false; + std::string preferred_node_id; private: SchedulingOptions(SchedulingType type, @@ -160,14 +166,16 @@ struct SchedulingOptions { bool require_node_available, bool avoid_gpu_nodes, double max_cpu_fraction_per_node = 1.0, - std::shared_ptr scheduling_context = nullptr) + std::shared_ptr scheduling_context = nullptr, + const std::string &preferred_node_id = std::string()) : scheduling_type(type), spread_threshold(spread_threshold), avoid_local_node(avoid_local_node), require_node_available(require_node_available), avoid_gpu_nodes(avoid_gpu_nodes), max_cpu_fraction_per_node(max_cpu_fraction_per_node), - scheduling_context(std::move(scheduling_context)) {} + scheduling_context(std::move(scheduling_context)), + preferred_node_id(preferred_node_id) {} friend class ::ray::raylet::SchedulingPolicyTest; }; From 5fe48e6d9c98c6a3c0c16e47ff7d5414ee99cf3f Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Wed, 30 Nov 2022 23:00:10 +0800 Subject: [PATCH 2/7] Fix Signed-off-by: Chong-Li --- .../many_nodes_tests/multi_master_test.py | 8 +- .../cluster_resource_scheduler_test.cc | 155 ++++++++++++++---- 2 files changed, 128 insertions(+), 35 deletions(-) diff --git a/release/nightly_tests/many_nodes_tests/multi_master_test.py 
b/release/nightly_tests/many_nodes_tests/multi_master_test.py
index b9148a4bd50a6..b3f706424eb5a 100644
--- a/release/nightly_tests/many_nodes_tests/multi_master_test.py
+++ b/release/nightly_tests/many_nodes_tests/multi_master_test.py
@@ -7,7 +7,7 @@
 
 def test_max_actors_launch(cpus_per_actor, total_actors, num_masters):
     # By default, there are 50 groups; each group has 1 master and 99 slaves.
-    num_slaves_per_master = total_actors//num_masters - 1
+    num_slaves_per_master = total_actors // num_masters - 1
 
     @ray.remote(num_cpus=cpus_per_actor)
     class Actor:
@@ -55,7 +55,9 @@ def main():
     ray.init(address="auto")
 
     actor_launch_start = perf_counter()
-    actors = test_max_actors_launch(args.cpus_per_actor, args.total_actors, args.num_masters)
+    actors = test_max_actors_launch(
+        args.cpus_per_actor, args.total_actors, args.num_masters
+    )
     actor_launch_end = perf_counter()
     actor_launch_time = actor_launch_end - actor_launch_start
     if args.fail:
@@ -86,4 +88,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
index 3c5ecf450413c..fdc4891c79a54 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc
@@ -310,6 +310,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                      false,
                                                      false,
                                                      false,
+                                                     std::string(),
                                                      &violations,
                                                      &is_infeasible);
   ASSERT_EQ(node_id_1, remote_node_id);
@@ -322,6 +323,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                      false,
                                                      false,
                                                      false,
+                                                     std::string(),
                                                      &violations,
                                                      &is_infeasible);
   ASSERT_EQ(node_id_2, local_node_id);
@@ -334,6 +336,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                      false,
                                                      false,
                                                      false,
+                                                     std::string(),
                                                      &violations,
                                                      &is_infeasible);
   ASSERT_TRUE(node_id_3.IsNil());
@@ -346,6 +349,7 @@ TEST_F(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest) {
                                                      false,
                                                      false,
                                                      false,
+                                                     std::string(),
                                                      &violations,
                                                      &is_infeasible);
   ASSERT_EQ(node_id_4, local_node_id);
@@ -371,6 +375,7 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
                                                        false,
                                                        false,
                                                        false,
+                                                       std::string(),
                                                        &violations,
                                                        &is_infeasible);
   absl::flat_hash_map resource_available({{"CPU", 9}});
@@ -381,6 +386,7 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) {
                                                        false,
                                                        false,
                                                        false,
+                                                       std::string(),
                                                        &violations,
                                                        &is_infeasible);
   ASSERT_EQ((std::set{node_id_1, node_id_2}),
@@ -408,8 +414,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) {
   bool is_infeasible;
   rpc::SchedulingStrategy scheduling_strategy;
   scheduling_strategy.mutable_default_scheduling_strategy();
-  auto node_id = resource_scheduler.GetBestSchedulableNode(
-      resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+  auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request,
+                                                           scheduling_strategy,
+                                                           false,
+                                                           false,
+                                                           std::string(),
+                                                           &violations,
+                                                           &is_infeasible);
   ASSERT_EQ(node_id.ToInt(), 1);
   ASSERT_TRUE(violations == 0);
 
@@ -527,8 +538,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) {
     ResourceRequest resource_request = CreateResourceRequest({{ResourceID::CPU(), 11}});
     int64_t violations;
     bool is_infeasible;
-    auto node_id = resource_scheduler.GetBestSchedulableNode(
-        resource_request, scheduling_strategy, false, false, &violations, &is_infeasible);
+    auto node_id = 
resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(node_id.IsNil()); } @@ -537,8 +553,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { ResourceRequest resource_request = CreateResourceRequest({{ResourceID::CPU(), 5}}); int64_t violations; bool is_infeasible; - auto node_id = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); + auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } @@ -548,8 +569,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 11}}); int64_t violations; bool is_infeasible; - auto node_id = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); + auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(node_id.IsNil()); } // Custom resources, no constraint violation. @@ -558,8 +584,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 5}}); int64_t violations; bool is_infeasible; - auto node_id = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); + auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } @@ -571,8 +602,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { {ResourceID("custom100"), 5}}); int64_t violations; bool is_infeasible; - auto node_id = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); + auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(node_id.IsNil()); } // Placement hints, no constraint violation. 
@@ -581,8 +617,13 @@ TEST_F(ClusterResourceSchedulerTest, SchedulingResourceRequestTest) { {{ResourceID::CPU(), 5}, {ResourceID::Memory(), 2}, {ResourceID("custom1"), 5}}); int64_t violations; bool is_infeasible; - auto node_id = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, &violations, &is_infeasible); + auto node_id = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + std::string(), + &violations, + &is_infeasible); ASSERT_TRUE(!node_id.IsNil()); ASSERT_TRUE(violations == 0); } @@ -888,6 +929,7 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) { false, false, false, + std::string(), &violations, &is_infeasible)); EXPECT_CALL(*gcs_client_->mock_node_accessor, Get(node_id, ::testing::_)) @@ -899,6 +941,7 @@ TEST_F(ClusterResourceSchedulerTest, DeadNodeTest) { false, false, false, + std::string(), &violations, &is_infeasible) .IsNil()); @@ -1093,6 +1136,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { false, false, false, + std::string(), &total_violations, &is_infeasible) .IsNil()); @@ -1108,6 +1152,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { false, false, false, + std::string(), &total_violations, &is_infeasible)); @@ -1122,6 +1167,7 @@ TEST_F(ClusterResourceSchedulerTest, TestAlwaysSpillInfeasibleTask) { false, false, false, + std::string(), &total_violations, &is_infeasible)); } @@ -1339,19 +1385,29 @@ TEST_F(ClusterResourceSchedulerTest, DirtyLocalViewTest) { resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( remote, {{"CPU", 2.}}, {{"CPU", num_slots_available}}); for (int j = 0; j < num_slots_available; j++) { - ASSERT_EQ( - remote, - resource_scheduler.GetBestSchedulableNode( - task_spec, scheduling_strategy, false, false, true, &t, &is_infeasible)); + ASSERT_EQ(remote, + resource_scheduler.GetBestSchedulableNode(task_spec, + scheduling_strategy, + false, + false, + true, + std::string(), + &t, + &is_infeasible)); // Allocate remote resources. ASSERT_TRUE(resource_scheduler.AllocateRemoteTaskResources(remote, task_spec)); } // Our local view says there are not enough resources on the remote node to // schedule another task. 
- ASSERT_EQ( - resource_scheduler.GetBestSchedulableNode( - task_spec, scheduling_strategy, false, false, true, &t, &is_infeasible), - scheduling::NodeID::Nil()); + ASSERT_EQ(resource_scheduler.GetBestSchedulableNode(task_spec, + scheduling_strategy, + false, + false, + true, + std::string(), + &t, + &is_infeasible), + scheduling::NodeID::Nil()); ASSERT_FALSE( resource_scheduler.GetLocalResourceManager().AllocateLocalTaskResources( task_spec, task_allocation)); @@ -1371,32 +1427,62 @@ TEST_F(ClusterResourceSchedulerTest, DynamicResourceTest) { rpc::SchedulingStrategy scheduling_strategy; scheduling_strategy.mutable_default_scheduling_strategy(); - auto result = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); + auto result = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &t, + &is_infeasible); ASSERT_TRUE(result.IsNil()); resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( scheduling::ResourceID("custom123"), {0., 1.0, 1.0}); - result = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); + result = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &t, + &is_infeasible); ASSERT_FALSE(result.IsNil()) << resource_scheduler.DebugString(); resource_request["custom123"] = 3; - result = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); + result = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &t, + &is_infeasible); ASSERT_TRUE(result.IsNil()); resource_scheduler.GetLocalResourceManager().AddLocalResourceInstances( scheduling::ResourceID("custom123"), {1.0}); - result = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); + result = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &t, + &is_infeasible); ASSERT_FALSE(result.IsNil()); resource_scheduler.GetLocalResourceManager().DeleteLocalResource( scheduling::ResourceID("custom123")); - result = resource_scheduler.GetBestSchedulableNode( - resource_request, scheduling_strategy, false, false, false, &t, &is_infeasible); + result = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &t, + &is_infeasible); ASSERT_TRUE(result.IsNil()); } @@ -1436,6 +1522,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { false, false, /*force_spillback=*/false, + std::string(), &total_violations, &is_infeasible), scheduling::NodeID("local")); @@ -1446,6 +1533,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { false, false, /*force_spillback=*/true, + std::string(), &total_violations, &is_infeasible), scheduling::NodeID::Nil()); @@ -1457,6 +1545,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { false, false, /*force_spillback=*/true, + std::string(), &total_violations, &is_infeasible), scheduling::NodeID::Nil()); @@ -1467,6 +1556,7 @@ TEST_F(ClusterResourceSchedulerTest, TestForceSpillback) { false, false, /*force_spillback=*/true, + std::string(), &total_violations, &is_infeasible), node_ids[51]); @@ -1574,6 +1664,7 @@ 
TEST_F(ClusterResourceSchedulerTest, AffinityWithBundleScheduleTest) { scheduling_strategy, true, false, + std::string(), &violations, &is_infeasible), except_node_id); From 1bf9eb186365fcc9cf57c07bc096ad8ec29f64b6 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Thu, 1 Dec 2022 00:32:09 +0800 Subject: [PATCH 3/7] Fix Signed-off-by: Chong-Li --- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 2e4b2efd541c6..9e174007aeb68 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -94,7 +94,9 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr actor) { }; // Queue and schedule the actor locally (gcs). - RayTask task(actor->GetCreationTaskSpecification(), actor->GetOwnerNodeID().Binary()); + auto owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID()); + RayTask task(actor->GetCreationTaskSpecification(), + owner_node.has_value() ? actor->GetOwnerNodeID().Binary() : std::string()); cluster_task_manager_->QueueAndScheduleTask(task, /*grant_or_reject*/ false, /*is_selected_based_on_locality*/ false, From abebc823a735af57d6ec37e4f3fdf76eedfdd037 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Wed, 14 Dec 2022 11:46:30 +0800 Subject: [PATCH 4/7] Fix Signed-off-by: Chong-Li --- python/ray/_private/ray_constants.py | 2 +- src/ray/common/ray_config_def.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/ray/_private/ray_constants.py b/python/ray/_private/ray_constants.py index 33dab470a1a6c..383a2900bc8d3 100644 --- a/python/ray/_private/ray_constants.py +++ b/python/ray/_private/ray_constants.py @@ -359,7 +359,7 @@ def env_bool(key, default): def gcs_actor_scheduling_enabled(): - return os.environ.get("RAY_gcs_actor_scheduling_enabled") != "false" + return os.environ.get("RAY_gcs_actor_scheduling_enabled") == "true" DEFAULT_RESOURCES = {"CPU", "GPU", "memory", "object_store_memory"} diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index a0ef7396f346f..21d59e914a888 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -602,7 +602,7 @@ RAY_CONFIG(uint64_t, subscriber_timeout_ms, 300 * 1000) RAY_CONFIG(uint64_t, gcs_actor_table_min_duration_ms, /* 5 min */ 60 * 1000 * 5) /// Whether to enable GCS-based actor scheduling. 
-RAY_CONFIG(bool, gcs_actor_scheduling_enabled, true); +RAY_CONFIG(bool, gcs_actor_scheduling_enabled, false); RAY_CONFIG(uint32_t, max_error_msg_size_bytes, 512 * 1024) From c2308af8fafdd8dc36b539029372b074c507de1e Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Tue, 3 Jan 2023 17:47:02 +0800 Subject: [PATCH 5/7] Merge preferred_node_id and prioritize_local_node Signed-off-by: Chong-Li --- src/ray/raylet/local_task_manager.cc | 6 ++---- src/ray/raylet/scheduling/cluster_resource_scheduler.cc | 7 +++---- src/ray/raylet/scheduling/cluster_resource_scheduler.h | 6 ++---- src/ray/raylet/scheduling/cluster_task_manager.cc | 8 ++++---- 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/src/ray/raylet/local_task_manager.cc b/src/ray/raylet/local_task_manager.cc index 8efd6b36d3c88..8ba81b26a94a7 100644 --- a/src/ray/raylet/local_task_manager.cc +++ b/src/ray/raylet/local_task_manager.cc @@ -334,10 +334,9 @@ void LocalTaskManager::SpillWaitingTasks() { if (!task.GetTaskSpecification().IsSpreadSchedulingStrategy()) { scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode( task.GetTaskSpecification(), - /*prioritize_local_node*/ true, + /*preferred_node_id*/ self_node_id_.Binary(), /*exclude_local_node*/ task_dependencies_blocked, /*requires_object_store_memory*/ true, - /*preferred_node_id*/ task.GetPreferredNodeID(), &is_infeasible); } else { // If scheduling strategy is spread, we prefer honoring spread decision @@ -380,10 +379,9 @@ bool LocalTaskManager::TrySpillback(const std::shared_ptr &work, // We should prefer to stay local if possible // to avoid unnecessary spillback // since this node is already selected by the cluster scheduler. - /*prioritize_local_node*/ true, + /*preferred_node_id*/ self_node_id_.Binary(), /*exclude_local_node*/ false, /*requires_object_store_memory*/ false, - /*preferred_node_id*/ work->task.GetPreferredNodeID(), &is_infeasible); if (is_infeasible || scheduling_node_id.IsNil() || diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index efe3e89b1ed36..35afa17f94b13 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -248,14 +248,13 @@ bool ClusterResourceScheduler::IsSchedulableOnNode( scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( const TaskSpecification &task_spec, - bool prioritize_local_node, + const std::string &preferred_node_id, bool exclude_local_node, bool requires_object_store_memory, - const std::string &preferred_node_id, bool *is_infeasible) { // If the local node is available, we should directly return it instead of // going through the full hybrid policy since we don't want spillback. - if (prioritize_local_node && !exclude_local_node && + if (preferred_node_id == local_node_id_.Binary() && !exclude_local_node && IsSchedulableOnNode(local_node_id_, task_spec.GetRequiredResources().GetResourceMap(), requires_object_store_memory)) { @@ -282,7 +281,7 @@ scheduling::NodeID ClusterResourceScheduler::GetBestSchedulableNode( requires_object_store_memory)) { // Prefer waiting on the local node since the local node is chosen for a reason (e.g. // spread). 
-  if (prioritize_local_node) {
+  if (preferred_node_id == local_node_id_.Binary()) {
     *is_infeasible = false;
     return local_node_id_;
   }
diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
index da6d43138efdf..b2f690dc86bcb 100644
--- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h
+++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h
@@ -80,22 +80,20 @@ class ClusterResourceScheduler {
   /// In hybrid mode, see `scheduling_policy.h` for a description of the policy.
   ///
   /// \param task_spec: Task/Actor to be scheduled.
-  /// \param prioritize_local_node: true if we want to try out local node first.
+  /// \param preferred_node_id: the node where the task is preferred to be placed.
   /// \param exclude_local_node: true if we want to avoid local node. This will cancel
   /// prioritize_local_node if set to true.
   /// \param requires_object_store_memory: take object store memory usage as part of
   /// scheduling decision.
-  /// \param preferred_node_id: the node where the task is preferred to be placed.
   /// \param is_infeasible[out]: It is set
   /// true if the task is not schedulable because it is infeasible.
   ///
   /// \return empty string, if no node can schedule the current request; otherwise,
   /// return the string name of a node that can schedule the resource request.
   scheduling::NodeID GetBestSchedulableNode(const TaskSpecification &task_spec,
-                                            bool prioritize_local_node,
+                                            const std::string &preferred_node_id,
                                             bool exclude_local_node,
                                             bool requires_object_store_memory,
-                                            const std::string &preferred_node_id,
                                             bool *is_infeasible);
 
   /// Subtract the resources required by a given resource request (resource_request) from
diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc
index a5178daa715ad..1d87aaaca0e88 100644
--- a/src/ray/raylet/scheduling/cluster_task_manager.cc
+++ b/src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -97,10 +97,10 @@ void ClusterTaskManager::ScheduleAndDispatchTasks() {
                    << task.GetTaskSpecification().TaskId();
     auto scheduling_node_id = cluster_resource_scheduler_->GetBestSchedulableNode(
         task.GetTaskSpecification(),
-        work->PrioritizeLocalNode(),
+        /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
+                                                          : task.GetPreferredNodeID(),
         /*exclude_local_node*/ false,
         /*requires_object_store_memory*/ false,
-        /*preferred_node_id*/ task.GetPreferredNodeID(),
         &is_infeasible);
 
     // There is no node that has available resources to run the request.
@@ -192,10 +192,10 @@ void ClusterTaskManager::TryScheduleInfeasibleTask() {
     bool is_infeasible;
     cluster_resource_scheduler_->GetBestSchedulableNode(
         task.GetTaskSpecification(),
-        work->PrioritizeLocalNode(),
+        /*preferred_node_id*/ work->PrioritizeLocalNode() ? self_node_id_.Binary()
+                                                          : task.GetPreferredNodeID(),
         /*exclude_local_node*/ false,
         /*requires_object_store_memory*/ false,
-        /*preferred_node_id*/ task.GetPreferredNodeID(),
         &is_infeasible);
 
     // There is no node that has available resources to run the request.
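
After this patch the boolean is gone from the scheduler interface: call sites that used to pass prioritize_local_node=true now pass the local node's id as preferred_node_id, and an empty string means no preference at all. A tiny sketch of the call-site translation, using hypothetical stand-in types rather than Ray's API:

    // call_site_sketch.cc -- illustrative only; build: g++ -std=c++17
    #include <cassert>
    #include <string>

    // Hypothetical stand-ins for the raylet types involved.
    struct Work {
      bool prioritize_local_node = false;
    };
    struct Task {
      std::string preferred_node_id;  // empty means "no preferred node"
    };

    // Mirrors the new ClusterTaskManager call sites above: the old flag
    // collapses into the string argument, so "prefer local" becomes
    // "prefer this node" with the local node's id.
    std::string PreferredNodeArg(const Work &work,
                                 const Task &task,
                                 const std::string &self_node_id) {
      return work.prioritize_local_node ? self_node_id : task.preferred_node_id;
    }

    int main() {
      const Task owner_pref{"owner-node"};
      const Task no_pref{};
      assert(PreferredNodeArg(Work{true}, owner_pref, "self") == "self");
      assert(PreferredNodeArg(Work{false}, owner_pref, "self") == "owner-node");
      assert(PreferredNodeArg(Work{false}, no_pref, "self").empty());
      return 0;
    }
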
From 8dc68ba67b320385956e12f94886e3a91040a0c0 Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Mon, 16 Jan 2023 18:37:57 +0800 Subject: [PATCH 6/7] Fix Signed-off-by: Chong-Li --- release/release_tests.yaml | 42 +++++++++---------- src/ray/common/task/task.h | 3 +- src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 2 +- .../scheduling/cluster_resource_manager.h | 1 + .../scheduling/cluster_resource_scheduler.h | 1 + .../cluster_resource_scheduler_test.cc | 39 +++++++++++++++++ .../policy/hybrid_scheduling_policy.cc | 2 +- 7 files changed, 66 insertions(+), 24 deletions(-) diff --git a/release/release_tests.yaml b/release/release_tests.yaml index b4f3564c6728e..4c8272614baf5 100644 --- a/release/release_tests.yaml +++ b/release/release_tests.yaml @@ -4087,27 +4087,27 @@ type: sdk_command file_manager: sdk -- name: many_nodes_multi_master_test - group: core-daily-test - working_dir: nightly_tests - legacy: - test_name: many_nodes_multi_master_test - test_suite: nightly_tests - - frequency: nightly-3x - team: core - cluster: - cluster_env: many_nodes_tests/app_config.yaml - cluster_compute: many_nodes_tests/compute_config.yaml - - run: - timeout: 7200 - script: python many_nodes_tests/multi_master_test.py - wait_for_nodes: - num_nodes: 251 - - type: sdk_command - file_manager: sdk +#- name: many_nodes_multi_master_test +# group: core-daily-test +# working_dir: nightly_tests +# legacy: +# test_name: many_nodes_multi_master_test +# test_suite: nightly_tests +# +# frequency: nightly-3x +# team: core +# cluster: +# cluster_env: many_nodes_tests/app_config.yaml +# cluster_compute: many_nodes_tests/compute_config.yaml +# +# run: +# timeout: 7200 +# script: python many_nodes_tests/multi_master_test.py +# wait_for_nodes: +# num_nodes: 251 +# +# type: sdk_command +# file_manager: sdk - name: pg_autoscaling_regression_test group: core-daily-test diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index fce9cde177572..5a4a9e323de53 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -56,7 +56,8 @@ class RayTask { /// \return The object dependencies. const std::vector &GetDependencies() const; - /// Get the task's preferred node id for scheduling. + /// Get the task's preferred node id for scheduling. If the returned value + /// is empty, then it means the task has no preferred node. /// /// \return The preferred node id. const std::string &GetPreferredNodeID() const; diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 9e174007aeb68..0d08b337e8d54 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -94,7 +94,7 @@ void GcsActorScheduler::ScheduleByGcs(std::shared_ptr actor) { }; // Queue and schedule the actor locally (gcs). - auto owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID()); + const auto &owner_node = gcs_node_manager_.GetAliveNode(actor->GetOwnerNodeID()); RayTask task(actor->GetCreationTaskSpecification(), owner_node.has_value() ? 
actor->GetOwnerNodeID().Binary() : std::string()); cluster_task_manager_->QueueAndScheduleTask(task, diff --git a/src/ray/raylet/scheduling/cluster_resource_manager.h b/src/ray/raylet/scheduling/cluster_resource_manager.h index c41b9b31a6a91..1db8fcf0d3256 100644 --- a/src/ray/raylet/scheduling/cluster_resource_manager.h +++ b/src/ray/raylet/scheduling/cluster_resource_manager.h @@ -168,6 +168,7 @@ class ClusterResourceManager { FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest); FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest); + FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index b2f690dc86bcb..bffc1639a481e 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -220,6 +220,7 @@ class ClusterResourceScheduler { FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingAddOrUpdateNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, NodeAffinitySchedulingStrategyTest); FRIEND_TEST(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest); + FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingResourceRequestTest); FRIEND_TEST(ClusterResourceSchedulerTest, SchedulingUpdateTotalResourcesTest); FRIEND_TEST(ClusterResourceSchedulerTest, diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc index fdc4891c79a54..f1e2c49dd54fb 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_test.cc @@ -393,6 +393,45 @@ TEST_F(ClusterResourceSchedulerTest, SpreadSchedulingStrategyTest) { (std::set{local_node_id, remote_node_id})); } +TEST_F(ClusterResourceSchedulerTest, SchedulingWithPreferredNodeTest) { + absl::flat_hash_map resource_total({{"CPU", 10}}); + auto local_node_id = scheduling::NodeID(NodeID::FromRandom().Binary()); + ClusterResourceScheduler resource_scheduler( + local_node_id, resource_total, is_node_available_fn_); + AssertPredefinedNodeResources(); + auto remote_node_id = scheduling::NodeID(NodeID::FromRandom().Binary()); + resource_scheduler.GetClusterResourceManager().AddOrUpdateNode( + remote_node_id, resource_total, resource_total); + + absl::flat_hash_map resource_request({{"CPU", 5}}); + int64_t violations; + bool is_infeasible; + rpc::SchedulingStrategy scheduling_strategy; + scheduling_strategy.mutable_default_scheduling_strategy(); + // Select node with the remote node preferred. + auto node_id_1 = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + remote_node_id.Binary(), + &violations, + &is_infeasible); + + // If no preferred node specified, then still prefer the local one. 
+ auto node_id_2 = resource_scheduler.GetBestSchedulableNode(resource_request, + scheduling_strategy, + false, + false, + false, + std::string(), + &violations, + &is_infeasible); + + ASSERT_EQ((std::set{node_id_1, node_id_2}), + (std::set{remote_node_id, local_node_id})); +} + TEST_F(ClusterResourceSchedulerTest, SchedulingUpdateAvailableResourcesTest) { // Create cluster resources. NodeResources node_resources = CreateNodeResources({{ResourceID::CPU(), 10}, diff --git a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc index 6742cd78dddda..0b83a52b242c5 100644 --- a/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc +++ b/src/ray/raylet/scheduling/policy/hybrid_scheduling_policy.cc @@ -38,7 +38,7 @@ scheduling::NodeID HybridSchedulingPolicy::HybridPolicyWithFilter( round.reserve(nodes_.size()); auto preferred_node_id = preferred_node.empty() ? local_node_id_ : scheduling::NodeID(preferred_node); - const auto preferred_it = nodes_.find(preferred_node_id); + auto preferred_it = nodes_.find(preferred_node_id); RAY_CHECK(preferred_it != nodes_.end()); auto predicate = [this, node_filter](scheduling::NodeID node_id, const NodeResources &node_resources) { From 8b420028714320e52a335aaf31084a8a2ea3ba9b Mon Sep 17 00:00:00 2001 From: Chong-Li Date: Thu, 19 Jan 2023 18:17:31 +0800 Subject: [PATCH 7/7] More comments about empty string Signed-off-by: Chong-Li --- src/ray/raylet/scheduling/cluster_resource_scheduler.h | 3 ++- src/ray/raylet/scheduling/policy/scheduling_options.h | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index bffc1639a481e..3f62762696ee5 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -80,7 +80,8 @@ class ClusterResourceScheduler { /// In hybrid mode, see `scheduling_policy.h` for a description of the policy. /// /// \param task_spec: Task/Actor to be scheduled. - /// \param preferred_node_id: the node where the task is preferred to be placed. + /// \param preferred_node_id: the node where the task is preferred to be placed. An + /// empty `preferred_node_id` (string) means no preferred node. /// \param exclude_local_node: true if we want to avoid local node. This will cancel /// prioritize_local_node if set to true. /// \param requires_object_store_memory: take object store memory usage as part of diff --git a/src/ray/raylet/scheduling/policy/scheduling_options.h b/src/ray/raylet/scheduling/policy/scheduling_options.h index 3709f0265f79f..cd624f9b610c8 100644 --- a/src/ray/raylet/scheduling/policy/scheduling_options.h +++ b/src/ray/raylet/scheduling/policy/scheduling_options.h @@ -157,6 +157,8 @@ struct SchedulingOptions { std::shared_ptr scheduling_context; std::string node_affinity_node_id; bool node_affinity_soft = false; + // The node where the task is preferred to be placed. By default, this node id + // is empty, which means no preferred node. std::string preferred_node_id; private:
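
Taken together, the series threads the actor owner's node id from the GCS actor scheduler down to the hybrid scheduling policy, with the guard added in patches 3 and 6 so that a dead owner node degrades to "no preference". A standalone sketch of that guard, using stub types rather than the real GcsNodeManager:

    // owner_guard_sketch.cc -- illustrative only; build: g++ -std=c++17
    #include <iostream>
    #include <optional>
    #include <set>
    #include <string>

    // Stub for GcsNodeManager::GetAliveNode (illustrative, not the real API).
    struct GcsNodeManagerStub {
      std::set<std::string> alive_nodes;
      std::optional<std::string> GetAliveNode(const std::string &node_id) const {
        if (alive_nodes.count(node_id) > 0) {
          return node_id;
        }
        return std::nullopt;
      }
    };

    // Mirrors the guard in GcsActorScheduler::ScheduleByGcs: the owner's node
    // is only used as the preferred node while it is still alive; otherwise
    // the actor creation task carries an empty preference.
    std::string PreferredNodeFor(const GcsNodeManagerStub &nodes,
                                 const std::string &owner_node_id) {
      const auto owner_node = nodes.GetAliveNode(owner_node_id);
      return owner_node.has_value() ? owner_node_id : std::string();
    }

    int main() {
      const GcsNodeManagerStub nodes{{"owner-node"}};
      std::cout << "'" << PreferredNodeFor(nodes, "owner-node") << "'\n";  // 'owner-node'
      std::cout << "'" << PreferredNodeFor(nodes, "dead-node") << "'\n";   // ''
      return 0;
    }
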