From 63c15d7ced42c96c89664d3535f68d84b2adcfce Mon Sep 17 00:00:00 2001 From: SongGuyang Date: Wed, 11 Aug 2021 08:03:17 +0800 Subject: [PATCH] [core] make 'PopWorker' to be an async function (#17202) * make 'PopWorker' to be an async function * pop worker async works * fix * address comments * bugfix * fix cluster_task_manager_test * fix * bugfix of detached actor * address comments * fix * address comments * fix aioredis * Revert "fix aioredis" This reverts commit 041b983eac95b105ab0e853e84c4cf2647008431. * bug fix * fix * fix test_step_resources test * format * add unit test * fix * add test case PopWorkerStatus * address commit * fix lint * address comments * add python test * address comments * make an independent function * Update test_basic_3.py Co-authored-by: Hao Chen --- .../workflow/tests/test_basic_workflows_2.py | 5 + python/ray/tests/test_basic_3.py | 60 ++ src/ray/gcs/gcs_server/gcs_actor_scheduler.cc | 1 + src/ray/raylet/node_manager.cc | 15 +- .../raylet/scheduling/cluster_task_manager.cc | 249 +++++-- .../raylet/scheduling/cluster_task_manager.h | 62 +- .../scheduling/cluster_task_manager_test.cc | 249 +++++-- src/ray/raylet/test/util.h | 4 +- src/ray/raylet/worker.h | 8 +- src/ray/raylet/worker_pool.cc | 295 ++++---- src/ray/raylet/worker_pool.h | 121 +++- src/ray/raylet/worker_pool_test.cc | 656 +++++++++++------- 12 files changed, 1101 insertions(+), 624 deletions(-) diff --git a/python/ray/experimental/workflow/tests/test_basic_workflows_2.py b/python/ray/experimental/workflow/tests/test_basic_workflows_2.py index 434e58ccf3a0b..eb565b2fc2191 100644 --- a/python/ray/experimental/workflow/tests/test_basic_workflows_2.py +++ b/python/ray/experimental/workflow/tests/test_basic_workflows_2.py @@ -32,9 +32,13 @@ def test_init_twice_2(call_ray_start, reset_workflow, tmp_path): }], indirect=True) def test_step_resources(workflow_start_regular, tmp_path): lock_path = str(tmp_path / "lock") + # We use signal actor here because we can't guarantee the order of tasks + # sent from worker to raylet. + signal_actor = ray.test_utils.SignalActor.remote() @workflow.step def step_run(): + ray.wait([signal_actor.send.remote()]) with FileLock(lock_path): return None @@ -45,6 +49,7 @@ def remote_run(): lock = FileLock(lock_path) lock.acquire() ret = step_run.options(num_cpus=2).step().run_async() + ray.wait([signal_actor.wait.remote()]) obj = remote_run.remote() with pytest.raises(ray.exceptions.GetTimeoutError): ray.get(obj, timeout=2) diff --git a/python/ray/tests/test_basic_3.py b/python/ray/tests/test_basic_3.py index 18db91e4d0a6e..6fd3ce074aaa9 100644 --- a/python/ray/tests/test_basic_3.py +++ b/python/ray/tests/test_basic_3.py @@ -11,7 +11,9 @@ from ray.test_utils import ( dicts_equal, wait_for_pid_to_exit, + wait_for_condition, ) +from pathlib import Path import ray @@ -226,5 +228,63 @@ def get(self): ray.get([a.get.remote()]) +@pytest.mark.parametrize( + "ray_start_cluster", [{ + "_system_config": { + "event_stats_print_interval_ms": 100, + "debug_dump_period_milliseconds": 100, + "event_stats": True + } + }], + indirect=True) +def test_worker_startup_count(ray_start_cluster): + """Test that no extra workers started while no available cpu resources + in cluster.""" + + cluster = ray_start_cluster + # Cluster total cpu resources is 4. + cluster.add_node(num_cpus=4, ) + ray.init(address=cluster.address) + + # A slow function never returns. It will hold cpu resources all the way. 
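+    # With 4 CPUs in the cluster and roughly 0.25 CPU per task, at most
+    # 4 / 0.25 = 16 workers can hold CPU resources at the same time, which is
+    # the bound the assertions below check against.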
+ @ray.remote + def slow_function(): + while True: + time.sleep(1000) + + # Flood a large scale lease worker requests. + for i in range(10000): + # Use random cpu resources to make sure that all tasks are sent + # to the raylet. Because core worker will cache tasks with the + # same resource shape. + num_cpus = 0.24 + np.random.uniform(0, 0.01) + slow_function.options(num_cpus=num_cpus).remote() + + # Check "debug_state.txt" to ensure no extra workers were started. + session_dir = ray.worker.global_worker.node.address_info["session_dir"] + session_path = Path(session_dir) + debug_state_path = session_path / "debug_state.txt" + + def get_num_workers(): + with open(debug_state_path) as f: + for line in f.readlines(): + num_workers_prefix = "- num PYTHON workers: " + if num_workers_prefix in line: + return int(line[len(num_workers_prefix):]) + return None + + # Wait for "debug_state.txt" to be updated to reflect the started worker. + start = time.time() + wait_for_condition(lambda: get_num_workers() == 16) + time_waited = time.time() - start + print(f"Waited {time_waited} for debug_state.txt to be updated") + + # Check that no more workers started for a while. + for i in range(100): + num = get_num_workers() + assert num == 16 + time.sleep(0.1) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc index 95b0986b64eae..13d68735a70fd 100644 --- a/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc +++ b/src/ray/gcs/gcs_server/gcs_actor_scheduler.cc @@ -130,6 +130,7 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &ac const TaskID &task_id) { // NOTE: This method will cancel the outstanding lease request and remove leasing // information from the internal state. 
+ RAY_LOG(DEBUG) << "Canceling worker leasing of task " << task_id; auto node_it = node_to_actors_when_leasing_.find(node_id); RAY_CHECK(node_it != node_to_actors_when_leasing_.end()); node_it->second.erase(actor_id); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 2fde9ffe67de4..f75d989195747 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -183,11 +183,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self config.worker_commands, /*starting_worker_timeout_callback=*/ [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }, - /*runtime_env_setup_failed_callback=*/ - [this](const TaskID &task_id) { - RAY_CHECK(cluster_task_manager_->CancelTask( - task_id, /*runtime_env_setup_failed=*/true)); - }, config.ray_debugger_external, /*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }), client_call_manager_(io_service), @@ -299,11 +294,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self auto get_node_info_func = [this](const NodeID &node_id) { return gcs_client_->Nodes().Get(node_id); }; - auto is_owner_alive = [this](const WorkerID &owner_worker_id, - const NodeID &owner_node_id) { - return !(failed_workers_cache_.count(owner_worker_id) > 0 || - failed_nodes_cache_.count(owner_node_id) > 0); - }; auto announce_infeasible_task = [this](const RayTask &task) { PublishInfeasibleTaskError(task); }; @@ -320,6 +310,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self "return values are greater than the remaining capacity."; max_task_args_memory = 0; } + auto is_owner_alive = [this](const WorkerID &owner_worker_id, + const NodeID &owner_node_id) { + return !(failed_workers_cache_.count(owner_worker_id) > 0 || + failed_nodes_cache_.count(owner_node_id) > 0); + }; cluster_task_manager_ = std::shared_ptr(new ClusterTaskManager( self_node_id_, std::dynamic_pointer_cast(cluster_resource_scheduler_), diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index d538d118d1a0f..4f3fca29b634e 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -16,7 +16,6 @@ #include -#include #include #include "ray/stats/stats.h" @@ -72,8 +71,8 @@ bool ClusterTaskManager::SchedulePendingTasks() { // blocking where a task which cannot be scheduled because // there are not enough available resources blocks other // tasks from being scheduled. - const Work &work = *work_it; - RayTask task = std::get<0>(work); + const std::shared_ptr &work = *work_it; + RayTask task = work->task; RAY_LOG(DEBUG) << "Scheduling pending task " << task.GetTaskSpecification().TaskId(); auto placement_resources = @@ -113,7 +112,7 @@ bool ClusterTaskManager::SchedulePendingTasks() { // Only announce the first item as infeasible. auto &work_queue = shapes_it->second; const auto &work = work_queue[0]; - const RayTask task = std::get<0>(work); + const RayTask task = work->task; announce_infeasible_task_(task); // TODO(sang): Use a shared pointer deque to reduce copy overhead. 
@@ -128,8 +127,8 @@ bool ClusterTaskManager::SchedulePendingTasks() { return did_schedule; } -bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) { - const auto &task = std::get<0>(work); +bool ClusterTaskManager::WaitForTaskArgsRequests(std::shared_ptr work) { + const auto &task = work->task; const auto &task_id = task.GetTaskSpecification().TaskId(); const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); auto object_ids = task.GetTaskSpecification().GetDependencies(); @@ -155,16 +154,116 @@ bool ClusterTaskManager::WaitForTaskArgsRequests(Work work) { return can_dispatch; } +bool ClusterTaskManager::PoppedWorkerHandler( + const std::shared_ptr worker, PopWorkerStatus status, + const TaskID &task_id, SchedulingClass scheduling_class, + const std::shared_ptr &work, bool is_detached_actor, + const rpc::Address &owner_address) { + const auto &reply = work->reply; + const auto &callback = work->callback; + bool canceled = work->status == WorkStatus::CANCELLED; + const auto &task = work->task; + const auto &spec = task.GetTaskSpecification(); + bool dispatched = false; + + // Check whether owner worker or owner node dead. + bool not_detached_with_owner_failed = false; + const auto owner_worker_id = WorkerID::FromBinary(owner_address.worker_id()); + const auto owner_node_id = NodeID::FromBinary(owner_address.raylet_id()); + if (!is_detached_actor && !is_owner_alive_(owner_worker_id, owner_node_id)) { + not_detached_with_owner_failed = true; + } + + auto erase_from_dispatch_queue_fn = [this](const std::shared_ptr &work, + const SchedulingClass &scheduling_class) { + auto shapes_it = tasks_to_dispatch_.find(scheduling_class); + RAY_CHECK(shapes_it != tasks_to_dispatch_.end()); + auto &dispatch_queue = shapes_it->second; + bool erased = false; + for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end(); + work_it++) { + if (*work_it == work) { + dispatch_queue.erase(work_it); + erased = true; + break; + } + } + if (dispatch_queue.empty()) { + tasks_to_dispatch_.erase(shapes_it); + } + RAY_CHECK(erased); + }; + + if (canceled) { + // Task has been canceled. + RAY_LOG(DEBUG) << "Task " << task_id << " has been canceled when worker popped"; + // All the cleaning work has been done when canceled task. Just return + // false without doing anything. + return false; + } + + if (!worker || not_detached_with_owner_failed) { + // There are two cases that will not dispatch the task at this time: + // Case 1: Empty worker popped. + // Case 2: The task owner failed (not alive), except the creation task of + // detached actor. + // In that two case, we should also release worker resources, release task + // args. + + dispatched = false; + // We've already acquired resources so we need to release them. + cluster_resource_scheduler_->ReleaseWorkerResources(work->allocated_instances); + work->allocated_instances = nullptr; + // Release pinned task args. + ReleaseTaskArgs(task_id); + + if (!worker) { + // Empty worker popped. + RAY_LOG(DEBUG) << "This node has available resources, but no worker processes " + "to grant the lease " + << task_id; + if (status == PopWorkerStatus::RuntimeEnvCreationFailed) { + // In case of runtime env creation failed, we cancel this task + // directly and raise a `RuntimeEnvSetupError` exception to user + // eventually. The task will be removed from dispatch queue in + // `CancelTask`. + CancelTask(task_id, true); + } else { + // In other cases, set the work status `WAITING` to make this task + // could be re-dispatched. 
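+            // "Other cases" include, for example, PopWorkerStatus::JobConfigMissing and
+            // PopWorkerStatus::TooManyStartingWorkerProcesses (set in
+            // WorkerPool::StartWorkerProcess below); a later
+            // ScheduleAndDispatchTasks() round may then succeed.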
+ work->status = WorkStatus::WAITING; + // Return here because we shouldn't remove task dependencies. + return dispatched; + } + } else if (not_detached_with_owner_failed) { + // The task owner failed. + // Just remove the task from dispatch queue. + RAY_LOG(DEBUG) << "Call back to an owner failed task, task id = " << task_id; + erase_from_dispatch_queue_fn(work, scheduling_class); + } + + } else { + // A worker has successfully popped for a valid task. Dispatch the task to + // the worker. + RAY_LOG(DEBUG) << "Dispatching task " << task_id << " to worker " + << worker->WorkerId(); + + Dispatch(worker, leased_workers_, work->allocated_instances, task, reply, callback); + erase_from_dispatch_queue_fn(work, scheduling_class); + dispatched = true; + } + + // Remove task dependencies. + if (!spec.GetDependencies().empty()) { + task_dependency_manager_.RemoveTaskDependencies(task.GetTaskSpecification().TaskId()); + } + + return dispatched; +} + void ClusterTaskManager::DispatchScheduledTasksToWorkers( WorkerPoolInterface &worker_pool, std::unordered_map> &leased_workers) { - using job_id_runtime_env_hash_pair = std::pair; - // TODO(simon): blocked_runtime_env_to_skip is added as a hack to make sure tasks - // requiring different runtime env doesn't block each other. We need to find a - // long term solution for this, see #17154. - std::unordered_set> - blocked_runtime_env_to_skip; // Check every task in task_to_dispatch queue to see // whether it can be dispatched and ran. This avoids head-of-line // blocking where a task which cannot be dispatched because @@ -172,19 +271,15 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( // tasks from being dispatched. for (auto shapes_it = tasks_to_dispatch_.begin(); shapes_it != tasks_to_dispatch_.end();) { + auto &scheduling_class = shapes_it->first; auto &dispatch_queue = shapes_it->second; bool is_infeasible = false; for (auto work_it = dispatch_queue.begin(); work_it != dispatch_queue.end();) { auto &work = *work_it; - const auto &task = std::get<0>(work); - const auto &spec = task.GetTaskSpecification(); + const auto &task = work->task; + const auto spec = task.GetTaskSpecification(); TaskID task_id = spec.TaskId(); - const auto runtime_env_worker_key = - std::make_pair(spec.JobId().Hash(), spec.GetRuntimeEnvHash()); - - // Current task and runtime env combination doesn't have an available worker, - // therefore skipping the task. - if (blocked_runtime_env_to_skip.count(runtime_env_worker_key) > 0) { + if (work->status == WorkStatus::WAITING_FOR_WORKER) { work_it++; continue; } @@ -253,6 +348,11 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( // scheduler will make the same decision. break; } + if (!spec.GetDependencies().empty()) { + task_dependency_manager_.RemoveTaskDependencies( + task.GetTaskSpecification().TaskId()); + } + work_it = dispatch_queue.erase(work_it); } else { // The local node has the available resources to run the task, so we should run // it. @@ -262,38 +362,21 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( cluster_resource_scheduler_->SerializedTaskResourceInstances( allocated_instances); } - std::shared_ptr worker = - worker_pool_.PopWorker(spec, allocated_instances_serialized_json); - if (!worker) { - RAY_LOG(DEBUG) << "This node has available resources, but no worker processes " - "to grant the lease."; - // We've already acquired resources so we need to release them to avoid - // double-acquiring when the next invocation of this function tries to schedule - // this task. 
- cluster_resource_scheduler_->ReleaseWorkerResources(allocated_instances); - ReleaseTaskArgs(task_id); - // It may be that no worker was available with the correct runtime env or - // correct job ID. However, another task with a different env or job ID - // might have a worker available, so continue iterating through the queue. - work_it++; - // Keep track of runtime env that doesn't have workers available so we - // won't call PopWorker for subsequent tasks requiring the same runtime env. - blocked_runtime_env_to_skip.insert(runtime_env_worker_key); - continue; - } - - RAY_LOG(DEBUG) << "Dispatching task " << task_id << " to worker " - << worker->WorkerId(); - auto reply = std::get<1>(*work_it); - auto callback = std::get<2>(*work_it); - Dispatch(worker, leased_workers_, allocated_instances, task, reply, callback); - } - - if (!spec.GetDependencies().empty()) { - task_dependency_manager_.RemoveTaskDependencies( - task.GetTaskSpecification().TaskId()); + work->allocated_instances = allocated_instances; + work->status = WorkStatus::WAITING_FOR_WORKER; + bool is_detached_actor = spec.IsDetachedActor(); + auto &owner_address = spec.CallerAddress(); + worker_pool_.PopWorker( + spec, + [this, task_id, scheduling_class, work, is_detached_actor, owner_address]( + const std::shared_ptr worker, + PopWorkerStatus status) -> bool { + return PoppedWorkerHandler(worker, status, task_id, scheduling_class, work, + is_detached_actor, owner_address); + }, + allocated_instances_serialized_json); + work_it++; } - work_it = dispatch_queue.erase(work_it); } if (is_infeasible) { infeasible_tasks_[shapes_it->first] = std::move(shapes_it->second); @@ -306,8 +389,9 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( } } -bool ClusterTaskManager::TrySpillback(const Work &work, bool &is_infeasible) { - const auto &spec = std::get<0>(work).GetTaskSpecification(); +bool ClusterTaskManager::TrySpillback(const std::shared_ptr &work, + bool &is_infeasible) { + const auto &spec = work->task.GetTaskSpecification(); int64_t _unused; auto placement_resources = spec.GetRequiredPlacementResources().GetResourceMap(); std::string node_id_string = cluster_resource_scheduler_->GetBestSchedulableNode( @@ -331,7 +415,7 @@ void ClusterTaskManager::QueueAndScheduleTask( RAY_LOG(DEBUG) << "Queuing and scheduling task " << task.GetTaskSpecification().TaskId(); metric_tasks_queued_++; - Work work = std::make_tuple(task, reply, [send_reply_callback] { + auto work = std::make_shared(task, reply, [send_reply_callback] { send_reply_callback(Status::OK(), nullptr, nullptr); }); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); @@ -355,7 +439,7 @@ void ClusterTaskManager::TasksUnblocked(const std::vector &ready_ids) { auto it = waiting_tasks_index_.find(task_id); if (it != waiting_tasks_index_.end()) { auto work = *it->second; - const auto &task = std::get<0>(work); + const auto &task = work->task; const auto &scheduling_key = task.GetTaskSpecification().GetSchedulingClass(); RAY_LOG(DEBUG) << "Args ready, task can be dispatched " << task.GetTaskSpecification().TaskId(); @@ -485,9 +569,9 @@ void ClusterTaskManager::ReturnWorkerResources(std::shared_ptr ReleaseWorkerResources(worker); } -void ReplyCancelled(Work &work, bool runtime_env_setup_failed) { - auto reply = std::get<1>(work); - auto callback = std::get<2>(work); +void ReplyCancelled(std::shared_ptr &work, bool runtime_env_setup_failed) { + auto reply = work->reply; + auto callback = work->callback; reply->set_canceled(true); 
reply->set_runtime_env_setup_failed(runtime_env_setup_failed); callback(); @@ -501,10 +585,10 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id, shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { - const auto &task = std::get<0>(*work_it); + const auto &task = (*work_it)->task; if (task.GetTaskSpecification().TaskId() == task_id) { RemoveFromBacklogTracker(task); - RAY_LOG(DEBUG) << "Canceling task " << task_id; + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from schedule queue."; ReplyCancelled(*work_it, runtime_env_setup_failed); work_queue.erase(work_it); if (work_queue.empty()) { @@ -518,14 +602,23 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id, shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { - const auto &task = std::get<0>(*work_it); + const auto &task = (*work_it)->task; if (task.GetTaskSpecification().TaskId() == task_id) { RemoveFromBacklogTracker(task); + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from dispatch queue."; ReplyCancelled(*work_it, runtime_env_setup_failed); + if ((*work_it)->status == WorkStatus::WAITING_FOR_WORKER) { + // We've already acquired resources so we need to release them. + cluster_resource_scheduler_->ReleaseWorkerResources( + (*work_it)->allocated_instances); + // Release pinned task args. + ReleaseTaskArgs(task_id); + } if (!task.GetTaskSpecification().GetDependencies().empty()) { task_dependency_manager_.RemoveTaskDependencies( task.GetTaskSpecification().TaskId()); } + (*work_it)->status = WorkStatus::CANCELLED; work_queue.erase(work_it); if (work_queue.empty()) { tasks_to_dispatch_.erase(shapes_it); @@ -539,9 +632,10 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id, shapes_it++) { auto &work_queue = shapes_it->second; for (auto work_it = work_queue.begin(); work_it != work_queue.end(); work_it++) { - const auto &task = std::get<0>(*work_it); + const auto &task = (*work_it)->task; if (task.GetTaskSpecification().TaskId() == task_id) { RemoveFromBacklogTracker(task); + RAY_LOG(DEBUG) << "Canceling task " << task_id << " from infeasible queue."; ReplyCancelled(*work_it, runtime_env_setup_failed); work_queue.erase(work_it); if (work_queue.empty()) { @@ -554,7 +648,7 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id, auto iter = waiting_tasks_index_.find(task_id); if (iter != waiting_tasks_index_.end()) { - const auto &task = std::get<0>(*iter->second); + const auto &task = (*iter->second)->task; RemoveFromBacklogTracker(task); ReplyCancelled(*iter->second, runtime_env_setup_failed); if (!task.GetTaskSpecification().GetDependencies().empty()) { @@ -576,7 +670,7 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con for (const auto &shapes_it : infeasible_tasks_) { auto &work_queue = shapes_it.second; for (const auto &work_it : work_queue) { - RayTask task = std::get<0>(work_it); + RayTask task = work_it->task; if (task.GetTaskSpecification().IsActorCreationTask()) { if (num_reported++ > kMaxPendingActorsToReport) { break; // Protect the raylet from reporting too much data. 
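The cancellation branch above works because every queued item now records where it is in the asynchronous dispatch flow: WAITING (queued, no pop outstanding), WAITING_FOR_WORKER (a PopWorker request is in flight and worker resources plus pinned task args are held), and CANCELLED (the lease was cancelled while the pop was in flight, so the eventual callback must not dispatch). The sketch below is a condensed summary rather than the patch's code: the enum mirrors WorkStatus from cluster_task_manager.h, but Cancel and OnWorkerPopped are simplified stand-ins for CancelTask and PoppedWorkerHandler.

#include <iostream>

// Mirrors the WorkStatus enum added in cluster_task_manager.h.
enum class WorkStatus { WAITING, WAITING_FOR_WORKER, CANCELLED };

struct Work {
  WorkStatus status = WorkStatus::WAITING;
  bool resources_held = false;  // Stands in for allocated_instances / pinned args.
};

// Cancelling a lease while a worker pop is in flight: release what was
// acquired and mark the item so the popped-worker callback refuses to dispatch.
void Cancel(Work &work) {
  if (work.status == WorkStatus::WAITING_FOR_WORKER) {
    work.resources_held = false;  // ReleaseWorkerResources / ReleaseTaskArgs.
  }
  work.status = WorkStatus::CANCELLED;
}

// The popped-worker callback; the return value says whether the worker was used.
bool OnWorkerPopped(Work &work, bool got_worker) {
  if (work.status == WorkStatus::CANCELLED) {
    return false;  // Worker goes back to the pool.
  }
  if (!got_worker) {
    work.status = WorkStatus::WAITING;  // Retry on a later dispatch round.
    return false;
  }
  return true;  // Dispatch to the worker.
}

int main() {
  Work work;
  work.status = WorkStatus::WAITING_FOR_WORKER;  // A PopWorker request was issued.
  work.resources_held = true;
  Cancel(work);
  if (!OnWorkerPopped(work, /*got_worker=*/true)) {
    // Cancelled while waiting: the worker is not used.
    std::cout << "worker returned to the pool" << std::endl;
  }
  return 0;
}

In the patch, WAITING_FOR_WORKER is also what lets DispatchScheduledTasksToWorkers skip items whose pop is already in flight instead of calling PopWorker again, which is exactly what the PopWorkerExactlyOnce test below asserts.
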
@@ -591,7 +685,7 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) { auto &work_queue = shapes_it.second; for (const auto &work_it : work_queue) { - RayTask task = std::get<0>(work_it); + RayTask task = work_it->task; if (task.GetTaskSpecification().IsActorCreationTask()) { if (num_reported++ > kMaxPendingActorsToReport) { break; // Protect the raylet from reporting too much data. @@ -793,7 +887,7 @@ bool ClusterTaskManager::AnyPendingTasks(RayTask *exemplar, bool *any_pending, for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) { auto &work_queue = shapes_it.second; for (const auto &work_it : work_queue) { - const auto &task = std::get<0>(work_it); + const auto &task = work_it->task; if (task.GetTaskSpecification().IsActorCreationTask()) { *num_pending_actor_creation += 1; } else { @@ -813,7 +907,8 @@ bool ClusterTaskManager::AnyPendingTasks(RayTask *exemplar, bool *any_pending, std::string ClusterTaskManager::DebugStr() const { // TODO(Shanly): This method will be replaced with `DebugString` once we remove the // legacy scheduler. - auto accumulator = [](size_t state, const std::pair> &pair) { + auto accumulator = [](size_t state, + const std::pair>> &pair) { return state + pair.second.size(); }; size_t num_infeasible_tasks = std::accumulate( @@ -861,7 +956,7 @@ void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() { // We only need to check the first item because every task has the same shape. // If the first entry is infeasible, that means everything else is the same. const auto work = work_queue[0]; - RayTask task = std::get<0>(work); + RayTask task = work->task; RAY_LOG(DEBUG) << "Check if the infeasible task is schedulable in any node. 
task_id:" << task.GetTaskSpecification().TaskId(); auto placement_resources = @@ -894,8 +989,9 @@ void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() { void ClusterTaskManager::Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers, - std::shared_ptr &allocated_instances, const RayTask &task, - rpc::RequestWorkerLeaseReply *reply, std::function send_reply_callback) { + const std::shared_ptr &allocated_instances, + const RayTask &task, rpc::RequestWorkerLeaseReply *reply, + std::function send_reply_callback) { metric_tasks_dispatched_++; const auto &task_spec = task.GetTaskSpecification(); @@ -970,9 +1066,10 @@ void ClusterTaskManager::Dispatch( send_reply_callback(); } -void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) { +void ClusterTaskManager::Spillback(const NodeID &spillback_to, + const std::shared_ptr &work) { metric_tasks_spilled_++; - const auto &task = std::get<0>(work); + const auto &task = work->task; const auto &task_spec = task.GetTaskSpecification(); RemoveFromBacklogTracker(task); RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to; @@ -987,13 +1084,13 @@ void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) RAY_CHECK(node_info_opt) << "Spilling back to a node manager, but no GCS info found for node " << spillback_to; - auto reply = std::get<1>(work); + auto reply = work->reply; reply->mutable_retry_at_raylet_address()->set_ip_address( node_info_opt->node_manager_address()); reply->mutable_retry_at_raylet_address()->set_port(node_info_opt->node_manager_port()); reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); - auto send_reply_callback = std::get<2>(work); + auto send_reply_callback = work->callback; send_reply_callback(); } @@ -1110,7 +1207,7 @@ void ClusterTaskManager::SpillWaitingTasks() { auto it = waiting_task_queue_.end(); while (it != waiting_task_queue_.begin()) { it--; - const auto &task = std::get<0>(*it); + const auto &task = (*it)->task; const auto &task_id = task.GetTaskSpecification().TaskId(); // Check whether this task's dependencies are blocked (not being actively diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index 582992bed3610..dfb74a3b9281d 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -34,8 +34,28 @@ namespace raylet { /// Work represents all the information needed to make a scheduling decision. /// This includes the task, the information we need to communicate to /// dispatch/spillback and the callback to trigger it. -typedef std::tuple> - Work; +enum WorkStatus { + WAITING, + WAITING_FOR_WORKER, + CANCELLED, +}; + +struct Work { + RayTask task; + rpc::RequestWorkerLeaseReply *reply; + std::function callback; + std::shared_ptr allocated_instances; + WorkStatus status = WorkStatus::WAITING; + Work(RayTask task, rpc::RequestWorkerLeaseReply *reply, + std::function callback, WorkStatus status = WorkStatus::WAITING) + : task(task), + reply(reply), + callback(callback), + allocated_instances(nullptr), + status(status){}; + Work(const Work &Work) = delete; + Work &operator=(const Work &work) = delete; +}; typedef std::function(const NodeID &node_id)> NodeInfoGetter; @@ -191,6 +211,13 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// \return True if any tasks are ready for dispatch. 
bool SchedulePendingTasks(); + /// Handle the popped worker from worker pool. + bool PoppedWorkerHandler(const std::shared_ptr worker, + PopWorkerStatus status, const TaskID &task_id, + SchedulingClass scheduling_class, + const std::shared_ptr &work, bool is_detached_actor, + const rpc::Address &owner_address); + /// (Step 3) Attempts to dispatch all tasks which are ready to run. A task /// will be dispatched if it is on `tasks_to_dispatch_` and there are still /// available resources on the node. @@ -208,13 +235,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// \returns true if the task was spilled. The task may not be spilled if the /// spillback policy specifies the local node (which may happen if no other nodes have /// the available resources). - bool TrySpillback(const Work &spec, bool &is_infeasible); - - /// Helper method to try dispatching a single task from the queue to an - /// available worker. Returns whether the task should be removed from the - /// queue and whether the worker was successfully leased to execute the work. - bool AttemptDispatchWork(const Work &work, std::shared_ptr &worker, - bool *worker_leased); + bool TrySpillback(const std::shared_ptr &work, bool &is_infeasible); /// Reiterate all local infeasible tasks and register them to task_to_schedule_ if it /// becomes feasible to schedule. @@ -243,7 +264,8 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// through queues to cancel tasks, etc. /// Queue of lease requests that are waiting for resources to become available. /// Tasks move from scheduled -> dispatch | waiting. - std::unordered_map> tasks_to_schedule_; + std::unordered_map>> + tasks_to_schedule_; /// Queue of lease requests that should be scheduled onto workers. /// Tasks move from scheduled | waiting -> dispatch. @@ -252,7 +274,8 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// All tasks in this map that have dependencies should be registered with /// the dependency manager, in case a dependency gets evicted while the task /// is still queued. - std::unordered_map> tasks_to_dispatch_; + std::unordered_map>> + tasks_to_dispatch_; /// Tasks waiting for arguments to be transferred locally. /// Tasks move from waiting -> dispatch. @@ -270,14 +293,16 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// in this queue may not match the order in which we initially received the /// tasks. This also means that the PullManager may request dependencies for /// these tasks in a different order than the waiting task queue. - std::list waiting_task_queue_; + std::list> waiting_task_queue_; /// An index for the above queue. - absl::flat_hash_map::iterator> waiting_tasks_index_; + absl::flat_hash_map>::iterator> + waiting_tasks_index_; /// Queue of lease requests that are infeasible. /// Tasks go between scheduling <-> infeasible. - std::unordered_map> infeasible_tasks_; + std::unordered_map>> + infeasible_tasks_; /// Track the cumulative backlog of all workers requesting a lease to this raylet. std::unordered_map backlog_tracker_; @@ -321,15 +346,16 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { /// or placed on a wait queue. /// /// \return True if the work can be immediately dispatched. 
- bool WaitForTaskArgsRequests(Work work); + bool WaitForTaskArgsRequests(std::shared_ptr work); void Dispatch( std::shared_ptr worker, std::unordered_map> &leased_workers_, - std::shared_ptr &allocated_instances, const RayTask &task, - rpc::RequestWorkerLeaseReply *reply, std::function send_reply_callback); + const std::shared_ptr &allocated_instances, + const RayTask &task, rpc::RequestWorkerLeaseReply *reply, + std::function send_reply_callback); - void Spillback(const NodeID &spillback_to, const Work &work); + void Spillback(const NodeID &spillback_to, const std::shared_ptr &work); void AddToBacklogTracker(const RayTask &task); void RemoveFromBacklogTracker(const RayTask &task); diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 4f931c96cbba6..0ae7400fd4c39 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -45,34 +45,59 @@ class MockWorkerPool : public WorkerPoolInterface { public: MockWorkerPool() : num_pops(0) {} - std::shared_ptr PopWorker( - const TaskSpecification &task_spec, - const std::string &allocated_instances_serialized_json) { + void PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback &callback, + const std::string &allocated_instances_serialized_json) { num_pops++; const WorkerCacheKey env = { task_spec.OverrideEnvironmentVariables(), task_spec.SerializedRuntimeEnv(), {}}; const int runtime_env_hash = env.IntHash(); - std::shared_ptr worker = nullptr; + callbacks[runtime_env_hash].push_back(callback); + } - for (auto it = workers.begin(); it != workers.end(); it++) { - // Skip if the runtime env doesn't match. - if (runtime_env_hash != (*it)->GetRuntimeEnvHash()) { - continue; - } + void PushWorker(const std::shared_ptr &worker) { + workers.push_front(worker); + } - worker = std::move(*it); - workers.erase(it); - break; + void TriggerCallbacks() { + for (auto it = workers.begin(); it != workers.end();) { + std::shared_ptr worker = *it; + auto runtime_env_hash = worker->GetRuntimeEnvHash(); + bool dispatched = false; + auto cb_it = callbacks.find(runtime_env_hash); + if (cb_it != callbacks.end()) { + auto &list = cb_it->second; + RAY_CHECK(!list.empty()); + for (auto list_it = list.begin(); list_it != list.end();) { + auto &callback = *list_it; + dispatched = callback(worker, PopWorkerStatus::OK); + list_it = list.erase(list_it); + if (dispatched) { + break; + } + } + if (list.empty()) { + callbacks.erase(cb_it); + } + if (dispatched) { + it = workers.erase(it); + continue; + } + } + it++; } - - return worker; } - void PushWorker(const std::shared_ptr &worker) { - workers.push_front(worker); + size_t CallbackSize(int runtime_env_hash) { + auto cb_it = callbacks.find(runtime_env_hash); + if (cb_it != callbacks.end()) { + auto &list = cb_it->second; + return list.size(); + } + return 0; } std::list> workers; + std::unordered_map> callbacks; int num_pops; }; @@ -230,6 +255,18 @@ class ClusterTaskManagerTest : public ::testing::Test { } } + int NumTasksWaitingForWorker() { + int count = 0; + for (const auto &pair : task_manager_.tasks_to_dispatch_) { + for (const auto &work : pair.second) { + if (work->status == WorkStatus::WAITING_FOR_WORKER) { + count++; + } + } + } + return count; + } + NodeID id_; std::shared_ptr scheduler_; MockWorkerPool pool_; @@ -263,7 +300,7 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); - + 
pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(pool_.workers.size(), 0); @@ -271,8 +308,7 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::static_pointer_cast(worker)); - - task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -283,7 +319,6 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), task.GetTaskSpecification().TaskId()); - AssertNoLeaks(); } @@ -323,6 +358,7 @@ TEST_F(ClusterTaskManagerTest, DispatchQueueNonBlockingTest) { task_manager_.QueueAndScheduleTask(task_B_1, &reply_B_1, empty_callback); task_manager_.QueueAndScheduleTask(task_A, &reply_A, callback); task_manager_.QueueAndScheduleTask(task_B_2, &reply_B_2, empty_callback); + pool_.TriggerCallbacks(); // Push a worker that can only run task A. const WorkerCacheKey env_A = { @@ -331,11 +367,7 @@ TEST_F(ClusterTaskManagerTest, DispatchQueueNonBlockingTest) { std::shared_ptr worker_A = std::make_shared(WorkerID::FromRandom(), 1234, runtime_env_hash_A); pool_.PushWorker(std::static_pointer_cast(worker_A)); - ASSERT_EQ(pool_.workers.size(), 1); - - // Check we can schedule task A, even though task B is at the front of the queue - // and no workers are available for task B. - task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -366,6 +398,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDiesTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -376,6 +409,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDiesTest) { pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -409,6 +443,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDies2Test) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -419,6 +454,7 @@ TEST_F(ClusterTaskManagerTest, BlockedWorkerDies2Test) { pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -452,6 +488,7 @@ TEST_F(ClusterTaskManagerTest, NoFeasibleNodeTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); ASSERT_EQ(leased_workers_.size(), 0); @@ -488,6 +525,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); @@ -500,6 +538,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { /* This task can run */ auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1); task_manager_.QueueAndScheduleTask(task2, &reply, callback); + pool_.TriggerCallbacks(); 
ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); AssertPinnedTaskArgumentsPresent(task2); @@ -527,6 +566,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { leased_workers_.clear(); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty()); // Task2 is now done so task can run. @@ -561,30 +601,34 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply local_reply; task_manager_.QueueAndScheduleTask(task, &local_reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); - /* This task can run but not at the same time as the first */ + // Resources are no longer available for the second. auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply spillback_reply; task_manager_.QueueAndScheduleTask(task2, &spillback_reply, callback); + pool_.TriggerCallbacks(); - ASSERT_EQ(num_callbacks, 0); + // The second task was spilled. + ASSERT_EQ(num_callbacks, 1); + ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(), + remote_node_id.Binary()); ASSERT_EQ(leased_workers_.size(), 0); - // Two workers start. First task is dispatched now, but resources are no - // longer available for the second. + // Two workers start. First task was dispatched now. pool_.PushWorker(std::static_pointer_cast(worker)); pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); // Check that both tasks got removed from the queue. ASSERT_EQ(num_callbacks, 2); // The first task was dispatched. ASSERT_EQ(leased_workers_.size(), 1); - // The second task was spilled. - ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(), - remote_node_id.Binary()); + // Leave one alive worker. + ASSERT_EQ(pool_.workers.size(), 1); RayTask finished_task; task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); @@ -611,19 +655,22 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); // RayTask is now in dispatch queue. callback_called = false; reply.Clear(); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); task_manager_.ScheduleAndDispatchTasks(); - // RayTask will not execute. + pool_.TriggerCallbacks(); + // Task will not execute. ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); // RayTask is now running so we can't cancel it. callback_called = false; @@ -660,11 +707,13 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); // RayTask is now queued so cancellation works. ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); task_manager_.ScheduleAndDispatchTasks(); - // RayTask will not execute. + pool_.TriggerCallbacks(); + // Task will not execute. 
ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); @@ -675,6 +724,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) { auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 12); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); @@ -699,6 +749,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_TRUE(callback_called); // Now {CPU: 7, GPU: 4, MEM:128} } @@ -715,6 +766,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // No worker available. // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task. } @@ -732,6 +784,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible task {CPU: 9}. } @@ -749,6 +802,7 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible task {CPU: 10}. } @@ -818,12 +872,14 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); task.SetBacklogSize(10 - i); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); } for (int i = 1; i < 10; i++) { RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 8}}); task.SetBacklogSize(10 - i); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); to_cancel.push_back(task.GetTaskSpecification().TaskId()); } @@ -848,6 +904,7 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(worker); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); { rpc::ResourcesData data; @@ -865,7 +922,6 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { for (auto &task_id : to_cancel) { ASSERT_TRUE(task_manager_.CancelTask(task_id)); } - RAY_LOG(ERROR) << "Finished cancelling tasks"; { rpc::ResourcesData data; @@ -902,6 +958,7 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) { is_owner_alive_ = false; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -909,6 +966,7 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) { is_owner_alive_ = true; task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -929,6 +987,7 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { *callback_occurred = true; }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_task_calls_, 1); // Infeasible warning shouldn't be reprinted when the previous task is still infeasible @@ -938,7 +997,8 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); - // RayTask shouldn't be scheduled yet. 
+ pool_.TriggerCallbacks(); + // Task shouldn't be scheduled yet. ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_FALSE(*callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -949,6 +1009,7 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 12); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); // Make sure nothing happens locally. ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_TRUE(*callback_occurred); @@ -973,6 +1034,7 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) { *callback_occurred = true; }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_task_calls_, 1); // Make sure the same shape infeasible task won't be announced. @@ -984,6 +1046,7 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) { *callback_occurred2 = true; }; task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); + pool_.TriggerCallbacks(); ASSERT_EQ(announce_infeasible_task_calls_, 1); } @@ -1004,6 +1067,7 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) { *callback_occurred = true; }; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_TRUE(*callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 0); @@ -1025,6 +1089,7 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) { *callback_occurred2 = true; }; task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); + pool_.TriggerCallbacks(); ASSERT_FALSE(*callback_occurred2); ASSERT_TRUE(task_manager_.AnyPendingTasks(&exemplar, &any_pending, &pending_actor_creations, &pending_tasks)); @@ -1054,6 +1119,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); @@ -1067,17 +1133,10 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); - /* RayTask argument gets evicted */ - missing_objects_.insert(missing_arg); + /* Worker available and arguments available */ pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); - ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); - ASSERT_EQ(num_callbacks, 0); - ASSERT_EQ(leased_workers_.size(), 0); - - /* Worker available and arguments available */ - missing_objects_.erase(missing_arg); - task_manager_.TasksUnblocked({id}); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); @@ -1103,6 +1162,7 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) { [&callback_occurred1](Status, std::function, std::function) { callback_occurred1 = true; }); + pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_TRUE(callback_occurred1); ASSERT_EQ(pool_.workers.size(), 0); @@ -1110,6 +1170,10 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) { ASSERT_EQ(task_manager_.tasks_to_dispatch_.size(), 0); ASSERT_EQ(task_manager_.infeasible_tasks_.size(), 0); + // Delete cpu resource of local node, then task 2 should be turned into + // infeasible. 
+ scheduler_->DeleteLocalResource(ray::kCPU_ResourceLabel); + RayTask task2 = CreateTask({{ray::kCPU_ResourceLabel, 4}}); rpc::RequestWorkerLeaseReply reply2; bool callback_occurred2 = false; @@ -1118,26 +1182,18 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) { [&callback_occurred2](Status, std::function, std::function) { callback_occurred2 = true; }); + pool_.TriggerCallbacks(); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_FALSE(callback_occurred2); ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(task_manager_.tasks_to_schedule_.size(), 0); - // This task is under scheduling - ASSERT_EQ(task_manager_.tasks_to_dispatch_.size(), 1); - ASSERT_EQ(task_manager_.infeasible_tasks_.size(), 0); + ASSERT_EQ(task_manager_.tasks_to_dispatch_.size(), 0); + ASSERT_EQ(task_manager_.infeasible_tasks_.size(), 1); RayTask finished_task; task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), task1.GetTaskSpecification().TaskId()); - // Delete cpu resource of local node, then task 2 should be turned into - // infeasible. - scheduler_->DeleteLocalResource(ray::kCPU_ResourceLabel); - task_manager_.ScheduleAndDispatchTasks(); - ASSERT_FALSE(callback_occurred2); - ASSERT_EQ(task_manager_.tasks_to_schedule_.size(), 0); - ASSERT_EQ(task_manager_.tasks_to_dispatch_.size(), 0); - ASSERT_EQ(task_manager_.infeasible_tasks_.size(), 1); } TEST_F(ClusterTaskManagerTest, RleaseAndReturnWorkerCpuResources) { @@ -1214,22 +1270,17 @@ TEST_F(ClusterTaskManagerTest, TestSpillWaitingTasks) { missing_objects_.insert(missing_arg); } task_manager_.QueueAndScheduleTask(task, replies[i].get(), callback); + pool_.TriggerCallbacks(); } ASSERT_EQ(num_callbacks, 0); + // Local resources could only dispatch one task. + ASSERT_EQ(NumTasksWaitingForWorker(), 1); auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 16); - // We are fetching dependencies for all waiting tasks and we have enough - // resources available locally to schedule them. We should not spill from the - // waiting queue. - task_manager_.ScheduleAndDispatchTasks(); - ASSERT_EQ(num_callbacks, 0); - - // All waiting tasks are blocked due to lack of memory. We should only spill - // up to the remote node's resource availability. - for (auto &task : tasks) { - dependency_manager_.blocked_tasks.insert(task.GetTaskSpecification().TaskId()); - } + // We are fetching dependencies for all waiting tasks but we have no enough + // resources available locally to schedule tasks except the first. + // We should only spill up to the remote node's resource availability. task_manager_.ScheduleAndDispatchTasks(); ASSERT_EQ(num_callbacks, 2); // Spill from the back of the waiting queue. @@ -1242,19 +1293,13 @@ TEST_F(ClusterTaskManagerTest, TestSpillWaitingTasks) { // Do not spill back tasks ready to dispatch. ASSERT_EQ(replies[4]->retry_at_raylet_address().raylet_id(), ""); - // Add a new node. All task dependencies are being fetched again, so no - // spill. AddNode(remote_node_id, 8); - dependency_manager_.blocked_tasks.clear(); - task_manager_.ScheduleAndDispatchTasks(); - ASSERT_EQ(num_callbacks, 2); - - // Dispatch the ready task. Now we have no more resources available locally, - // so we should spill one waiting task. + // Dispatch the ready task. 
std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::dynamic_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 4); // One waiting task spilled. ASSERT_EQ(replies[0]->retry_at_raylet_address().raylet_id(), ""); @@ -1265,6 +1310,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillWaitingTasks) { // Spillback is idempotent. task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 4); // One waiting task spilled. ASSERT_EQ(replies[0]->retry_at_raylet_address().raylet_id(), ""); @@ -1304,6 +1350,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsMemoryTest) { default_arg_size_ = 600; auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); @@ -1312,6 +1359,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsMemoryTest) { // This task cannot run because it would put us over the memory threshold. auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); task_manager_.QueueAndScheduleTask(task2, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); @@ -1322,6 +1370,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsMemoryTest) { leased_workers_.clear(); task_manager_.ScheduleAndDispatchTasks(); + pool_.TriggerCallbacks(); AssertPinnedTaskArgumentsPresent(task2); ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(leased_workers_.size(), 1); @@ -1355,6 +1404,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsSameMemoryTest) { default_arg_size_ = 600; auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); @@ -1364,6 +1414,7 @@ TEST_F(ClusterTaskManagerTest, PinnedArgsSameMemoryTest) { auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1, task.GetTaskSpecification().GetDependencyIds()); task_manager_.QueueAndScheduleTask(task2, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(leased_workers_.size(), 2); ASSERT_EQ(pool_.workers.size(), 0); @@ -1392,6 +1443,7 @@ TEST_F(ClusterTaskManagerTest, LargeArgsNoStarvationTest) { auto task = CreateTask({{ray::kCPU_ResourceLabel, 1}}, 1); pool_.PushWorker(std::static_pointer_cast(worker)); task_manager_.QueueAndScheduleTask(task, &reply, callback); + pool_.TriggerCallbacks(); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); AssertPinnedTaskArgumentsPresent(task); @@ -1420,6 +1472,53 @@ TEST_F(ClusterTaskManagerTest, TestResourceDiff) { ASSERT_TRUE(resource_data.resource_load_changed()); } +TEST_F(ClusterTaskManagerTest, PopWorkerExactlyOnce) { + // Create and queue one task. 
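+  // The assertions below verify that only one PopWorker request is issued for
+  // this task, even though ScheduleAndDispatchTasks() runs more than once while
+  // the request is still pending.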
+ std::string serialized_runtime_env = "mock_env"; + RayTask task = CreateTask({{ray::kCPU_ResourceLabel, 4}}, /*num_args=*/0, /*args=*/{}, + serialized_runtime_env); + auto runtime_env_hash = task.GetTaskSpecification().GetRuntimeEnvHash(); + rpc::RequestWorkerLeaseReply reply; + bool callback_occurred = false; + bool *callback_occurred_ptr = &callback_occurred; + auto callback = [callback_occurred_ptr](Status, std::function, + std::function) { + *callback_occurred_ptr = true; + }; + + task_manager_.QueueAndScheduleTask(task, &reply, callback); + + // Make sure callback doesn't occurred. + ASSERT_FALSE(callback_occurred); + ASSERT_EQ(leased_workers_.size(), 0); + ASSERT_EQ(pool_.workers.size(), 0); + // Popworker was called once. + ASSERT_EQ(pool_.CallbackSize(runtime_env_hash), 1); + // Try to schedule and dispatch tasks. + task_manager_.ScheduleAndDispatchTasks(); + // Popworker has been called once, don't call it repeatedly. + ASSERT_EQ(pool_.CallbackSize(runtime_env_hash), 1); + // Push a worker and try to call back. + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234, runtime_env_hash); + pool_.PushWorker(std::static_pointer_cast(worker)); + pool_.TriggerCallbacks(); + // Make sure callback has occurred. + ASSERT_TRUE(callback_occurred); + ASSERT_EQ(leased_workers_.size(), 1); + ASSERT_EQ(pool_.workers.size(), 0); + // Try to schedule and dispatch tasks. + task_manager_.ScheduleAndDispatchTasks(); + // Worker has been popped. Don't call `PopWorker` repeatedly. + ASSERT_EQ(pool_.CallbackSize(runtime_env_hash), 0); + + RayTask finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), + task.GetTaskSpecification().TaskId()); + AssertNoLeaks(); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index ff698a656c94e..65afe99058704 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -41,12 +41,12 @@ class MockWorker : public WorkerInterface { const std::string IpAddress() const { return address_.ip_address(); } void SetAllocatedInstances( - std::shared_ptr &allocated_instances) { + const std::shared_ptr &allocated_instances) { allocated_instances_ = allocated_instances; } void SetLifetimeAllocatedInstances( - std::shared_ptr &allocated_instances) { + const std::shared_ptr &allocated_instances) { lifetime_allocated_instances_ = allocated_instances; } diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index df4f73b55dd32..ee638836933c2 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -93,14 +93,14 @@ class WorkerInterface { // Setter, geter, and clear methods for allocated_instances_. virtual void SetAllocatedInstances( - std::shared_ptr &allocated_instances) = 0; + const std::shared_ptr &allocated_instances) = 0; virtual std::shared_ptr GetAllocatedInstances() = 0; virtual void ClearAllocatedInstances() = 0; virtual void SetLifetimeAllocatedInstances( - std::shared_ptr &allocated_instances) = 0; + const std::shared_ptr &allocated_instances) = 0; virtual std::shared_ptr GetLifetimeAllocatedInstances() = 0; virtual void ClearLifetimeAllocatedInstances() = 0; @@ -182,7 +182,7 @@ class Worker : public WorkerInterface { // Setter, geter, and clear methods for allocated_instances_. 
void SetAllocatedInstances( - std::shared_ptr &allocated_instances) { + const std::shared_ptr &allocated_instances) { allocated_instances_ = allocated_instances; }; @@ -193,7 +193,7 @@ class Worker : public WorkerInterface { void ClearAllocatedInstances() { allocated_instances_ = nullptr; }; void SetLifetimeAllocatedInstances( - std::shared_ptr &allocated_instances) { + const std::shared_ptr &allocated_instances) { lifetime_allocated_instances_ = allocated_instances; }; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index e779faa478f81..a2a1daa028459 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -57,15 +57,15 @@ namespace ray { namespace raylet { -WorkerPool::WorkerPool( - instrumented_io_context &io_service, const NodeID node_id, - const std::string node_address, int num_workers_soft_limit, - int num_initial_python_workers_for_first_job, int maximum_startup_concurrency, - int min_worker_port, int max_worker_port, const std::vector &worker_ports, - std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands, - std::function starting_worker_timeout_callback, - std::function runtime_env_setup_failed_callback, - int ray_debugger_external, const std::function get_time) +WorkerPool::WorkerPool(instrumented_io_context &io_service, const NodeID node_id, + const std::string node_address, int num_workers_soft_limit, + int num_initial_python_workers_for_first_job, + int maximum_startup_concurrency, int min_worker_port, + int max_worker_port, const std::vector &worker_ports, + std::shared_ptr gcs_client, + const WorkerCommandMap &worker_commands, + std::function starting_worker_timeout_callback, + int ray_debugger_external, const std::function get_time) : io_service_(&io_service), node_id_(node_id), node_address_(node_address), @@ -73,7 +73,6 @@ WorkerPool::WorkerPool( maximum_startup_concurrency_(maximum_startup_concurrency), gcs_client_(std::move(gcs_client)), starting_worker_timeout_callback_(starting_worker_timeout_callback), - runtime_env_setup_failed_callback_(runtime_env_setup_failed_callback), ray_debugger_external(ray_debugger_external), first_job_registered_python_worker_count_(0), first_job_driver_wait_num_python_workers_(std::min( @@ -153,10 +152,30 @@ void WorkerPool::SetAgentManager(std::shared_ptr agent_manager) { agent_manager_ = agent_manager; } +void WorkerPool::PopWorkerCallbackAsync(const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status) { + // Call back this function asynchronously to make sure executed in different stack. + io_service_->post([this, callback, worker, status]() { + PopWorkerCallbackInternal(callback, worker, status); + }); +} + +void WorkerPool::PopWorkerCallbackInternal(const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status) { + RAY_CHECK(callback); + auto used = callback(worker, status); + if (worker && !used) { + // The invalid worker not used, restore it to worker pool. 
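Posting the callback through the io_service, rather than invoking it inline, means the caller of `PopWorker` never sees its callback run re-entrantly inside the `PopWorker` call itself; the callback always runs later, from the event loop, on a fresh stack. A self-contained sketch of that pattern with plain Boost.Asio (the real code goes through `instrumented_io_context`, which is assumed here to expose the same `post` semantics):

#include <boost/asio.hpp>
#include <functional>
#include <iostream>

using Callback = std::function<void(bool ok)>;

// Defer the callback to the event loop instead of calling it inline.
void CompleteAsync(boost::asio::io_context &io, Callback cb, bool ok) {
  boost::asio::post(io, [cb = std::move(cb), ok]() { cb(ok); });
}

int main() {
  boost::asio::io_context io;
  std::cout << "before post\n";
  CompleteAsync(io, [](bool ok) { std::cout << "callback ran, ok=" << ok << "\n"; }, true);
  std::cout << "after post, callback has not run yet\n";
  io.run();  // the callback only fires here, on the event loop's stack
}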
+ PushWorker(worker); + } +} + Process WorkerPool::StartWorkerProcess( const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, - const std::vector &dynamic_options, const int runtime_env_hash, - const std::string &serialized_runtime_env, + PopWorkerStatus *status, const std::vector &dynamic_options, + const int runtime_env_hash, const std::string &serialized_runtime_env, std::unordered_map override_environment_variables, const std::string &serialized_runtime_env_context, const std::string &allocated_instances_serialized_json) { @@ -167,6 +186,7 @@ Process WorkerPool::StartWorkerProcess( if (it == all_jobs_.end()) { RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet."; // Will reschedule ready tasks in `NodeManager::HandleJobStarted`. + *status = PopWorkerStatus::JobConfigMissing; return Process(); } job_config = &it->second; @@ -188,6 +208,7 @@ Process WorkerPool::StartWorkerProcess( RAY_LOG(DEBUG) << "Worker not started, " << starting_workers << " workers of language type " << static_cast(language) << " pending registration"; + *status = PopWorkerStatus::TooManyStartingWorkerProcesses; return Process(); } // Either there are no workers pending registration or the worker start is being forced. @@ -345,7 +366,6 @@ Process WorkerPool::StartWorkerProcess( auto duration = std::chrono::duration_cast(end - start); stats::ProcessStartupTimeMs.Record(duration.count()); stats::NumWorkersStarted.Record(1); - RAY_LOG(INFO) << "Started worker process of " << workers_to_start << " worker(s) with pid " << proc.GetId(); MonitorStartingWorkerProcess(proc, language, worker_type); @@ -365,27 +385,37 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc, *io_service_, boost::posix_time::seconds( RayConfig::instance().worker_register_timeout_seconds())); // Capture timer in lambda to copy it once, so that it can avoid destructing timer. - timer->async_wait( - [timer, language, proc, worker_type, this](const boost::system::error_code e) { - // check the error code. - auto &state = this->GetStateForLanguage(language); - // Since this process times out to start, remove it from starting_worker_processes - // to avoid the zombie worker. - auto it = state.starting_worker_processes.find(proc); - if (it != state.starting_worker_processes.end()) { - RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() - << ") have not registered to raylet within timeout."; - state.starting_worker_processes.erase(it); - if (IsIOWorkerType(worker_type)) { - // Mark the I/O worker as failed. - auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); - io_worker_state.num_starting_io_workers--; - } - // We may have places to start more workers now. - TryStartIOWorkers(language); - starting_worker_timeout_callback_(); - } - }); + timer->async_wait([timer, language, proc, worker_type, + this](const boost::system::error_code e) { + // check the error code. + auto &state = this->GetStateForLanguage(language); + // Since this process times out to start, remove it from starting_worker_processes + // to avoid the zombie worker. 
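With the new out-parameter, `StartWorkerProcess` can tell its caller why no process was started (missing job config, startup-rate limit) instead of only handing back an invalid `Process`, and the caller decides whether to fail the pending `PopWorker` or simply wait. A rough caller-side sketch with simplified stand-in types (`FakeProcess`, `FakeStatus`), not the real raylet declarations:

#include <iostream>

enum class FakeStatus { OK, JobConfigMissing, TooManyStartingWorkerProcesses };
struct FakeProcess { bool valid = false; bool IsValid() const { return valid; } };

FakeProcess StartWorkerProcess(bool job_config_known, int starting, int limit,
                               FakeStatus *status) {
  if (!job_config_known) {
    *status = FakeStatus::JobConfigMissing;
    return FakeProcess{};  // invalid; caller should wait for the job config
  }
  if (starting >= limit) {
    *status = FakeStatus::TooManyStartingWorkerProcesses;
    return FakeProcess{};  // invalid; caller should retry once a slot frees up
  }
  *status = FakeStatus::OK;
  return FakeProcess{true};
}

int main() {
  FakeStatus status = FakeStatus::OK;
  FakeProcess proc = StartWorkerProcess(/*job_config_known=*/true, /*starting=*/5,
                                        /*limit=*/5, &status);
  if (!proc.IsValid()) {
    std::cout << "not started, status=" << static_cast<int>(status) << "\n";
  }
}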
+ auto it = state.starting_worker_processes.find(proc); + if (it != state.starting_worker_processes.end()) { + RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId() + << ") have not registered to raylet within timeout."; + PopWorkerStatus status = PopWorkerStatus::WorkerPendingRegistration; + bool found; + bool used; + TaskID task_id; + InvokePopWorkerCallbackForProcess(state.starting_dedicated_workers_to_tasks, proc, + nullptr, status, &found, &used, &task_id); + if (!found) { + InvokePopWorkerCallbackForProcess(state.starting_workers_to_tasks, proc, nullptr, + status, &found, &used, &task_id); + } + state.starting_worker_processes.erase(it); + if (IsIOWorkerType(worker_type)) { + // Mark the I/O worker as failed. + auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state); + io_worker_state.num_starting_io_workers--; + } + // We may have places to start more workers now. + TryStartIOWorkers(language); + starting_worker_timeout_callback_(); + } + }); } Process WorkerPool::StartProcess(const std::vector &worker_command_args, @@ -572,7 +602,8 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver delay_callback = true; // Start initial Python workers for the first job. for (int i = 0; i < num_initial_python_workers_for_first_job_; i++) { - StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id); + PopWorkerStatus status; + StartWorkerProcess(Language::PYTHON, rpc::WorkerType::WORKER, job_id, &status); } } } @@ -695,19 +726,49 @@ void WorkerPool::PopDeleteWorker( } } +void WorkerPool::InvokePopWorkerCallbackForProcess( + std::unordered_map &starting_workers_to_tasks, + const Process &proc, const std::shared_ptr &worker, + const PopWorkerStatus &status, bool *found, bool *worker_used, TaskID *task_id) { + *found = false; + *worker_used = false; + auto it = starting_workers_to_tasks.find(proc); + if (it != starting_workers_to_tasks.end()) { + *found = true; + *task_id = it->second.task_id; + const auto &callback = it->second.callback; + RAY_CHECK(callback); + *worker_used = callback(worker, status); + starting_workers_to_tasks.erase(it); + } +} + void WorkerPool::PushWorker(const std::shared_ptr &worker) { // Since the worker is now idle, unset its assigned task ID. RAY_CHECK(worker->GetAssignedTaskId().IsNil()) << "Idle workers cannot have an assigned task ID"; auto &state = GetStateForLanguage(worker->GetLanguage()); - auto it = state.dedicated_workers_to_tasks.find(worker->GetShimProcess()); - if (it != state.dedicated_workers_to_tasks.end()) { + bool found; + bool used; + TaskID task_id; + InvokePopWorkerCallbackForProcess(state.starting_dedicated_workers_to_tasks, + worker->GetShimProcess(), worker, PopWorkerStatus::OK, + &found, &used, &task_id); + if (found) { // The worker is used for the actor creation task with dynamic options. - // Put it into idle dedicated worker pool. - const auto task_id = it->second; - state.idle_dedicated_workers[task_id] = worker; - } else { - // The worker is not used for the actor creation task with dynamic options. + if (!used) { + // Put it into idle dedicated worker pool. + // TODO(guyang.sgy): This worker will not be used forever. We should kill it. + state.idle_dedicated_workers[task_id] = worker; + } + return; + } + + InvokePopWorkerCallbackForProcess(state.starting_workers_to_tasks, + worker->GetShimProcess(), worker, PopWorkerStatus::OK, + &found, &used, &task_id); + // The worker is not used for the actor creation task with dynamic options. 
+ if (!used) { // Put the worker to the idle pool. state.idle.insert(worker); int64_t now = get_time_(); @@ -866,42 +927,39 @@ void WorkerPool::TryKillingIdleWorkers() { RAY_CHECK(idle_of_all_languages_.size() == idle_of_all_languages_map_.size()); } -int GetRuntimeEnvHash(const TaskSpecification &task_spec) { - // We add required_resource instead of allocated_instances because allocated_instances - // may contains resource ID. - std::unordered_map required_resource{}; - if (RayConfig::instance().worker_resource_limits_enabled()) { - required_resource = task_spec.GetRequiredResources().GetResourceMap(); - } - const WorkerCacheKey env = {task_spec.OverrideEnvironmentVariables(), - task_spec.SerializedRuntimeEnv(), required_resource}; - return env.IntHash(); -} - -std::shared_ptr WorkerPool::PopWorker( - const TaskSpecification &task_spec, - const std::string &allocated_instances_serialized_json) { +void WorkerPool::PopWorker(const TaskSpecification &task_spec, + const PopWorkerCallback &callback, + const std::string &allocated_instances_serialized_json) { + RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId(); auto &state = GetStateForLanguage(task_spec.GetLanguage()); std::shared_ptr worker = nullptr; - Process proc; - auto start_worker_process_fn = - [this, allocated_instances_serialized_json]( - const TaskSpecification &task_spec, State &state, - std::vector dynamic_options, bool dedicated, - const int runtime_env_hash, const std::string &serialized_runtime_env, - const std::string &serialized_runtime_env_context) -> Process { + auto start_worker_process_fn = [this, allocated_instances_serialized_json]( + const TaskSpecification &task_spec, State &state, + std::vector dynamic_options, + bool dedicated, + const std::string &serialized_runtime_env, + const std::string &serialized_runtime_env_context, + const PopWorkerCallback &callback) -> Process { + PopWorkerStatus status = PopWorkerStatus::OK; Process proc = StartWorkerProcess( - task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(), - dynamic_options, runtime_env_hash, serialized_runtime_env, + task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(), &status, + dynamic_options, task_spec.GetRuntimeEnvHash(), serialized_runtime_env, task_spec.OverrideEnvironmentVariables(), serialized_runtime_env_context, allocated_instances_serialized_json); - if (proc.IsValid()) { + if (status == PopWorkerStatus::OK) { + RAY_CHECK(proc.IsValid()); WarnAboutSize(); + auto task_info = TaskWaitingForWorkerInfo{task_spec.TaskId(), callback}; if (dedicated) { - state.dedicated_workers_to_tasks[proc] = task_spec.TaskId(); - state.tasks_with_dedicated_workers.emplace(task_spec.TaskId()); + state.starting_dedicated_workers_to_tasks[proc] = std::move(task_info); + } else { + state.starting_workers_to_tasks[proc] = std::move(task_info); } + } else { + // TODO(guyang.sgy): Wait until a worker is pushed or a worker can be started If + // startup concurrency maxed out or job not started. + PopWorkerCallbackAsync(callback, nullptr, status); } return proc; }; @@ -919,11 +977,7 @@ std::shared_ptr WorkerPool::PopWorker( // There is an idle dedicated worker for this task. worker = std::move(it->second); state.idle_dedicated_workers.erase(it); - // Because we found a worker that can perform this task, - // we can remove it from dedicated_workers_to_tasks. 
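The free function removed above combined the override environment variables, the serialized runtime env, and (when worker resource limits are enabled) the required resources into a `WorkerCacheKey` and returned its `IntHash()`; the patch moves that computation behind `TaskSpecification::GetRuntimeEnvHash()` so the pool and its callers agree on the hash. A toy sketch hashing the same three ingredients; the real `WorkerCacheKey::IntHash()` is not shown here and is assumed to differ:

#include <cstddef>
#include <functional>
#include <map>
#include <string>

// Toy cache key: same ingredients as the removed helper, toy hashing only.
struct ToyWorkerCacheKey {
  std::map<std::string, std::string> override_env;
  std::string serialized_runtime_env;
  std::map<std::string, double> required_resources;  // only if limits are enabled

  int IntHash() const {
    std::size_t h = std::hash<std::string>{}(serialized_runtime_env);
    for (const auto &kv : override_env) {
      h ^= std::hash<std::string>{}(kv.first + "=" + kv.second) + 0x9e3779b9 + (h << 6);
    }
    for (const auto &kv : required_resources) {
      h ^= std::hash<std::string>{}(kv.first + std::to_string(kv.second));
    }
    return static_cast<int>(h);
  }
};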
- state.dedicated_workers_to_tasks.erase(worker->GetProcess()); - state.tasks_with_dedicated_workers.erase(task_spec.TaskId()); - } else if (!HasPendingWorkerForTask(task_spec.GetLanguage(), task_spec.TaskId())) { + } else { // We are not pending a registration from a worker for this task, // so start a new worker process for this task. std::vector dynamic_options = {}; @@ -933,34 +987,32 @@ std::shared_ptr WorkerPool::PopWorker( // create runtime env. if (task_spec.HasRuntimeEnv()) { - state.tasks_with_pending_runtime_envs.emplace(task_spec.TaskId()); agent_manager_->CreateRuntimeEnv( task_spec.JobId(), task_spec.SerializedRuntimeEnv(), - [this, start_worker_process_fn, &state, task_spec, dynamic_options, + [start_worker_process_fn, callback, &state, task_spec, dynamic_options, allocated_instances_serialized_json]( bool success, const std::string &serialized_runtime_env_context) { - state.tasks_with_pending_runtime_envs.erase(task_spec.TaskId()); if (success) { - start_worker_process_fn( - task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec), - task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context); + start_worker_process_fn(task_spec, state, dynamic_options, true, + task_spec.SerializedRuntimeEnv(), + serialized_runtime_env_context, callback); } else { RAY_LOG(WARNING) << "Couldn't create a runtime environment for task " << task_spec.TaskId() << ". The runtime environment was " << task_spec.SerializedRuntimeEnv() << "."; - runtime_env_setup_failed_callback_(task_spec.TaskId()); + callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed); } }); } else { - proc = - start_worker_process_fn(task_spec, state, dynamic_options, true, 0, "", ""); + start_worker_process_fn(task_spec, state, dynamic_options, true, "", "", + callback); } } } else { // Find an available worker which is already assigned to this job and which has // the specified runtime env. // Try to pop the most recently pushed worker. - const int runtime_env_hash = GetRuntimeEnvHash(task_spec); + const int runtime_env_hash = task_spec.GetRuntimeEnvHash(); for (auto it = idle_of_all_languages_.rbegin(); it != idle_of_all_languages_.rend(); it++) { if (task_spec.GetLanguage() != it->first->GetLanguage() || @@ -992,53 +1044,32 @@ std::shared_ptr WorkerPool::PopWorker( // There are no more non-actor workers available to execute this task. // Start a new worker process. if (task_spec.HasRuntimeEnv()) { - // Create runtime env. If the env creation is already in progress on this - // node, skip this to prevent unnecessary CreateRuntimeEnv calls, which would - // unnecessarily start new worker processes. - auto it = runtime_env_statuses_.find(runtime_env_hash); - if (it == runtime_env_statuses_.end() || it->second == RuntimeEnvStatus::DONE) { - if (it == runtime_env_statuses_.end()) { - runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::PENDING; - } - agent_manager_->CreateRuntimeEnv( - task_spec.JobId(), task_spec.SerializedRuntimeEnv(), - [this, start_worker_process_fn, &state, task_spec, runtime_env_hash]( - bool successful, const std::string &serialized_runtime_env_context) { - runtime_env_statuses_[runtime_env_hash] = RuntimeEnvStatus::DONE; - if (successful) { - start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, - task_spec.SerializedRuntimeEnv(), - serialized_runtime_env_context); - } else { - RAY_LOG(WARNING) - << "Couldn't create a runtime environment for task " - << task_spec.TaskId() << ". 
The runtime environment was " - << task_spec.SerializedRuntimeEnv() << "."; - runtime_env_setup_failed_callback_(task_spec.TaskId()); - } - }); - } else { - RAY_LOG(DEBUG) << "PopWorker called for task " << task_spec.TaskId() - << " but the desired runtime env " - << task_spec.SerializedRuntimeEnv() - << " is pending installation. " - "No worker process will be started in this call."; - } + // create runtime env. + agent_manager_->CreateRuntimeEnv( + task_spec.JobId(), task_spec.SerializedRuntimeEnv(), + [start_worker_process_fn, callback, &state, task_spec]( + bool successful, const std::string &serialized_runtime_env_context) { + if (successful) { + start_worker_process_fn(task_spec, state, {}, false, + task_spec.SerializedRuntimeEnv(), + serialized_runtime_env_context, callback); + } else { + RAY_LOG(WARNING) << "Couldn't create a runtime environment for task " + << task_spec.TaskId() << ". The runtime environment was " + << task_spec.SerializedRuntimeEnv() << "."; + callback(nullptr, PopWorkerStatus::RuntimeEnvCreationFailed); + } + }); } else { - proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "", - ""); + start_worker_process_fn(task_spec, state, {}, false, "", "", callback); } } } - if (worker == nullptr && proc.IsValid()) { - WarnAboutSize(); - } - if (worker) { RAY_CHECK(worker->GetAssignedJobId() == task_spec.JobId()); + PopWorkerCallbackAsync(callback, worker); } - return worker; } void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, @@ -1071,8 +1102,9 @@ void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec, RAY_LOG(DEBUG) << "Prestarting " << num_needed << " workers given task backlog size " << backlog_size << " and soft limit " << num_workers_soft_limit_; for (int i = 0; i < num_needed; i++) { + PopWorkerStatus status; StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER, - task_spec.JobId()); + task_spec.JobId(), &status); } } } @@ -1223,17 +1255,6 @@ void WorkerPool::WarnAboutSize() { } } -bool WorkerPool::HasPendingWorkerForTask(const Language &language, - const TaskID &task_id) { - auto &state = GetStateForLanguage(language); - auto runtime_env_it = state.tasks_with_pending_runtime_envs.find(task_id); - if (runtime_env_it != state.tasks_with_pending_runtime_envs.end()) { - return true; - } - auto it = state.tasks_with_dedicated_workers.find(task_id); - return it != state.tasks_with_dedicated_workers.end(); -} - void WorkerPool::TryStartIOWorkers(const Language &language) { TryStartIOWorkers(language, rpc::WorkerType::RESTORE_WORKER); TryStartIOWorkers(language, rpc::WorkerType::SPILL_WORKER); @@ -1260,7 +1281,9 @@ void WorkerPool::TryStartIOWorkers(const Language &language, expected_workers_num = max_workers_to_start; } for (; expected_workers_num > 0; expected_workers_num--) { - Process proc = StartWorkerProcess(ray::Language::PYTHON, worker_type, JobID::Nil()); + PopWorkerStatus status; + Process proc = + StartWorkerProcess(ray::Language::PYTHON, worker_type, JobID::Nil(), &status); if (!proc.IsValid()) { // We may hit the maximum worker start up concurrency limit. Stop. 
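For a caller such as the dispatcher, the contract is now: hand `PopWorker` a callback, return immediately, and report through the callback's boolean result whether the granted worker was actually used (returning false hands it back to the idle pool). A hedged sketch of such a caller, with the pool and worker reduced to minimal stand-ins rather than the real cluster task manager code:

#include <functional>
#include <iostream>
#include <memory>

// Minimal stand-ins (assumed shapes, not the real raylet classes).
enum class FakeStatus { OK, RuntimeEnvCreationFailed };
struct FakeWorker { int id = 0; };
using FakeCallback = std::function<bool(std::shared_ptr<FakeWorker>, FakeStatus)>;

struct FakePool {
  // The real pool resolves this asynchronously; here we call back inline for brevity.
  void PopWorker(const FakeCallback &cb) {
    cb(std::make_shared<FakeWorker>(), FakeStatus::OK);
  }
};

void DispatchTask(FakePool &pool, bool still_runnable) {
  pool.PopWorker([still_runnable](std::shared_ptr<FakeWorker> worker,
                                  FakeStatus status) -> bool {
    if (!worker) {
      // No worker granted; inspect the status (e.g. fail the task on
      // RuntimeEnvCreationFailed, or leave it queued otherwise).
      std::cout << "no worker, status=" << static_cast<int>(status) << "\n";
      return false;
    }
    if (!still_runnable) {
      // Task was cancelled in the meantime: decline the worker so the pool
      // returns it to the idle pool.
      return false;
    }
    std::cout << "dispatching on worker " << worker->id << "\n";
    return true;  // worker consumed
  });
}

int main() {
  FakePool pool;
  DispatchTask(pool, /*still_runnable=*/true);
}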
return;
diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h
index 4572a42454a95..e93af6d59bcc6 100644
--- a/src/ray/raylet/worker_pool.h
+++ b/src/ray/raylet/worker_pool.h
@@ -18,6 +18,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -40,13 +41,30 @@ namespace raylet {
 using WorkerCommandMap = std::unordered_map, std::hash>;
 
-enum class RuntimeEnvStatus {
-  /// This runtime env is currently being installed.
-  PENDING,
-  /// This runtime env has completed installation (either successfully or not)
-  DONE
+enum PopWorkerStatus {
+  // OK.
+  // A registered worker will be returned via the callback.
+  OK = 0,
+  // The job config is not found.
+  // A nullptr worker will be returned via the callback.
+  JobConfigMissing = 1,
+  // The worker process startup rate is limited.
+  // A nullptr worker will be returned via the callback.
+  TooManyStartingWorkerProcesses = 2,
+  // The worker process has been started, but the worker did not register at the raylet
+  // within the timeout.
+  // A nullptr worker will be returned via the callback.
+  WorkerPendingRegistration = 3,
+  // Runtime env creation failed.
+  // A nullptr worker will be returned via the callback.
+  RuntimeEnvCreationFailed = 4,
 };
 
+/// \return true if the worker was used. Otherwise, return false and the worker will be
+/// returned to the worker pool.
+using PopWorkerCallback = std::function<bool(
+    const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status)>;
+
 /// \class WorkerPoolInterface
 ///
 /// Used for new scheduler unit tests.
@@ -56,14 +74,25 @@ class WorkerPoolInterface {
   /// the worker back onto the pool once the worker has completed its work.
   ///
   /// \param task_spec The returned worker must be able to execute this task.
+  /// \param callback The callback function to be executed once the result of the
+  /// worker popping is known.
+  /// The callback will be executed with an empty worker in the following cases:
+  /// Case 1: The job config was not found.
+  /// Case 2: The worker process startup rate is limited.
+  /// Case 3: The worker process started, but the worker did not register at the
+  /// raylet within the timeout.
+  /// Case 4: Runtime env creation failed.
+  /// The callback will also be executed with a valid worker in the following cases:
+  /// Case 1: A suitable worker was found in the idle worker pool.
+  /// Case 2: A suitable worker registered to the raylet.
   /// \param allocated_instances_serialized_json The allocated resource instances
   /// json string, it contains resource ID which assigned to this worker.
   /// Instance resource value will be like {"GPU":[10000,0,10000]}, non-instance
   /// resource value will be {"CPU":20000}.
-  /// \return An idle worker with the requested task spec. Returns nullptr if no
-  /// such worker exists.
-  virtual std::shared_ptr PopWorker(
-      const TaskSpecification &task_spec,
+  /// \return Void.
+  virtual void PopWorker(
+      const TaskSpecification &task_spec, const PopWorkerCallback &callback,
+      const std::string &allocated_instances_serialized_json = "{}") = 0;
   /// Add an idle worker to the pool.
   ///
@@ -144,7 +173,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
       std::shared_ptr gcs_client, const WorkerCommandMap &worker_commands,
       std::function starting_worker_timeout_callback,
-      std::function runtime_env_setup_failed_callback,
       int ray_debugger_external, const std::function get_time);
 
   /// Destructor responsible for freeing a set of workers owned by this class.
@@ -194,6 +222,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
   /// announces its port.
/// /// \param[in] worker The worker which is started. + /// \return void void OnWorkerStarted(const std::shared_ptr &worker); /// Register a new driver. @@ -281,22 +310,12 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { void PushUtilWorker(const std::shared_ptr &worker); void PopUtilWorker(std::function)> callback); - /// Add an idle worker to the pool. - /// - /// \param The idle worker to add. + /// See interface. void PushWorker(const std::shared_ptr &worker); - /// Pop an idle worker from the pool. The caller is responsible for pushing - /// the worker back onto the pool once the worker has completed its work. - /// - /// \param task_spec The returned worker must be able to execute this task. - /// \param allocated_instances_serialized_json The allocated resouce instances - /// json string. - /// \return An idle worker with the requested task spec. Returns nullptr if no - /// such worker exists. - std::shared_ptr PopWorker( - const TaskSpecification &task_spec, - const std::string &allocated_instances_serialized_json = "{}"); + /// See interface. + void PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback &callback, + const std::string &allocated_instances_serialized_json = "{}"); /// Try to prestart a number of workers suitable the given task spec. Prestarting /// is needed since core workers request one lease at a time, if starting is slow, @@ -369,7 +388,9 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// worker pool abstraction. Outside this class, workers /// will have rpc::WorkerType instead. /// \param job_id The ID of the job to which the started worker process belongs. + /// \param status The output status of work process starting. /// \param dynamic_options The dynamic options that we should add for worker command. + /// \param runtime_env_hash The hash of runtime env. /// \param serialized_runtime_env The runtime environment for the started worker /// \param allocated_instances_serialized_json The allocated resource instances // json string. @@ -377,6 +398,7 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// it means we didn't start a process. Process StartWorkerProcess( const Language &language, const rpc::WorkerType worker_type, const JobID &job_id, + PopWorkerStatus *status /*output*/, const std::vector &dynamic_options = {}, const int runtime_env_hash = 0, const std::string &serialized_runtime_env = "{}", std::unordered_map override_environment_variables = {}, @@ -397,6 +419,11 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Push an warning message to user if worker pool is getting to big. virtual void WarnAboutSize(); + /// Make this synchronized function for unit test. + void PopWorkerCallbackInternal(const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status); + struct IOWorkerState { /// The pool of idle I/O workers. std::queue> idle_io_workers; @@ -419,6 +446,13 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { rpc::WorkerType worker_type; }; + struct TaskWaitingForWorkerInfo { + /// The id of task. + TaskID task_id; + /// The callback function which should be called when worker registered. + PopWorkerCallback callback; + }; + /// An internal data structure that maintains the pool state per language. 
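Each `TaskWaitingForWorkerInfo` entry pairs the launched `Process` with the task waiting for it (see the two `starting_*_workers_to_tasks` maps in the per-language state below), so that whichever event happens first, a worker from that process registering or the registration timeout firing, can look the waiter up and run its `PopWorkerCallback` exactly once. A compact sketch of that bookkeeping with stand-in types (the real code keys by `Process` and stores a `TaskID`):

#include <functional>
#include <memory>
#include <unordered_map>

enum class FakeStatus { OK, WorkerPendingRegistration };
struct FakeWorker {};
using FakeCallback = std::function<bool(std::shared_ptr<FakeWorker>, FakeStatus)>;

struct WaitingTask {
  int task_id = 0;
  FakeCallback callback;
};

struct StartingWorkerBook {
  std::unordered_map<int /*proc id*/, WaitingTask> waiting;

  // A worker from `proc_id` registered: resolve the waiter with a real worker.
  void OnWorkerRegistered(int proc_id, std::shared_ptr<FakeWorker> worker) {
    Resolve(proc_id, std::move(worker), FakeStatus::OK);
  }
  // The process never registered in time: resolve the waiter with no worker.
  void OnRegistrationTimeout(int proc_id) {
    Resolve(proc_id, nullptr, FakeStatus::WorkerPendingRegistration);
  }

 private:
  void Resolve(int proc_id, std::shared_ptr<FakeWorker> worker, FakeStatus status) {
    auto it = waiting.find(proc_id);
    if (it == waiting.end()) return;   // nobody waiting on this process
    it->second.callback(std::move(worker), status);
    waiting.erase(it);                 // each waiter is called back at most once
  }
};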
struct State { /// The commands and arguments used to start the worker process @@ -446,13 +480,13 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// the process. The shim process PID is the same with worker process PID, except /// starting worker process in container. std::unordered_map starting_worker_processes; + /// A map for looking up the task by the pid of starting worker process. + std::unordered_map starting_workers_to_tasks; /// A map for looking up the task with dynamic options by the pid of - /// worker. Note that this is used for the dedicated worker processes. - std::unordered_map dedicated_workers_to_tasks; - /// All tasks that have associated dedicated workers. - std::unordered_set tasks_with_dedicated_workers; - /// All tasks that have pending runtime envs. - std::unordered_set tasks_with_pending_runtime_envs; + /// starting worker process. Note that this is used for the dedicated worker + /// processes. + std::unordered_map + starting_dedicated_workers_to_tasks; /// We'll push a warning to the user every time a multiple of this many /// worker processes has been started. int multiple_for_warning; @@ -535,6 +569,30 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// worker types (SPILL_WORKER and RESTORE_WORKER and UTIL_WORKER). bool IsIOWorkerType(const rpc::WorkerType &worker_type); + /// Call the `PopWorkerCallback` function asynchronously to make sure executed in + /// different stack. + virtual void PopWorkerCallbackAsync(const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status = PopWorkerStatus::OK); + + /// Try to find a task that is associated with the given worker process from the given + /// queue. If found, invoke its PopWorkerCallback. + /// \param workers_to_tasks The queue of tasks which waiting for workers. + /// \param proc The process which the worker belongs to. + /// \param worker A new idle worker. If the worker is empty, we could also callback + /// to the task. + /// \param status The pop worker status which will be forwarded to + /// `PopWorkerCallback`. + /// \param found Whether the related task found or not. + /// \param worker_used Whether the worker is used by the task, only valid when found is + /// true. + /// \param task_id The related task id. + void InvokePopWorkerCallbackForProcess( + std::unordered_map &workers_to_tasks, + const Process &proc, const std::shared_ptr &worker, + const PopWorkerStatus &status, bool *found /* output */, + bool *worker_used /* output */, TaskID *task_id /* output */); + /// For Process class for managing subprocesses (e.g. reaping zombies). instrumented_io_context *io_service_; /// Node ID of the current node. @@ -582,9 +640,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface { /// Set of jobs whose drivers have exited. absl::flat_hash_set finished_jobs_; - /// Maps runtime env hash to its status. - std::unordered_map runtime_env_statuses_; - /// This map stores the same data as `idle_of_all_languages_`, but in a map structure /// for lookup performance. 
std::unordered_map, int64_t> diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index 5149aaca1c3c7..70935d6e67ede 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -31,6 +31,7 @@ int MAXIMUM_STARTUP_CONCURRENCY = 5; int MAX_IO_WORKER_SIZE = 2; int POOL_SIZE_SOFT_LIMIT = 5; JobID JOB_ID = JobID::FromInt(1); +std::string BAD_RUNTIME_ENV = "bad runtime env"; std::vector LANGUAGES = {Language::PYTHON, Language::JAVA}; @@ -80,7 +81,11 @@ class MockRuntimeEnvAgentClient : public rpc::RuntimeEnvAgentClientInterface { void CreateRuntimeEnv(const rpc::CreateRuntimeEnvRequest &request, const rpc::ClientCallback &callback) { rpc::CreateRuntimeEnvReply reply; - reply.set_status(rpc::AGENT_RPC_STATUS_OK); + if (request.serialized_runtime_env() == BAD_RUNTIME_ENV) { + reply.set_status(rpc::AGENT_RPC_STATUS_FAILED); + } else { + reply.set_status(rpc::AGENT_RPC_STATUS_OK); + } callback(Status::OK(), reply); }; @@ -92,38 +97,20 @@ class MockRuntimeEnvAgentClient : public rpc::RuntimeEnvAgentClientInterface { }; }; -class MockAgentManager : public AgentManager { - public: - MockAgentManager(AgentManager::Options options, DelayExecutorFn delay_executor, - RuntimeEnvAgentClientFactoryFn runtime_env_agent_client_factory, - bool start_agent = true) - : AgentManager(options, delay_executor, runtime_env_agent_client_factory, - start_agent){}; - void CreateRuntimeEnv(const JobID &job_id, const std::string &serialized_runtime_env, - CreateRuntimeEnvCallback callback) override { - queued_callbacks.push(callback); - }; - - void PopAndInvokeCallback() { - if (queued_callbacks.size() > 0) { - CreateRuntimeEnvCallback callback = queued_callbacks.front(); - queued_callbacks.pop(); - callback(/*success=*/true, /*serialized_runtime_env_context=*/""); - } - } - - std::queue queued_callbacks; -}; - class WorkerPoolMock : public WorkerPool { public: explicit WorkerPoolMock(instrumented_io_context &io_service, - const WorkerCommandMap &worker_commands) + const WorkerCommandMap &worker_commands, + absl::flat_hash_map> + &mock_worker_rpc_clients) : WorkerPool(io_service, NodeID::FromRandom(), "", POOL_SIZE_SOFT_LIMIT, 0, MAXIMUM_STARTUP_CONCURRENCY, 0, 0, {}, nullptr, worker_commands, - []() {}, [](const TaskID &) {}, 0, - [this]() { return current_time_ms_; }), - last_worker_process_() { + []() {}, 0, [this]() { return current_time_ms_; }), + last_worker_process_(), + instrumented_io_service_(io_service), + error_message_type_(1), + client_call_manager_(instrumented_io_service_), + mock_worker_rpc_clients_(mock_worker_rpc_clients) { SetNodeManagerPort(1); } @@ -134,6 +121,15 @@ class WorkerPoolMock : public WorkerPool { using WorkerPool::StartWorkerProcess; // we need this to be public for testing + using WorkerPool::PopWorkerCallbackInternal; + + // Mock `PopWorkerCallbackAsync` to synchronized function. 
+ void PopWorkerCallbackAsync(const PopWorkerCallback &callback, + std::shared_ptr worker, + PopWorkerStatus status = PopWorkerStatus::OK) override { + PopWorkerCallbackInternal(callback, worker, status); + } + Process StartProcess(const std::vector &worker_command_args, const ProcessEnvironment &env) override { // Use a bogus process ID that won't conflict with those in the system @@ -185,6 +181,8 @@ class WorkerPoolMock : public WorkerPool { return worker_commands_by_proc_; } + void ClearProcesses() { worker_commands_by_proc_.clear(); } + void SetCurrentTimeMs(double current_time) { current_time_ms_ = current_time; } size_t GetIdleWorkerSize() { return idle_of_all_languages_.size(); } @@ -193,24 +191,6 @@ class WorkerPoolMock : public WorkerPool { return idle_of_all_languages_; } - private: - Process last_worker_process_; - // The worker commands by process. - std::unordered_map> worker_commands_by_proc_; - double current_time_ms_ = 0; -}; - -class WorkerPoolTest : public ::testing::Test { - public: - WorkerPoolTest() : error_message_type_(1), client_call_manager_(io_service_) { - RayConfig::instance().initialize( - R"({"object_spilling_config": "dummy", "max_io_workers": )" + - std::to_string(MAX_IO_WORKER_SIZE) + "}"); - SetWorkerCommands({{Language::PYTHON, {"dummy_py_worker_command"}}, - {Language::JAVA, - {"java", "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", "MainClass"}}}); - } - std::shared_ptr CreateWorker( Process proc, const Language &language = Language::PYTHON, const JobID &job_id = JOB_ID, @@ -225,7 +205,7 @@ class WorkerPoolTest : public ::testing::Test { const std::vector &message) { HandleMessage(client, message_type, message); }; - local_stream_socket socket(io_service_); + local_stream_socket socket(instrumented_io_service_); auto client = ClientConnection::Create(client_handler, message_handler, std::move(socket), "worker", {}, error_message_type_); @@ -234,7 +214,7 @@ class WorkerPoolTest : public ::testing::Test { "127.0.0.1", client, client_call_manager_); std::shared_ptr worker = std::dynamic_pointer_cast(worker_); - auto rpc_client = std::make_shared(io_service_); + auto rpc_client = std::make_shared(instrumented_io_service_); worker->Connect(rpc_client); mock_worker_rpc_clients_.emplace(worker->WorkerId(), rpc_client); if (!proc.IsNull()) { @@ -244,27 +224,148 @@ class WorkerPoolTest : public ::testing::Test { return worker; } + void PushAvailableWorker(const std::shared_ptr &worker) { + if (worker->GetWorkerType() == rpc::WorkerType::SPILL_WORKER) { + PushSpillWorker(worker); + return; + } + if (worker->GetWorkerType() == rpc::WorkerType::RESTORE_WORKER) { + PushRestoreWorker(worker); + return; + } + if (worker->GetWorkerType() == rpc::WorkerType::UTIL_WORKER) { + PushUtilWorker(worker); + return; + } + PushWorker(worker); + } + + // Create workers for processes and push them to worker pool. + void PushWorkers() { + auto processes = GetProcesses(); + for (auto it = processes.begin(); it != processes.end(); ++it) { + auto pushed_it = pushedProcesses_.find(it->first); + if (pushed_it == pushedProcesses_.end()) { + int runtime_env_hash = 0; + bool is_java = false; + // Parses runtime env hash to make sure the pushed workers can be popped out. 
+ for (auto command_args : it->second) { + std::string runtime_env_key = "--runtime-env-hash="; + auto pos = command_args.find(runtime_env_key); + if (pos != std::string::npos) { + runtime_env_hash = + std::stoi(command_args.substr(pos + runtime_env_key.size())); + } + pos = command_args.find("java"); + if (pos != std::string::npos) { + is_java = true; + } + } + // TODO(guyang.sgy): support C++ language workers. + int num_workers = is_java ? NUM_WORKERS_PER_PROCESS_JAVA : 1; + for (int i = 0; i < num_workers; i++) { + auto worker = + CreateWorker(it->first, is_java ? Language::JAVA : Language::PYTHON, JOB_ID, + rpc::WorkerType::WORKER, runtime_env_hash); + OnWorkerStarted(worker); + PushAvailableWorker(worker); + } + pushedProcesses_[it->first] = it->second; + } + } + } + + // We have mocked worker starting and runtime env creation to make the execution of pop + // worker synchronously. + // \param[in] push_workers If true, tries to push the workers from the started + // processes. + std::shared_ptr PopWorkerSync( + const TaskSpecification &task_spec, bool push_workers = true, + PopWorkerStatus *worker_status = nullptr) { + std::shared_ptr popped_worker = nullptr; + std::promise promise; + this->PopWorker(task_spec, + [&popped_worker, worker_status, &promise]( + const std::shared_ptr worker, + PopWorkerStatus status) -> bool { + popped_worker = worker; + if (worker_status != nullptr) { + *worker_status = status; + } + promise.set_value(true); + return true; + }); + if (push_workers) { + PushWorkers(); + } + promise.get_future().get(); + return popped_worker; + } + + private: + Process last_worker_process_; + // The worker commands by process. + std::unordered_map> worker_commands_by_proc_; + double current_time_ms_ = 0; + std::unordered_map> pushedProcesses_; + instrumented_io_context &instrumented_io_service_; + int64_t error_message_type_; + rpc::ClientCallManager client_call_manager_; + absl::flat_hash_map> + &mock_worker_rpc_clients_; + void HandleNewClient(ClientConnection &){}; + void HandleMessage(std::shared_ptr, int64_t, + const std::vector &){}; +}; + +class WorkerPoolTest : public ::testing::Test { + public: + WorkerPoolTest() { + RayConfig::instance().initialize( + R"({"worker_register_timeout_seconds": 3, "object_spilling_config": "dummy", "max_io_workers": )" + + std::to_string(MAX_IO_WORKER_SIZE) + "}"); + SetWorkerCommands({{Language::PYTHON, {"dummy_py_worker_command"}}, + {Language::JAVA, + {"java", "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER", "MainClass"}}}); + std::promise promise; + thread_io_service_.reset(new std::thread([this, &promise] { + std::unique_ptr work( + new boost::asio::io_service::work(io_service_)); + promise.set_value(true); + io_service_.run(); + })); + promise.get_future().get(); + StartMockAgent(); + } + + ~WorkerPoolTest() { + io_service_.stop(); + thread_io_service_->join(); + } + std::shared_ptr CreateSpillWorker(Process proc) { - return CreateWorker(proc, Language::PYTHON, JobID::Nil(), - rpc::WorkerType::SPILL_WORKER); + return worker_pool_->CreateWorker(proc, Language::PYTHON, JobID::Nil(), + rpc::WorkerType::SPILL_WORKER); } std::shared_ptr CreateRestoreWorker(Process proc) { - return CreateWorker(proc, Language::PYTHON, JobID::Nil(), - rpc::WorkerType::RESTORE_WORKER); + return worker_pool_->CreateWorker(proc, Language::PYTHON, JobID::Nil(), + rpc::WorkerType::RESTORE_WORKER); } std::shared_ptr RegisterDriver( const Language &language = Language::PYTHON, const JobID &job_id = JOB_ID, const rpc::JobConfig &job_config = rpc::JobConfig()) { - auto 
driver = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); + auto driver = + worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); driver->AssignTaskId(TaskID::ForDriverTask(job_id)); RAY_CHECK_OK(worker_pool_->RegisterDriver(driver, job_config, [](Status, int) {})); return driver; } void SetWorkerCommands(const WorkerCommandMap &worker_commands) { - worker_pool_ = std::make_unique(io_service_, worker_commands); + worker_pool_ = std::make_unique(io_service_, worker_commands, + mock_worker_rpc_clients_); rpc::JobConfig job_config; job_config.set_num_java_workers_per_process(NUM_WORKERS_PER_PROCESS_JAVA); RegisterDriver(Language::PYTHON, JOB_ID, job_config); @@ -278,7 +379,9 @@ class WorkerPoolTest : public ::testing::Test { static_cast(desired_initial_worker_process_count)); Process last_started_worker_process; for (int i = 0; i < desired_initial_worker_process_count; i++) { - worker_pool_->StartWorkerProcess(language, rpc::WorkerType::WORKER, JOB_ID); + PopWorkerStatus status; + worker_pool_->StartWorkerProcess(language, rpc::WorkerType::WORKER, JOB_ID, + &status); ASSERT_TRUE(worker_pool_->NumWorkerProcessesStarting() <= expected_worker_process_count); Process prev = worker_pool_->LastStartedWorkerProcess(); @@ -326,60 +429,32 @@ class WorkerPoolTest : public ::testing::Test { worker_pool_->SetAgentManager(agent_manager); } - // Create workers for processes and push them to worker pool. - void PushWorkers() { - auto processes = worker_pool_->GetProcesses(); - for (auto it = processes.begin(); it != processes.end(); ++it) { - auto pushed_it = pushedProcesses_.find(it->first); - if (pushed_it == pushedProcesses_.end()) { - int runtime_env_hash = 0; - // Parses runtime env hash to make sure the pushed workers can be popped out. - for (auto command_args : it->second) { - std::string runtime_env_key = "--runtime-env-hash="; - auto pos = command_args.find(runtime_env_key); - if (pos != std::string::npos) { - runtime_env_hash = - std::stoi(command_args.substr(pos + runtime_env_key.size())); - } - } - worker_pool_->PushWorker(CreateWorker(it->first, Language::PYTHON, JOB_ID, - rpc::WorkerType::WORKER, runtime_env_hash)); - pushedProcesses_[it->first] = it->second; - } - } - } - absl::flat_hash_map> mock_worker_rpc_clients_; protected: instrumented_io_context io_service_; + std::unique_ptr thread_io_service_; std::unique_ptr worker_pool_; - int64_t error_message_type_; - rpc::ClientCallManager client_call_manager_; - std::unordered_map> pushedProcesses_; - - private: - void HandleNewClient(ClientConnection &){}; - void HandleMessage(std::shared_ptr, int64_t, - const std::vector &){}; }; static inline TaskSpecification ExampleTaskSpec( const ActorID actor_id = ActorID::Nil(), const Language &language = Language::PYTHON, const JobID &job_id = JOB_ID, const ActorID actor_creation_id = ActorID::Nil(), const std::vector &dynamic_worker_options = {}, - const TaskID &task_id = TaskID::Nil(), + const TaskID &task_id = TaskID::ForFakeTask(), const std::string serialized_runtime_env = "") { rpc::TaskSpec message; message.set_job_id(job_id.Binary()); message.set_language(language); + // Make sure no reduplicative task id. 
+ RAY_CHECK(!task_id.IsNil()); + message.set_task_id(task_id.Binary()); if (!actor_id.IsNil()) { message.set_type(TaskType::ACTOR_TASK); message.mutable_actor_task_spec()->set_actor_id(actor_id.Binary()); } else if (!actor_creation_id.IsNil()) { message.set_type(TaskType::ACTOR_CREATION_TASK); - message.set_task_id(task_id.Binary()); message.mutable_actor_creation_task_spec()->set_actor_id(actor_creation_id.Binary()); for (const auto &option : dynamic_worker_options) { message.mutable_actor_creation_task_spec()->add_dynamic_worker_options(option); @@ -406,11 +481,12 @@ TEST_F(WorkerPoolTest, CompareWorkerProcessObjects) { } TEST_F(WorkerPoolTest, HandleWorkerRegistration) { - Process proc = - worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID); + PopWorkerStatus status; + Process proc = worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, + JOB_ID, &status); std::vector> workers; for (int i = 0; i < NUM_WORKERS_PER_PROCESS_JAVA; i++) { - workers.push_back(CreateWorker(Process(), Language::JAVA)); + workers.push_back(worker_pool_->CreateWorker(Process(), Language::JAVA)); } for (const auto &worker : workers) { // Check that there's still a starting worker process @@ -435,7 +511,7 @@ TEST_F(WorkerPoolTest, HandleWorkerRegistration) { } TEST_F(WorkerPoolTest, HandleUnknownWorkerRegistration) { - auto worker = CreateWorker(Process(), Language::PYTHON); + auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON); auto status = worker_pool_->RegisterWorker(worker, 1234, 1234, [](Status, int) {}); ASSERT_FALSE(status.ok()); } @@ -474,48 +550,48 @@ TEST_F(WorkerPoolTest, TestPrestartingWorkers) { } TEST_F(WorkerPoolTest, HandleWorkerPushPop) { - // Try to pop a worker from the empty pool and make sure we don't get one. std::shared_ptr popped_worker; const auto task_spec = ExampleTaskSpec(); - popped_worker = worker_pool_->PopWorker(task_spec); - ASSERT_EQ(popped_worker, nullptr); - // Create some workers. std::unordered_set> workers; - workers.insert(CreateWorker(Process::CreateNewDummy())); - workers.insert(CreateWorker(Process::CreateNewDummy())); + workers.insert(worker_pool_->CreateWorker(Process::CreateNewDummy())); + workers.insert(worker_pool_->CreateWorker(Process::CreateNewDummy())); // Add the workers to the pool. for (auto &worker : workers) { worker_pool_->PushWorker(worker); } - // Pop two workers and make sure they're one of the workers we created. - popped_worker = worker_pool_->PopWorker(task_spec); + popped_worker = worker_pool_->PopWorkerSync(task_spec); ASSERT_NE(popped_worker, nullptr); ASSERT_TRUE(workers.count(popped_worker) > 0); - popped_worker = worker_pool_->PopWorker(task_spec); + popped_worker = worker_pool_->PopWorkerSync(task_spec); ASSERT_NE(popped_worker, nullptr); ASSERT_TRUE(workers.count(popped_worker) > 0); - popped_worker = worker_pool_->PopWorker(task_spec); - ASSERT_EQ(popped_worker, nullptr); + // Pop a worker from the empty pool and make sure it isn't one of the workers we + // created. 
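The `PopWorkerSync` helper above turns the asynchronous `PopWorker` interface back into a blocking call for the tests by parking the result in a `std::promise` and waiting on its future. The same wrapper pattern, reduced to a generic asynchronous API with stand-in types (all names here are illustrative only), looks roughly like this:

#include <functional>
#include <future>
#include <memory>
#include <thread>

struct FakeWorker {};
using FakeCallback = std::function<void(std::shared_ptr<FakeWorker>)>;

// Stand-in asynchronous API: runs the callback on a separate thread and
// returns that thread so the caller can join it.
std::thread PopWorkerAsync(FakeCallback callback) {
  return std::thread(
      [callback = std::move(callback)]() { callback(std::make_shared<FakeWorker>()); });
}

// Blocking wrapper in the spirit of the test helper: park the result in a
// promise and wait on the future until the callback has delivered it.
std::shared_ptr<FakeWorker> PopWorkerBlocking() {
  std::promise<std::shared_ptr<FakeWorker>> promise;
  auto future = promise.get_future();
  std::thread producer = PopWorkerAsync([&promise](std::shared_ptr<FakeWorker> worker) {
    promise.set_value(std::move(worker));
  });
  auto worker = future.get();  // blocks until the callback has run
  producer.join();
  return worker;
}

int main() { return PopWorkerBlocking() ? 0 : 1; }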
+ popped_worker = worker_pool_->PopWorkerSync(task_spec); + ASSERT_NE(popped_worker, nullptr); + ASSERT_TRUE(workers.count(popped_worker) == 0); } -TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) { +TEST_F(WorkerPoolTest, PopWorkerSyncsOfMultipleLanguages) { // Create a Python Worker, and add it to the pool - auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON); + auto py_worker = + worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::PYTHON); worker_pool_->PushWorker(py_worker); - // Check that no worker will be popped if the given task is a Java task + // Check that the Python worker will not be popped if the given task is a Java task const auto java_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA); - ASSERT_EQ(worker_pool_->PopWorker(java_task_spec), nullptr); - // Check that the worker can be popped if the given task is a Python task + ASSERT_NE(worker_pool_->PopWorkerSync(java_task_spec), py_worker); + // Check that the Python worker can be popped if the given task is a Python task const auto py_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON); - ASSERT_NE(worker_pool_->PopWorker(py_task_spec), nullptr); + ASSERT_EQ(worker_pool_->PopWorkerSync(py_task_spec), py_worker); // Create a Java Worker, and add it to the pool - auto java_worker = CreateWorker(Process::CreateNewDummy(), Language::JAVA); + auto java_worker = + worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::JAVA); worker_pool_->PushWorker(java_worker); - // Check that the worker will be popped now for Java task - ASSERT_NE(worker_pool_->PopWorker(java_task_spec), nullptr); + // Check that the Java worker will be popped now for Java task + ASSERT_EQ(worker_pool_->PopWorkerSync(java_task_spec), java_worker); } TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { @@ -537,7 +613,7 @@ TEST_F(WorkerPoolTest, StartWorkerWithDynamicOptionsCommand) { job_config.add_jvm_options("-Dmy-job.foo=bar"); worker_pool_->HandleJobStarted(JOB_ID, job_config); - ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr); + ASSERT_NE(worker_pool_->PopWorkerSync(task_spec), nullptr); const auto real_command = worker_pool_->GetWorkerCommand(worker_pool_->LastStartedWorkerProcess()); @@ -575,7 +651,8 @@ TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) { // Register 2 workers for each job. for (auto job_id : job_ids) { for (int i = 0; i < 2; i++) { - auto worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); + auto worker = + worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::PYTHON, job_id); worker_pool_->PushWorker(worker); } } @@ -589,7 +666,7 @@ TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) { // Pop workers for actor creation tasks. auto task_spec = ExampleTaskSpec(/*actor_id=*/ActorID::Nil(), Language::PYTHON, job_id, actor_creation_id); - auto worker = worker_pool_->PopWorker(task_spec); + auto worker = worker_pool_->PopWorkerSync(task_spec); ASSERT_TRUE(worker); ASSERT_EQ(worker->GetAssignedJobId(), job_id); workers.push_back(worker); @@ -598,7 +675,7 @@ TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) { // Pop workers for normal tasks. 
for (auto job_id : job_ids) { auto task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, job_id); - auto worker = worker_pool_->PopWorker(task_spec); + auto worker = worker_pool_->PopWorkerSync(task_spec); ASSERT_TRUE(worker); ASSERT_EQ(worker->GetAssignedJobId(), job_id); workers.push_back(worker); @@ -624,8 +701,9 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { // Try to pop some workers. Some worker processes will be started. for (int i = 0; i < MAXIMUM_STARTUP_CONCURRENCY; i++) { - auto worker = worker_pool_->PopWorker(task_spec); - RAY_CHECK(!worker); + worker_pool_->PopWorker(task_spec, + [](const std::shared_ptr worker, + PopWorkerStatus status) -> bool { return true; }); auto last_process = worker_pool_->LastStartedWorkerProcess(); RAY_CHECK(last_process.IsValid()); started_processes.push_back(last_process); @@ -633,13 +711,15 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { // Can't start a new worker process at this point. ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting()); - RAY_CHECK(!worker_pool_->PopWorker(task_spec)); + worker_pool_->PopWorker(task_spec, + [](const std::shared_ptr worker, + PopWorkerStatus status) -> bool { return true; }); ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting()); std::vector> workers; // Call `RegisterWorker` to emulate worker registration. for (const auto &process : started_processes) { - auto worker = CreateWorker(Process()); + auto worker = worker_pool_->CreateWorker(Process()); RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, process.GetId(), process.GetId(), [](Status, int) {})); // Calling `RegisterWorker` won't affect the counter of starting worker processes. @@ -649,7 +729,9 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { // Can't start a new worker process at this point. ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting()); - RAY_CHECK(!worker_pool_->PopWorker(task_spec)); + worker_pool_->PopWorker(task_spec, + [](const std::shared_ptr worker, + PopWorkerStatus status) -> bool { return true; }); ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting()); // Call `OnWorkerStarted` to emulate worker port announcement. @@ -661,6 +743,7 @@ TEST_F(WorkerPoolTest, MaximumStartupConcurrency) { } ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting()); + worker_pool_->ClearProcesses(); } TEST_F(WorkerPoolTest, HandleIOWorkersPushPop) { @@ -885,10 +968,11 @@ TEST_F(WorkerPoolTest, DeleteWorkerPushPop) { TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) { // Start a Java worker process. - Process proc = - worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, JOB_ID); - auto worker1 = CreateWorker(Process(), Language::JAVA); - auto worker2 = CreateWorker(Process(), Language::JAVA); + PopWorkerStatus status; + Process proc = worker_pool_->StartWorkerProcess(Language::JAVA, rpc::WorkerType::WORKER, + JOB_ID, &status); + auto worker1 = worker_pool_->CreateWorker(Process(), Language::JAVA); + auto worker2 = worker_pool_->CreateWorker(Process(), Language::JAVA); // We now imitate worker process crashing while core worker initializing. @@ -914,7 +998,8 @@ TEST_F(WorkerPoolTest, NoPopOnCrashedWorkerProcess) { // 5. Let's try to pop a worker to execute a task. Worker 2 shouldn't be popped because // the process has crashed. 
const auto task_spec = ExampleTaskSpec(); - ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr); + ASSERT_NE(worker_pool_->PopWorkerSync(task_spec), worker1); + ASSERT_NE(worker_pool_->PopWorkerSync(task_spec), worker2); // 6. Now Raylet disconnects with worker 2. worker_pool_->DisconnectWorker( @@ -933,9 +1018,10 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { std::vector> workers; int num_workers = POOL_SIZE_SOFT_LIMIT + 2; for (int i = 0; i < num_workers; i++) { - Process proc = worker_pool_->StartWorkerProcess(Language::PYTHON, - rpc::WorkerType::WORKER, job_id); - auto worker = CreateWorker(Process(), Language::PYTHON, job_id); + PopWorkerStatus status; + Process proc = worker_pool_->StartWorkerProcess( + Language::PYTHON, rpc::WorkerType::WORKER, job_id, &status); + auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id); workers.push_back(worker); RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), [](Status, int) {})); @@ -943,7 +1029,6 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { ASSERT_EQ(worker_pool_->GetRegisteredWorker(worker->Connection()), worker); worker_pool_->PushWorker(worker); } - /// /// Pop 2 workers for a task and actor. /// @@ -954,7 +1039,7 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { // Pop workers for actor creation tasks. auto task_spec = ExampleTaskSpec(/*actor_id=*/ActorID::Nil(), Language::PYTHON, job_id, actor_creation_id); - auto worker = worker_pool_->PopWorker(task_spec); + auto worker = worker_pool_->PopWorkerSync(task_spec, false); popped_workers.push_back(worker); ASSERT_TRUE(worker); ASSERT_EQ(worker->GetAssignedJobId(), job_id); @@ -1025,8 +1110,9 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { // Start two IO workers. These don't count towards the limit. { + PopWorkerStatus status; Process proc = worker_pool_->StartWorkerProcess( - Language::PYTHON, rpc::WorkerType::SPILL_WORKER, job_id); + Language::PYTHON, rpc::WorkerType::SPILL_WORKER, job_id, &status); auto worker = CreateSpillWorker(Process()); RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), [](Status, int) {})); @@ -1035,8 +1121,9 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { worker_pool_->PushSpillWorker(worker); } { + PopWorkerStatus status; Process proc = worker_pool_->StartWorkerProcess( - Language::PYTHON, rpc::WorkerType::RESTORE_WORKER, job_id); + Language::PYTHON, rpc::WorkerType::RESTORE_WORKER, job_id, &status); auto worker = CreateRestoreWorker(Process()); RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, proc.GetId(), proc.GetId(), [](Status, int) {})); @@ -1057,6 +1144,7 @@ TEST_F(WorkerPoolTest, TestWorkerCapping) { worker_pool_->PopSpillWorker(callback); worker_pool_->PopRestoreWorker(callback); ASSERT_EQ(num_callbacks, 2); + worker_pool_->ClearProcesses(); } TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) { @@ -1075,9 +1163,10 @@ TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) { std::vector> workers; int num_workers = POOL_SIZE_SOFT_LIMIT * 2; for (int i = 0; i < num_workers; i++) { - Process proc = worker_pool_->StartWorkerProcess(Language::PYTHON, - rpc::WorkerType::WORKER, job_id); - auto worker = CreateWorker(Process(), Language::PYTHON, job_id); + PopWorkerStatus status; + Process proc = worker_pool_->StartWorkerProcess( + Language::PYTHON, rpc::WorkerType::WORKER, job_id, &status); + auto worker = worker_pool_->CreateWorker(Process(), Language::PYTHON, job_id); workers.push_back(worker); RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, 
proc.GetId(), proc.GetId(), [](Status, int) {})); @@ -1122,46 +1211,36 @@ TEST_F(WorkerPoolTest, TestWorkerCappingLaterNWorkersNotOwningObjects) { } TEST_F(WorkerPoolTest, PopWorkerWithRuntimeEnv) { - StartMockAgent(); ASSERT_EQ(worker_pool_->GetProcessSize(), 0); auto actor_creation_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1); const auto actor_creation_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id, - {"XXX=YYY"}, TaskID::Nil(), "{\"uris\": \"XXX\"}"); + {"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}"); const auto normal_task_spec = ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), - {"XXX=YYY"}, TaskID::Nil(), "{\"uris\": \"XXX\"}"); - const auto normal_task_spec_without_runtime_env = ExampleTaskSpec( - ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), {}, TaskID::Nil(), ""); - // Pop worker for actor creation task. - auto popped_worker = worker_pool_->PopWorker(actor_creation_task_spec); - // No idle workers. - ASSERT_EQ(popped_worker, nullptr); - ASSERT_EQ(worker_pool_->GetProcessSize(), 1); - // Push new process to worker pool. - PushWorkers(); - // Pop worker for normal task. - popped_worker = worker_pool_->PopWorker(normal_task_spec); - // No idle workers for normal task. - ASSERT_EQ(popped_worker, nullptr); - // Pop worker for normal task without runtime env. - popped_worker = worker_pool_->PopWorker(normal_task_spec_without_runtime_env); - // No idle workers for normal task without runtime env. - ASSERT_EQ(popped_worker, nullptr); + {"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}"); + const auto normal_task_spec_without_runtime_env = + ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), {}); // Pop worker for actor creation task again. - popped_worker = worker_pool_->PopWorker(actor_creation_task_spec); - // Got a worker. + auto popped_worker = worker_pool_->PopWorkerSync(actor_creation_task_spec); + // Got a worker with correct runtime env hash. ASSERT_NE(popped_worker, nullptr); - // Push new processes to worker pool. - PushWorkers(); - // Pop worker for normal task again. - popped_worker = worker_pool_->PopWorker(normal_task_spec); - // Got a worker. + ASSERT_EQ(popped_worker->GetRuntimeEnvHash(), + actor_creation_task_spec.GetRuntimeEnvHash()); + ASSERT_EQ(worker_pool_->GetProcessSize(), 1); + // Pop worker for normal task. + popped_worker = worker_pool_->PopWorkerSync(normal_task_spec); + // Got a worker with correct runtime env hash. ASSERT_NE(popped_worker, nullptr); - // Pop worker for normal task without runtime env again. - popped_worker = worker_pool_->PopWorker(normal_task_spec_without_runtime_env); - // Got a worker. + ASSERT_EQ(popped_worker->GetRuntimeEnvHash(), normal_task_spec.GetRuntimeEnvHash()); + ASSERT_EQ(worker_pool_->GetProcessSize(), 2); + // Pop worker for normal task without runtime env. + popped_worker = worker_pool_->PopWorkerSync(normal_task_spec_without_runtime_env); + // Got a worker with correct runtime env hash. ASSERT_NE(popped_worker, nullptr); + ASSERT_EQ(popped_worker->GetRuntimeEnvHash(), + normal_task_spec_without_runtime_env.GetRuntimeEnvHash()); + ASSERT_EQ(worker_pool_->GetProcessSize(), 3); } TEST_F(WorkerPoolTest, CacheWorkersByRuntimeEnvHash) { @@ -1170,72 +1249,60 @@ TEST_F(WorkerPoolTest, CacheWorkersByRuntimeEnvHash) { /// worker available whose runtime env matches the runtime env /// in the task spec. 
/// - StartMockAgent(); ASSERT_EQ(worker_pool_->GetProcessSize(), 0); auto actor_creation_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1); - const auto actor_creation_task_spec_1 = - ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id, - /*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_1"); - const auto task_spec_1 = - ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), - /*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_1"); - const auto task_spec_2 = - ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), - /*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_2"); + const auto actor_creation_task_spec_1 = ExampleTaskSpec( + ActorID::Nil(), Language::PYTHON, JOB_ID, actor_creation_id, + /*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_1"); + const auto task_spec_1 = ExampleTaskSpec( + ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), + /*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_1"); + const auto task_spec_2 = ExampleTaskSpec( + ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(), + /*dynamic_options=*/{}, TaskID::ForFakeTask(), "mock_runtime_env_2"); const WorkerCacheKey env1 = { /*override_environment_variables=*/{}, "mock_runtime_env_1", {}}; const int runtime_env_hash_1 = env1.IntHash(); - // Try to pop worker for task with runtime env 1. - auto popped_worker = worker_pool_->PopWorker(task_spec_1); - // Check that no worker is available for task with runtime env 1. - ASSERT_EQ(popped_worker, nullptr); - // Push worker with runtime env 1. - worker_pool_->PushWorker(CreateWorker(Process::CreateNewDummy(), Language::PYTHON, - JOB_ID, rpc::WorkerType::WORKER, - runtime_env_hash_1)); + auto worker = + worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::PYTHON, JOB_ID, + rpc::WorkerType::WORKER, runtime_env_hash_1); + worker_pool_->PushWorker(worker); // Try to pop worker for task with runtime env 2. - popped_worker = worker_pool_->PopWorker(task_spec_2); - // Check that no worker is available for task with runtime env 2. - ASSERT_EQ(popped_worker, nullptr); + auto popped_worker = worker_pool_->PopWorkerSync(task_spec_2); + // Check that popped worker isn't the one we pushed. + ASSERT_NE(popped_worker, nullptr); + ASSERT_NE(popped_worker, worker); // Try to pop the worker for task with runtime env 1. - popped_worker = worker_pool_->PopWorker(task_spec_1); - // Check that we got a worker. - ASSERT_NE(popped_worker, nullptr); + popped_worker = worker_pool_->PopWorkerSync(task_spec_1); + // Check that we got the pushed worker. + ASSERT_EQ(popped_worker, worker); // Push another worker with runtime env 1. - worker_pool_->PushWorker(CreateWorker(Process::CreateNewDummy(), Language::PYTHON, - JOB_ID, rpc::WorkerType::WORKER, - runtime_env_hash_1)); + worker = worker_pool_->CreateWorker(Process::CreateNewDummy(), Language::PYTHON, JOB_ID, + rpc::WorkerType::WORKER, runtime_env_hash_1); + worker_pool_->PushWorker(worker); // Try to pop the worker for an actor with runtime env 1. - popped_worker = worker_pool_->PopWorker(actor_creation_task_spec_1); - // Check that we got a worker. - ASSERT_NE(popped_worker, nullptr); + popped_worker = worker_pool_->PopWorkerSync(actor_creation_task_spec_1); + // Check that we got the pushed worker. 
+ ASSERT_EQ(popped_worker, worker);
+ worker_pool_->ClearProcesses();
}

TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
auto task_spec = ExampleTaskSpec();
- auto worker = worker_pool_->PopWorker(task_spec);
- RAY_CHECK(!worker);
+ auto worker = worker_pool_->PopWorkerSync(task_spec);
+ ASSERT_NE(worker, nullptr);
auto last_process = worker_pool_->LastStartedWorkerProcess();
-
- // Register worker with different worker PID
pid_t shim_pid = last_process.GetId();
- worker = CreateWorker(Process());
- RAY_CHECK_OK(worker_pool_->RegisterWorker(worker, shim_pid + 1000, shim_pid,
- [](Status, int) {}));
- ASSERT_EQ(1, worker_pool_->NumWorkerProcessesStarting());
-
- // After worker finished starting, starting_worker_processes will erase this process.
- worker_pool_->OnWorkerStarted(worker);
- ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting());
+ ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId());

- // test dedicated workers with different shim pid
+ // test dedicated worker
std::vector<std::string> actor_jvm_options;
actor_jvm_options.insert(
actor_jvm_options.end(),
@@ -1244,78 +1311,127 @@ TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
auto actor_id = ActorID::Of(JOB_ID, task_id, 1);
TaskSpecification java_task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::JAVA, JOB_ID, actor_id, actor_jvm_options, task_id);
- ASSERT_EQ(worker_pool_->PopWorker(java_task_spec), nullptr);
+ worker = worker_pool_->PopWorkerSync(java_task_spec);
+ ASSERT_NE(worker, nullptr);
last_process = worker_pool_->LastStartedWorkerProcess();
- pid_t java_shim_pid = last_process.GetId();
- pid_t java_pid = java_shim_pid + 1000;
- auto java_worker = CreateWorker(Process(), Language::JAVA);
- RAY_CHECK_OK(worker_pool_->RegisterWorker(java_worker, java_pid, java_shim_pid,
- [](Status, int) {}));
- // Add the workers to the pool.
- worker_pool_->PushWorker(java_worker);
- ASSERT_TRUE(worker_pool_->PopWorker(java_task_spec) != nullptr);
+ shim_pid = last_process.GetId();
+ ASSERT_EQ(shim_pid, worker->GetShimProcess().GetId());
+ worker_pool_->ClearProcesses();
}

-TEST_F(WorkerPoolTest, NoSpuriousWorkerStartupDuringEnvInstall) {
- std::vector<std::string> agent_commands = {};
- const NodeID node_id = NodeID::FromRandom();
- auto options = AgentManager::Options({node_id, agent_commands});
- auto agent_manager = std::make_shared<AgentManager>(
- std::move(options),
- /*delay_executor=*/
- [this](std::function<void()> task, uint32_t delay_ms) {
- return execute_after(io_service_, task, delay_ms);
- },
- /*runtime_env_agent_factory=*/
- [](const std::string &ip_address, int port) {
- return std::shared_ptr<rpc::RuntimeEnvAgentClientInterface>(
- new MockRuntimeEnvAgentClient());
- },
- false);
- worker_pool_->SetAgentManager(agent_manager);
-
- ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
- const auto normal_task_spec =
- ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, JOB_ID, ActorID::Nil(),
- /*dynamic_options=*/{}, TaskID::Nil(), "mock_runtime_env_1");
-
- // Pop worker for a task with runtime env.
- auto popped_worker = worker_pool_->PopWorker(normal_task_spec);
+TEST_F(WorkerPoolTest, WorkerNoLeaks) {
+ std::shared_ptr<WorkerInterface> popped_worker;
+ const auto task_spec = ExampleTaskSpec();

- // No idle workers.
- ASSERT_EQ(popped_worker, nullptr);
+ // Pop a worker and don't dispatch.
+ worker_pool_->PopWorker(
+ task_spec,
+ [](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
+ // Don't dispatch this worker.
+ return false;
+ });
+ // One worker process has been started.
+ ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
+ // No idle workers because no workers pushed.
+ ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 0);
+ // Push workers.
+ worker_pool_->PushWorkers();
+ // The worker has been pushed but not dispatched.
+ ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 1);
+ // Pop a worker and don't dispatch.
+ worker_pool_->PopWorker(
+ task_spec,
+ [](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
+ // Don't dispatch this worker.
+ return false;
+ });
+ // The worker is popped but not dispatched.
+ ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 1);
+ ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
+ // Pop a worker and dispatch.
+ worker_pool_->PopWorker(
+ task_spec,
+ [](const std::shared_ptr<WorkerInterface> worker, PopWorkerStatus status) -> bool {
+ // Dispatch this worker.
+ return true;
+ });
+ // The worker is popped and dispatched.
+ ASSERT_EQ(worker_pool_->GetIdleWorkerSize(), 0);
+ ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
+ worker_pool_->ClearProcesses();
+}

- // Worker does not start because CreateRuntimeEnvCallback has not yet been invoked.
- ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
+TEST_F(WorkerPoolTest, PopWorkerStatus) {
+ std::shared_ptr<WorkerInterface> popped_worker;
+ PopWorkerStatus status;

- // Simulate the following situation: before runtime env is done installing,
- // PopWorker is called several more times for the same task.
- for (int i = 0; i < 10; i++) {
- worker_pool_->PopWorker(normal_task_spec);
+ /* Test PopWorkerStatus TooManyStartingWorkerProcesses */
+ // Start worker processes up to the maximum.
+ for (int i = 0; i < MAXIMUM_STARTUP_CONCURRENCY; i++) {
+ auto task_spec = ExampleTaskSpec();
+ worker_pool_->PopWorker(task_spec,
+ [](const std::shared_ptr<WorkerInterface> worker,
+ PopWorkerStatus status) -> bool { return true; });
}
+ ASSERT_EQ(MAXIMUM_STARTUP_CONCURRENCY, worker_pool_->NumWorkerProcessesStarting());

- // Still, no workers should have started.
- ASSERT_EQ(worker_pool_->GetProcessSize(), 0);
-
- // AgentManager::CreateRuntimeEnv() should have only been called once.
- ASSERT_EQ(agent_manager->queued_callbacks.size(), 1);
+ // PopWorker failed and the status is `TooManyStartingWorkerProcesses`.
+ auto task_spec = ExampleTaskSpec();
+ popped_worker = worker_pool_->PopWorkerSync(task_spec, false, &status);
+ ASSERT_EQ(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::TooManyStartingWorkerProcesses);

- // Finish installing runtime env, call CreateRuntimeEnvCallback.
- agent_manager->PopAndInvokeCallback();
+ // PopWorker succeeds after the workers are pushed, which reduces the starting processes.
+ worker_pool_->PushWorkers();
+ ASSERT_EQ(0, worker_pool_->NumWorkerProcessesStarting());
+ popped_worker = worker_pool_->PopWorkerSync(task_spec, true, &status);
+ ASSERT_NE(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::OK);
+
+ /* Test PopWorkerStatus JobConfigMissing */
+ // Create a task with an unregistered job id.
+ auto job_id = JobID::FromInt(123);
+ task_spec = ExampleTaskSpec(ActorID::Nil(), Language::JAVA, job_id);
+ popped_worker = worker_pool_->PopWorkerSync(task_spec, true, &status);
+ // PopWorker failed and the status is `JobConfigMissing`.
+ ASSERT_EQ(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::JobConfigMissing);

- // Only one worker process is started.
- ASSERT_EQ(worker_pool_->GetProcessSize(), 1);
+ // Register a driver for the job.
+ RegisterDriver(Language::PYTHON, job_id);
+ popped_worker = worker_pool_->PopWorkerSync(task_spec, true, &status);
+ // PopWorker succeeds.
+ ASSERT_NE(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::OK);
+
+ /* Test PopWorkerStatus RuntimeEnvCreationFailed */
+ // Create a task with a bad runtime env.
+ const auto task_spec_with_bad_runtime_env =
+ ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(),
+ {"XXX=YYY"}, TaskID::ForFakeTask(), BAD_RUNTIME_ENV);
+ popped_worker =
+ worker_pool_->PopWorkerSync(task_spec_with_bad_runtime_env, true, &status);
+ // PopWorker failed and the status is `RuntimeEnvCreationFailed`.
+ ASSERT_EQ(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::RuntimeEnvCreationFailed);
+
+ // Create a task with a valid runtime env.
+ const auto task_spec_with_runtime_env =
+ ExampleTaskSpec(ActorID::Nil(), Language::PYTHON, job_id, ActorID::Nil(),
+ {"XXX=YYY"}, TaskID::ForFakeTask(), "{\"uris\": \"XXX\"}");
+ popped_worker = worker_pool_->PopWorkerSync(task_spec_with_runtime_env, true, &status);
+ // PopWorker succeeds.
+ ASSERT_NE(popped_worker, nullptr);
+ ASSERT_EQ(status, PopWorkerStatus::OK);

- // Now that the env has been created, we should be able to start workers for this env
- // in parallel. The soft limit of the number of workers is 5, so just add 4 more.
- for (int i = 0; i < 4; i++) {
- worker_pool_->PopWorker(normal_task_spec);
- }
- ASSERT_EQ(agent_manager->queued_callbacks.size(), 4);
- for (int i = 0; i < 4; i++) {
- agent_manager->PopAndInvokeCallback();
- }
- ASSERT_EQ(worker_pool_->GetProcessSize(), 5);
+ /* Test PopWorkerStatus WorkerPendingRegistration */
+ // Pop a worker but don't push any worker back, so registration never completes.
+ popped_worker = worker_pool_->PopWorkerSync(task_spec, false, &status);
+ ASSERT_EQ(popped_worker, nullptr);
+ // PopWorker failed because the worker registration timer was triggered, and the
+ // status is `WorkerPendingRegistration`.
+ ASSERT_EQ(status, PopWorkerStatus::WorkerPendingRegistration);
+ worker_pool_->ClearProcesses();
}

} // namespace raylet