[core] make 'PopWorker' to be an async function #17202
Conversation
@kfstorm @jovany-wang Please help review this change. I have not modified the worker pool tests yet, but you can review this PR concurrently with my updates.
It would be great if we could avoid adding a new queue. I think we could eliminate the dispatch queue if we ensured pops never failed (or you can have an async failure callback handler).
@ericl @kfstorm @jovany-wang This PR has been updated a lot. Please help review it again. @ericl To avoid adding a new queue, I have added some status to
@simon-mo I removed
Task task;
rpc::RequestWorkerLeaseReply *reply;
std::function<void(void)> callback;
bool waiting_worker_popped;
Suggested change:
- bool waiting_worker_popped;
+ bool waiting_worker_popped = false;
Could we make this and dispatch_finished into a single enum instead of having two separate flags? This seems less brittle, since we only have one state to check. I don't think it's valid to have waiting_worker_popped && dispatch_finished anyway.
+1 I think we can just do
enum {
  WAITING,     // default
  CANCELLED,   // cancel = true
  SCHEDULED,   // waiting_worker_popped = true
  DISPATCHED,  // dispatch_finished = true
};
Are there conditions where multiple of them co-exist?
Can we also add SPILLED as a state? I think the semantics are the same as cancelled, but it would be more clear.
@rkooo567 Great! Does the semantics of SCHEDULED seem confusing? In raylet, SCHEDULED means a node has been selected and resources are occupied?
Isn't the worker popped only when it is scheduled though? Or does the pop mean "it will eventually be scheduled"?
I think we can make it explicit and call it WAITING_FOR_WORKER instead of SCHEDULED. That seems pretty clear.
@wuisawesome I found we don't need SPILLED because the Work item will be removed after spilling.
Just use three statuses:
enum WorkStatus {
WAITING,
WAITING_FOR_WORKER,
CANCELLED,
};
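For illustration, a minimal sketch of how the Work struct above might look once the two flags are folded into this single state. The enumerator names come from this thread and the fields mirror the quoted diff, but this is only an assumed shape, not necessarily the PR's final code:

// Hedged sketch: one state field instead of waiting_worker_popped / dispatch_finished.
enum class WorkStatus {
  WAITING,             // default: queued, not yet handed to the worker pool
  WAITING_FOR_WORKER,  // PopWorker called, callback has not fired yet
  CANCELLED,           // lease request cancelled; must not dispatch even if a worker arrives
};

struct Work {
  Task task;
  rpc::RequestWorkerLeaseReply *reply;
  std::function<void(void)> callback;
  WorkStatus status = WorkStatus::WAITING;
};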
@SongGuyang as long as is passed you can definitely remove it. cc @wuisawesome we might want to run a scalability test for this PR
Mainly some questions about corner cases.
ReplyCancelled(*work_it);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
  task_dependency_manager_.RemoveTaskDependencies(
      task.GetTaskSpecification().TaskId());
}
(*work_it)->canceled = true;
Can you add a comment to explain when the task actually gets erased?
Or rather, the task is being removed from the queue here, but it may already be captured in a callback, so it shouldn't dispatch even if the callback gives it a worker. (+1 to putting this in a comment though).
@stephanie-wang I have erased this task on the next line, haven't I?
work_queue.erase(work_it);
Ah yes, I meant to add a comment to explain when *work_it gets erased. I think that happens when the dispatch loop gets triggered again, right?
We have two places to erase the task now: CancelTask and erase_from_dispatch_queue_fn.
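For readers following this cancellation discussion, here is a hedged sketch of the guard being described. The canceled flag comes from the quoted diff and PushWorker is the worker pool's existing push API; DispatchTaskToWorker is a hypothetical helper name, and the PR's actual callback may be shaped differently:

// Sketch: inside the PopWorker callback in the cluster task manager.
// The Work item may have been cancelled (and replied to) while the pop was
// in flight, so check the flag before dispatching.
if (work->canceled) {
  // Do not dispatch a cancelled task; return the worker to the pool instead.
  worker_pool_.PushWorker(worker);
  return;
}
DispatchTaskToWorker(work, worker);  // hypothetical helper name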
src/ray/raylet/worker_pool.cc
Outdated
oss << "Worker not started, " << starting_workers << " workers of language type " | ||
<< static_cast<int>(language) << " pending registration"; | ||
RAY_LOG(DEBUG) << oss.str(); | ||
PopWorkerCallbackExecution(callback, nullptr, |
Why not wait to call the callback until the starting workers register?
Isn't this the case where we aren't starting a worker for this callback, so no worker will be registered that's assigned to this callback?
Either way, can we add a unit test for it?
Yep, when the number of starting workers is greater than maximum_startup_concurrency_, we will not start a new worker process. We should re-dispatch this task.
Let me see how to test it.
Hmm, I guess I thought that now that the callback is async, we should just wait until a worker is available. Right now, the logic will loop:
- Trigger dispatch/PopWorker.
- If startup concurrency is maxed out, call back with an empty worker.
But we could also avoid looping if we did this:
- Trigger dispatch/PopWorker.
- If startup concurrency is maxed out, wait until a worker is pushed or a worker can be started, then call back.
I don't think it's a good idea to do it in this PR, but it would be good to do it in the future. Can you add it as a TODO?
Sure, let me add this TODO.
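As a rough illustration of the loop being discussed and the TODO being added, a hedged sketch follows. It reuses identifiers quoted in this thread (starting_workers, maximum_startup_concurrency_, PopWorkerCallbackExecution, Status::WorkerPendingRegistration), but the exact condition and code are assumptions, not the PR's final change:

// Current behavior (sketch): when startup concurrency is maxed out, fire the
// callback immediately with a null worker; the cluster task manager will
// re-dispatch the task later.
if (starting_workers >= maximum_startup_concurrency_) {
  // TODO(guyang.sgy): instead of calling back with an empty worker and looping
  // through dispatch again, hold the request until a worker is pushed back or a
  // new worker process can be started, then fire the callback once.
  PopWorkerCallbackExecution(callback, nullptr,
                             Status::WorkerPendingRegistration(oss.str()));
  return;
}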
src/ray/raylet/worker_pool.h
Outdated
const TaskSpecification &task_spec,
const std::string &allocated_instances_serialized_json = "{}");
/// \param callback The callback function that executed when gets the result of
/// worker popping.
I am a bit confused about the possible triggers for the callback. Can you comment and say what they are? In particular, I'm not sure what happens when:
- There's no job config available yet.
- There's no available worker (there seem to be multiple cases here?).
I might be wrong, but it also doesn't seem like the unit tests in worker_pool_tests check whether we return the right error codes, and at the right time, in these cases. Can we add tests for these?
If there's no job config available, or the number of starting workers is greater than maximum_startup_concurrency_, we will call back with an empty worker to trigger re-dispatch. And if runtime env creation fails, we should also call back with an empty worker, but trigger re-scheduling to other nodes or mark the task dead. I haven't implemented this in the current PR, though; I just added some status codes.
Thanks, can we make sure to document the cases that will trigger the callback? I noticed you added status codes, but it would be good to explain what they mean in the comments.
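A hedged sketch of the kind of documentation being requested, based on the statuses and cases mentioned in this thread (JobNotStarted, WorkerPendingRegistration, RuntimeEnvCreationFailed). The wording, exact semantics, and the PopWorkerCallback alias are assumptions, not the PR's final comments:

/// The callback is fired exactly once per PopWorker call:
///  - worker != nullptr, Status::OK(): a registered worker is available
///    (possibly only after a newly started worker process registers);
///  - worker == nullptr, JobNotStarted: no job config is available yet;
///    the caller should re-dispatch later;
///  - worker == nullptr, WorkerPendingRegistration: the startup concurrency
///    limit was reached; the caller should re-dispatch later;
///  - worker == nullptr, RuntimeEnvCreationFailed: the runtime env could not
///    be created; the caller should fail the task.
using PopWorkerCallback =
    std::function<void(std::shared_ptr<WorkerInterface> worker, Status status)>;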
// spill this task.
ReleaseTaskArgs(task_id);
// TODO(guyang.sgy): re-schedule the task by status code.
work->waiting_worker_popped = false;
Couldn't this hang, since there is no guarantee that DispatchScheduledTasksToWorkers will get called again?
Good comment! If the status code is JobNotStarted or WorkerPendingRegistration, maybe DispatchScheduledTasksToWorkers will be triggered by other events, such as job registration or worker registration? But if the status code is RuntimeEnvCreationFailed, this will hang. I should do some work here. What do you think?
Hmm, now that I'm thinking about this more, this might be okay since the previous version of the code also didn't handle the case.
Could you add a NOTE: comment to explain that it's assumed that DispatchScheduledTasksToWorkers will get called again?
I have solved this problem now. If RuntimeEnvCreationFailed happens, we will fail the task directly.
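A hedged sketch of the branching just described, assuming the usual Status::Is*() helper for the new code and a hypothetical FailTask helper; this is an illustration of the described behavior, not the PR's exact code:

// Inside the PopWorker callback in the cluster task manager (sketch).
if (worker == nullptr) {
  if (status.IsRuntimeEnvCreationFailed()) {
    // Unrecoverable for this task: fail it directly instead of waiting on a
    // dispatch loop that will never hand it a worker.
    FailTask(task_id, status.message());  // hypothetical helper
  } else {
    // JobNotStarted / WorkerPendingRegistration: keep the work item queued; a
    // later job or worker registration re-triggers DispatchScheduledTasksToWorkers.
    work->waiting_worker_popped = false;
  }
  return;
}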
src/ray/common/status.cc
Outdated
@@ -102,6 +106,9 @@ std::string Status::CodeAsString() const {
  {StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_STORE_ALREADY_SEALED},
  {StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL},
  {StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL},
  {StatusCode::JobNotStarted, STATUS_CODE_JOB_NOT_STARTED},
  {StatusCode::WorkerPendingRegistration, STATUS_CODE_WORKER_PENDING_REGISTRATION},
  {StatusCode::RuntimeEnvCreationFailed, STATUS_CODE_RUNTIME_ENV_CREATION_FAILED},
It looks like we don't catch all of these statuses inside a caller and take a special action. Why don't we just use one status and write a better error message inside? (e.g., a WorkerPoolStartupNotReady status?)
Code updated. I have added different branches based on the status.
I really like the idea of using a callback for the worker pool, but I think there are lots of edge cases to handle here, and we should have a lot more test coverage for this.
I pointed out a few specific places, but in general, can we also:
- In the cluster task manager tests, assert that we're only attempting to send 1 reply?
- In the worker pool tests, assert that we always call the callback immediately within PopWorker, except when we actually start/register a worker process, in which case we should assert that the callback isn't called immediately and is only called after registration is complete? (A sketch of this kind of assertion follows below.)
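A hedged sketch of the worker pool assertion being asked for, assuming the existing WorkerPoolTest fixture and gtest; ExampleTaskSpec() is shown with defaults for brevity, and SimulateWorkerRegistration is a hypothetical helper, so this is not the actual test added in the PR:

TEST_F(WorkerPoolTest, CallbackFiresOnlyAfterWorkerRegistration) {
  bool callback_fired = false;
  std::shared_ptr<WorkerInterface> popped;
  auto task_spec = ExampleTaskSpec();  // defaults assumed; the real test passes explicit args
  worker_pool_->PopWorker(
      task_spec, [&](std::shared_ptr<WorkerInterface> worker, Status status) {
        callback_fired = true;
        popped = worker;
      });
  // A new worker process is being started, so the callback must not have run yet.
  ASSERT_FALSE(callback_fired);
  // Simulate the started worker registering with the pool (hypothetical helper).
  SimulateWorkerRegistration();
  ASSERT_TRUE(callback_fired);
  ASSERT_NE(popped, nullptr);
}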
@@ -237,6 +232,11 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
    // scheduler will make the same decision.
    break;
  }
  if (!spec.GetDependencies().empty()) {
    task_dependency_manager_.RemoveTaskDependencies(
        task.GetTaskSpecification().TaskId());
Can you add a comment explaining what this check is?
I just copied the previous code.
src/ray/raylet/worker_pool.cc
Outdated
    << static_cast<int>(language) << " pending registration";
RAY_LOG(DEBUG) << oss.str();
PopWorkerCallbackExecution(callback, nullptr,
                           Status::WorkerPendingRegistration(oss.str()));
Shouldn't this status be something like WorkerStartupRateLimit? We aren't starting a worker process here, are we?
Yes, I have renamed this status to SoManyStartingWorkerProcesses. Do you think WorkerStartupRateLimit is a better name?
worker_pool_.PopWorker(
    spec,
    [this, allocated_instances, task_id, runtime_env_worker_key](
        const std::shared_ptr<WorkerInterface> worker, Status status) {
It seems like it's possible for the same task to receive multiple worker processes now. Do we need to check for that? If so, can you add a unit test for it?
I don't understand. In my latest code, I bind the task to a worker process, so it cannot receive multiple worker processes?
I am down to merge it after @wuisawesome has a final go. But please let me merge the PR so that we can make sure the nightly tests are reliable after this PR is merged.
@rkooo567 Are you going to manually run a nightly test against this PR?
I will just merge it on a day when we don't merge other big core changes. (We have another huge core change that could be merged soon, so I'd like to make sure there is enough of a gap between merges.)
@rkooo567 So are you going to merge this PR tomorrow? We don't want to hold this PR for too long, as it keeps getting conflicts.
Yeah, if the CI test result is good and Alex gives the final approval, I will merge it tomorrow.
Btw, sorry for the delay in merging it :(. We've had bad incidents in this worker pool layer recently, so we were being more careful in reviewing the PR this time (this PR actually comes at a nice time, because we've been discussing how this path needs to be rewritten). Also, I'm not sure if it was possible in this case, but it'd be great if we could split the PR into multiple PRs, because when a PR contains 1000+ lines of code it becomes very hard to provide a quality code review (this is something our end should also improve), which means it can take even longer to merge. Based on my experience, a PR with more than 1000 lines of code change normally takes 2~3 weeks to merge unless it is just refactoring.
@rkooo567 Understood. Originally, we thought this was going to be a small change (just making a function async). But it turned out that a lot of related code (cluster_task_manager, for example) needed to be changed as well, and the task dispatching and worker management logic became slightly different. Also, another reason why it's huge is that a lot of synchronous testing code needed to become async now.
The core changes are only about 300 lines. Most of the changed lines are test code and new tests. And I can't split this PR because I just do one thing: make
All checks have passed.
Plan to merge at 5pm today.
Thanks for the PR again @SongGuyang! I will have a close look at the nightly test and let you know how this goes!
@rkooo567 Thank you! |
* make 'PopWorker' to be an async function * pop worker async works * fix * address comments * bugfix * fix cluster_task_manager_test * fix * bugfix of detached actor * address comments * fix * address comments * fix aioredis * Revert "fix aioredis" This reverts commit 041b983. * bug fix * fix * fix test_step_resources test * format * add unit test * fix * add test case PopWorkerStatus * address commit * fix lint * address comments * add python test * address comments * make an independent function * Update test_basic_3.py Co-authored-by: Hao Chen <chenh1024@gmail.com>
* [Core] Support ConcurrentGroup part1 (#16795) * Core change and Java change. * Fix void call. * Address comments and fix cases. * Fix asyncio * Change core worker C++ namespace to ray::core (#17610) * [C++ Worker] Replace `Ray::xxx` with `ray::xxx` and update namespaces (#17388) * [core] make 'PopWorker' to be an async function (#17202) * make 'PopWorker' to be an async function * pop worker async works * fix * address comments * bugfix * fix cluster_task_manager_test * fix * bugfix of detached actor * address comments * fix * address comments * fix aioredis * Revert "fix aioredis" This reverts commit 041b983. * bug fix * fix * fix test_step_resources test * format * add unit test * fix * add test case PopWorkerStatus * address commit * fix lint * address comments * add python test * address comments * make an independent function * Update test_basic_3.py Co-authored-by: Hao Chen <chenh1024@gmail.com> Co-authored-by: Qing Wang <kingchin1218@gmail.com> Co-authored-by: Hao Chen <chenh1024@gmail.com> Co-authored-by: qicosmos <383121719@qq.com> Co-authored-by: SongGuyang <guyang.sgy@antfin.com>
This reverts commit 63c15d7.
…)"" This reverts commit 67e57b2.
@@ -1244,78 +1311,127 @@ TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
  auto actor_id = ActorID::Of(JOB_ID, task_id, 1);
  TaskSpecification java_task_spec = ExampleTaskSpec(
      ActorID::Nil(), Language::JAVA, JOB_ID, actor_id, actor_jvm_options, task_id);
  ASSERT_EQ(worker_pool_->PopWorker(java_task_spec), nullptr);
  worker = worker_pool_->PopWorkerSync(java_task_spec);
  ASSERT_NE(worker, nullptr);
I believe this change doesn't make sense, because this test aims to test the shim pid mechanism. Since we integrated the startup token mechanism, this case should be removed.
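For context on the test change quoted above, PopWorkerSync is presumably a test-only wrapper that drives the now-async PopWorker to completion. A rough, hedged guess at its shape (assumed, not the PR's actual helper):

// Hypothetical sketch of a synchronous test wrapper around the async PopWorker.
std::shared_ptr<WorkerInterface> PopWorkerSync(const TaskSpecification &spec) {
  std::shared_ptr<WorkerInterface> popped = nullptr;
  worker_pool_->PopWorker(
      spec, [&popped](std::shared_ptr<WorkerInterface> worker, Status status) {
        popped = worker;
      });
  // The real fixture would also pump worker startup and registration here so
  // that the callback has a chance to fire before returning.
  return popped;
}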
Why are these changes needed?
Closes #17154