
[core] make 'PopWorker' to be an async function #17202

Merged · 36 commits into ray-project:master · Aug 11, 2021

Conversation

SongGuyang (Contributor) commented Jul 20, 2021

Why are these changes needed?

  • 'PopWorker' is currently a synchronous function. If there are no idle workers, the caller has to invoke 'PopWorker' repeatedly. And when leasing a worker with a runtime env, we cannot catch a failure of runtime env creation and react to it, for example by rescheduling the task.
  • This PR makes 'PopWorker' an async function (see the interface sketch below).

Closes #17154
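For orientation, a minimal sketch of the interface change (simplified declarations, assuming names from the snippets quoted later in this thread; the actual signatures in the PR may differ):

#include <functional>
#include <memory>

class WorkerInterface;   // raylet worker handle (simplified forward declaration)
class Status;            // simplified stand-in for ray::Status
class TaskSpecification;

// The callback receives a worker on success, or nullptr plus a status
// describing why the pop failed (e.g. runtime env creation failed).
using PopWorkerCallback =
    std::function<void(std::shared_ptr<WorkerInterface> worker, Status status)>;

// Before: std::shared_ptr<WorkerInterface> PopWorker(const TaskSpecification &spec);
// After: the result is delivered asynchronously through the callback.
void PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback &callback);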

SongGuyang requested review from kfstorm and raulchen July 20, 2021 03:41
SongGuyang requested a review from jovany-wang July 20, 2021 03:53
SongGuyang (Contributor, Author)

@kfstorm @jovany-wang Please help review this change. I haven't modified the worker pool tests yet, but you can review this PR concurrently with my updates.

rkooo567 self-assigned this Jul 20, 2021
ericl (Contributor) left a comment

It would be great if we could avoid adding a new queue. I think we could eliminate the dispatch queue if we ensured pops never failed (or you can have an async failure callback handler).

ericl self-assigned this Jul 20, 2021
ericl added the @author-action-required label Jul 20, 2021
SongGuyang (Contributor, Author)

@ericl @kfstorm @jovany-wang This PR has been updated a lot. Please help review it again. @ericl To avoid adding a new queue, I have added a status field to Work and reused the dispatch queue.

SongGuyang changed the title from "[WIP] make 'PopWorker' to be an async function" to "[core] make 'PopWorker' to be an async function" Jul 23, 2021
SongGuyang (Contributor, Author)

@simon-mo I removed blocked_runtime_env_to_skip in this PR because PopWorker is now an async function. Please help review.

ericl removed the @author-action-required label Jul 23, 2021
ericl previously requested changes Jul 23, 2021
Task task;
rpc::RequestWorkerLeaseReply *reply;
std::function<void(void)> callback;
bool waiting_worker_popped;
Contributor:

Suggested change:
- bool waiting_worker_popped;
+ bool waiting_worker_popped = false;

Contributor:

Could we make this and dispatch_finished into a single enum instead of having two separate flags? This seems less brittle, since we only have one state to check. I don't think it's valid to have waiting_worker_popped && dispatch_finished anyway.

Contributor:

+1 I think we can just do

enum {
  WAITING,     // default
  CANCELLED,   // cancel = true
  SCHEDULED,   // waiting_worker_popped = true
  DISPATCHED   // dispatch_finished = true
};

Are there conditions where multiple of them co-exist?

Contributor:

Can we also add SPILLED as a state? I think the semantics are the same as cancelled, but it would be more clear.

Contributor (Author):

@rkooo567 Great! But doesn't the semantics of SCHEDULED cause confusion? In the raylet, SCHEDULED means a node has been selected and resources have been occupied.

Contributor:

Isn't the worker popped only when it is scheduled, though? Or does the pop mean "it will eventually be scheduled"?

Contributor:

I think we can make it explicit and call it WAITING_FOR_WORKER instead of SCHEDULED. That seems pretty clear.

Contributor (Author):

@wuisawesome I found we don't need SPILLED, because the Work item is removed after spilling.

Contributor (Author):

Let's just use three statuses:

enum WorkStatus {
  WAITING,
  WAITING_FOR_WORKER,
  CANCELLED,
};
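A sketch of how the struct quoted above could then look, with the two boolean flags collapsed into this enum (field layout follows the quoted diff; the exact code in the PR may differ):

struct Work {
  Task task;
  rpc::RequestWorkerLeaseReply *reply;
  std::function<void(void)> callback;
  // Replaces the separate waiting_worker_popped / canceled flags with a
  // single state, so invalid flag combinations cannot be represented.
  WorkStatus status = WAITING;
};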

simon-mo (Contributor) commented Jul 23, 2021

@SongGuyang as long as

def test_env_installation_nonblocking(shutdown_only):

passes, you can definitely remove it.

cc @wuisawesome we might want to run a scalability test for this PR

stephanie-wang (Contributor) left a comment

Mainly some questions about corner cases.

ReplyCancelled(*work_it);
if (!task.GetTaskSpecification().GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
(*work_it)->canceled = true;
Contributor:

Can you add a comment to explain when the task actually gets erased?

Contributor:

Or rather, the task is being removed from the queue here, but it may already be captured in a callback, so it shouldn't be dispatched even if the callback gives it a worker. (+1 to putting this in a comment though.)

Contributor (Author):

@stephanie-wang Don't I erase this task on the next line?

work_queue.erase(work_it);

stephanie-wang (Contributor) commented Jul 26, 2021:

Ah yes, I meant to add a comment to explain when *work_it gets erased. I think that happens when the dispatch loop gets triggered again, right?

Contributor (Author):

There are two places where the task is erased now: CancelTask and erase_from_dispatch_queue_fn.
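A sketch of the cancellation guard being discussed (names follow the quoted diff; the PushWorker call for returning the unused worker is an assumption):

// In CancelTask: mark the Work item as canceled before erasing it from the
// queue, because an in-flight PopWorker callback may already have captured it.
(*work_it)->canceled = true;
work_queue.erase(work_it);

// Later, inside the PopWorker callback:
if (work->canceled) {
  // The task was cancelled while waiting for a worker: do not dispatch;
  // return the worker to the pool so another task can use it.
  worker_pool_.PushWorker(worker);
  return;
}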


oss << "Worker not started, " << starting_workers << " workers of language type "
<< static_cast<int>(language) << " pending registration";
RAY_LOG(DEBUG) << oss.str();
PopWorkerCallbackExecution(callback, nullptr,
Contributor:

Why not wait to call the callback until the starting workers register?

Contributor:

Isn't this the case where we aren't starting a worker for this callback, and therefore no worker will be registered that's assigned to this callback?

Contributor:

Either way, can we add a unit test for it?

Contributor (Author):

Yep, when the number of starting workers is greater than maximum_startup_concurrency_, we will not start a new worker process. We should re-dispatch this task.

Contributor (Author):

Let me see how to test it.

Contributor:

Hmm, I guess I thought that now that the callback is async, we should just wait until a worker is available. Right now, the logic will loop:

  1. Trigger dispatch/PopWorker.
  2. If startup concurrency is maxed out, call the callback with an empty worker.

But we could also avoid looping if we did this:

  1. Trigger dispatch/PopWorker.
  2. If startup concurrency is maxed out, wait until a worker is pushed or a worker can be started, then call the callback.

I don't think it's a good idea to do it in this PR, but it would be good to do it in the future. Can you add a TODO?

Contributor (Author):

Sure, let me add this TODO.
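For reference, a sketch of the current behavior plus the suggested TODO (simplified; names follow the snippets quoted in this thread, and the function body is an assumption, not the PR's exact code):

void WorkerPool::PopWorker(const TaskSpecification &task_spec,
                           const PopWorkerCallback &callback) {
  if (starting_workers >= maximum_startup_concurrency_) {
    // Current behavior: fail the pop immediately; the task stays in the
    // dispatch queue and is retried on the next dispatch loop.
    // TODO: queue the callback here instead, and invoke it once a worker
    // is pushed back or a new worker process can be started.
    callback(nullptr, Status::WorkerPendingRegistration(
                          "Worker not started, workers pending registration"));
    return;
  }
  // Otherwise: reuse an idle worker, or start a new worker process and
  // invoke the callback after that worker registers.
}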

const TaskSpecification &task_spec,
const std::string &allocated_instances_serialized_json = "{}");
/// \param callback The callback function that is executed when the result of
/// worker popping is available.
Contributor:

I am a bit confused about the possible triggers for the callback. Can you comment and say what they are? In particular, I'm not sure about what happens when:

  1. There's no job config available yet.
  2. There's no available worker (there seems to be multiple cases here?)

I might be wrong, but it also doesn't seem like the unit tests in worker_pool_tests check whether we return the right error codes at the right times in these cases. Can we add tests for these?

Contributor (Author):

If there's no job config available, or the number of starting workers is greater than maximum_startup_concurrency_, we will call the callback with an empty worker to trigger a re-dispatch. And if runtime env creation fails, we should also call the callback with an empty worker, but trigger a re-schedule to other nodes or mark the task dead. I haven't implemented that in the current PR, though; I've just added some status codes.

Contributor:

Thanks. Can we make sure to document the cases that will trigger the callback? I noticed you added status codes, but it would be good to explain what they mean in the comments.
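A sketch of the kind of doc comment being requested here, based on the cases described in this thread (the wording is a suggestion, not the PR's actual comment):

/// \param callback Invoked exactly once with the result of worker popping:
///   - with a registered worker, on success;
///   - with nullptr and JobNotStarted, if no job config is available yet
///     (the task stays queued and is re-dispatched later);
///   - with nullptr and WorkerPendingRegistration, if too many starting
///     workers are already pending registration;
///   - with nullptr and RuntimeEnvCreationFailed, if the runtime env could
///     not be created (the task should be rescheduled or marked failed).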

// spill this task.
ReleaseTaskArgs(task_id);
// TODO(guyang.sgy): re-schedule the task by status code.
work->waiting_worker_popped = false;
Contributor:

Couldn't this hang? Since there is no guarantee DispatchScheduledTasksToWorkers will get called again.

Contributor (Author):

Good comment! If the status code is JobNotStarted or WorkerPendingRegistration, DispatchScheduledTasksToWorkers will probably be triggered by other events, such as job registration or worker registration. But if the status code is RuntimeEnvCreationFailed, this will hang. I should do some work here. What do you think?

Contributor:

Hmm now that I'm thinking about this more, this might be okay since the previous version of the code also didn't handle the case.

Could you add a NOTE: comment to explain that it's assumed that DispatchScheduledTasksToWorkers will get called again?

Contributor (Author):

I have solved this problem now. If RuntimeEnvCreationFailed happens, we fail the task directly.
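A sketch of how the callback could branch on the status to match the resolution described here (Dispatch and FailTask are hypothetical helper names; the lambda signature follows the snippet quoted later in this thread):

worker_pool_.PopWorker(
    task_spec,
    [this, task_id](std::shared_ptr<WorkerInterface> worker, Status status) {
      if (worker != nullptr) {
        Dispatch(worker, task_id);  // got a worker: dispatch the task
      } else if (status.code() == StatusCode::RuntimeEnvCreationFailed) {
        // Retrying on this node cannot help, so fail the task directly
        // instead of leaving it queued forever.
        FailTask(task_id, status);
      } else {
        // JobNotStarted / WorkerPendingRegistration: leave the work in the
        // dispatch queue; a later job or worker registration triggers
        // DispatchScheduledTasksToWorkers again.
      }
    });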

@@ -102,6 +106,9 @@ std::string Status::CodeAsString() const {
{StatusCode::ObjectAlreadySealed, STATUS_CODE_OBJECT_STORE_ALREADY_SEALED},
{StatusCode::ObjectStoreFull, STATUS_CODE_OBJECT_STORE_FULL},
{StatusCode::TransientObjectStoreFull, STATUS_CODE_TRANSIENT_OBJECT_STORE_FULL},
{StatusCode::JobNotStarted, STATUS_CODE_JOB_NOT_STARTED},
{StatusCode::WorkerPendingRegistration, STATUS_CODE_WORKER_PENDING_REGISTRATION},
{StatusCode::RuntimeEnvCreationFailed, STATUS_CODE_RUNTIME_ENV_CREATION_FAILED},
Contributor:

It looks like we don't catch all of these statuses in a caller and take a special action. Why don't we just use one status and write a better error message inside (e.g., a WorkerPoolStartupNotReady status)?

Contributor (Author):

Code updated. I have added different branches based on the status.

wuisawesome (Contributor) left a comment

I really like the idea of using a callback for the worker pool, but I think there are lots of edge cases to handle here, and we should have a lot more test coverage for this.

I pointed out a few specific places, but in general, can we also:

  1. In the cluster task manager tests, assert that we're only attempting to send one reply?
  2. In the worker pool tests, assert that we always call the callback immediately within PopWorker, except when we actually start/register a worker process, in which case assert that the callback isn't called immediately and only fires after registration completes? (See the test sketch below.)
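A sketch of what assertion (2) could look like, in the style of the WorkerPoolTest fixture quoted at the end of this page (ExampleTaskSpec appears in that fixture; SimulateWorkerRegistration is a hypothetical helper):

TEST_F(WorkerPoolTest, PopWorkerCallbackFiresAfterRegistration) {
  auto task_spec = ExampleTaskSpec();
  bool callback_called = false;
  worker_pool_->PopWorker(
      task_spec, [&](std::shared_ptr<WorkerInterface> worker, Status status) {
        callback_called = true;
      });
  // A new worker process had to be started, so the callback must not fire
  // until that worker registers.
  ASSERT_FALSE(callback_called);
  SimulateWorkerRegistration();  // hypothetical helper for this sketch
  ASSERT_TRUE(callback_called);
}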

@@ -237,6 +232,11 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
// scheduler will make the same decision.
break;
}
if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
Contributor:

Can you add a comment explaining what this check is?

Contributor (Author):

I just copied the previous code here.


<< static_cast<int>(language) << " pending registration";
RAY_LOG(DEBUG) << oss.str();
PopWorkerCallbackExecution(callback, nullptr,
Status::WorkerPendingRegistration(oss.str()));
Contributor:

Shouldn't this status be something like WorkerStartupRateLimit? We aren't starting a worker process here, are we?

Contributor (Author):

Yes, I have renamed this status to SoManyStartingWorkerProcesses. Do you think WorkerStartupRateLimit is a better name?

worker_pool_.PopWorker(
spec,
[this, allocated_instances, task_id, runtime_env_worker_key](
const std::shared_ptr<WorkerInterface> worker, Status status) {
Contributor:

It seems like it's possible for the same task to receive multiple worker processes now. Do we need to check for that? If so, can you add a unit test for it?

Contributor (Author):

I don't understand. In my latest code, I bind the task to a worker process, so it shouldn't receive multiple worker processes, right?

oss << "Worker not started, " << starting_workers << " workers of language type "
<< static_cast<int>(language) << " pending registration";
RAY_LOG(DEBUG) << oss.str();
PopWorkerCallbackExecution(callback, nullptr,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either way, can we add a unit test for it?

rkooo567 (Contributor)

I am down to merge it after @wuisawesome has a final pass. But please let me be the one to merge the PR, so that we can make sure the nightly tests are reliable after it is merged.

raulchen (Contributor)

@rkooo567 are you going to manually run a nightly test against this PR?

rkooo567 (Contributor)

I will just merge it on a day when we don't merge other big core changes. (We have another huge core change that could be merged soon, so I'd like to make sure there are enough gaps between the merges.)

raulchen (Contributor)

@rkooo567 so are you going to merge this PR tomorrow? We don't want to hold this PR for too long, as it keeps getting conflicts.

rkooo567 (Contributor) commented Aug 10, 2021

Yeah, if the CI test results are good and Alex gives the final approval, I will merge it tomorrow.

rkooo567 (Contributor) commented Aug 10, 2021

Btw, sorry for the delay in merging it :(. We've had bad incidents in this worker pool layer recently, so we were being more careful in reviewing the PR this time (this PR actually comes at a nice time, because we've been discussing that this path needs to be rewritten). Also, I am not sure if it was possible in this case, but it'd be great if we could split a PR like this into multiple ones: once a PR contains 1000+ lines of code, it becomes very hard to provide a quality code review (this is something our end should improve as well), which means it can take even longer to merge. Based on my experience, a PR with more than 1000 lines of code change normally takes 2~3 weeks to merge unless it is pure refactoring.

raulchen (Contributor)

@rkooo567 Understood. Originally, we thought this was going to be a small change (just making a function async). But it turned out that a lot of related code (cluster_task_manager, for example) needed to be changed as well, and the task dispatching and worker managing logic became slightly different. Another reason why it's huge is that a lot of synchronous testing code needed to become async.

SongGuyang (Contributor, Author)

The core changes are only about 300 lines; most of the changed lines are test code and new tests. And I can't split this PR, because it does just one thing: make PopWorker async. There is no other irrelevant code.

SongGuyang (Contributor, Author)

All checks have passed.

rkooo567 (Contributor)

Plan to merge at 5pm today.

rkooo567 merged commit 63c15d7 into ray-project:master Aug 11, 2021
rkooo567 (Contributor)

Thanks for the PR again @SongGuyang! I will take a close look at the nightly tests and let you know how this goes!

jovany-wang deleted the dev_pop_worker branch August 11, 2021 00:17
SongGuyang (Contributor, Author)

@rkooo567 Thank you!

sven1977 pushed a commit that referenced this pull request Aug 11, 2021
* make 'PopWorker' to be an async function

* pop worker async works

* fix

* address comments

* bugfix

* fix cluster_task_manager_test

* fix

* bugfix of detached actor

* address comments

* fix

* address comments

* fix aioredis

* Revert "fix aioredis"

This reverts commit 041b983.

* bug fix

* fix

* fix test_step_resources test

* format

* add unit test

* fix

* add test case PopWorkerStatus

* address commit

* fix lint

* address comments

* add python test

* address comments

* make an independent function

* Update test_basic_3.py

Co-authored-by: Hao Chen <chenh1024@gmail.com>
clarkzinzow pushed a commit to clarkzinzow/ray that referenced this pull request Aug 12, 2021
clarkzinzow added a commit that referenced this pull request Aug 12, 2021
* [Core] Support ConcurrentGroup part1 (#16795)

* Core change and Java change.

* Fix void call.

* Address comments and fix cases.

* Fix asyncio

* Change core worker C++ namespace to ray::core (#17610)

* [C++ Worker] Replace `Ray::xxx` with `ray::xxx` and update namespaces (#17388)

* [core] make 'PopWorker' to be an async function (#17202)


Co-authored-by: Qing Wang <kingchin1218@gmail.com>
Co-authored-by: Hao Chen <chenh1024@gmail.com>
Co-authored-by: qicosmos <383121719@qq.com>
Co-authored-by: SongGuyang <guyang.sgy@antfin.com>
wuisawesome added a commit that referenced this pull request Aug 17, 2021
wuisawesome pushed a commit that referenced this pull request Aug 19, 2021
wuisawesome pushed a commit that referenced this pull request Aug 19, 2021
wuisawesome pushed a commit that referenced this pull request Aug 20, 2021
@@ -1244,78 +1311,127 @@ TEST_F(WorkerPoolTest, StartWorkWithDifferentShimPid) {
auto actor_id = ActorID::Of(JOB_ID, task_id, 1);
TaskSpecification java_task_spec = ExampleTaskSpec(
ActorID::Nil(), Language::JAVA, JOB_ID, actor_id, actor_jvm_options, task_id);
ASSERT_EQ(worker_pool_->PopWorker(java_task_spec), nullptr);
worker = worker_pool_->PopWorkerSync(java_task_spec);
ASSERT_NE(worker, nullptr);
Contributor:

I believe this change doesn't make sense, because this test aims to test the shim pid mechanism.

Since we have integrated the startup token mechanism, this case should be removed.

Successfully merging this pull request may close these issues:

  • Worker pool/task manager issues
  • [Core] Graceful failure for tasks/actors with invalid runtime envs