[core] make 'PopWorker' to be an async function #17202

Merged: 36 commits (from dev_pop_worker), Aug 11, 2021
8ab39aa
make 'PopWorker' to be an async function
SongGuyang Jul 20, 2021
dfea937
pop worker async works
SongGuyang Jul 23, 2021
f13dd6b
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 23, 2021
7378327
fix
SongGuyang Jul 23, 2021
d95c92e
address comments
SongGuyang Jul 27, 2021
9c582bb
bugfix
SongGuyang Jul 28, 2021
639d0fb
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 28, 2021
9409d69
fix cluster_task_manager_test
SongGuyang Jul 29, 2021
74f08b8
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 29, 2021
bbed86f
fix
SongGuyang Jul 29, 2021
06c8067
bugfix of detached actor
SongGuyang Jul 29, 2021
3702a93
address comments
SongGuyang Jul 29, 2021
83c29be
fix
SongGuyang Jul 29, 2021
619d15d
address comments
SongGuyang Jul 30, 2021
ad33937
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 30, 2021
041b983
fix aioredis
SongGuyang Jul 30, 2021
de26099
Revert "fix aioredis"
SongGuyang Jul 31, 2021
d77af8f
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 31, 2021
fa70027
bug fix
SongGuyang Jul 31, 2021
10ea16d
fix
SongGuyang Jul 31, 2021
8d21bcf
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 2, 2021
74d7bc9
fix test_step_resources test
SongGuyang Aug 6, 2021
2806900
format
SongGuyang Aug 6, 2021
3f030a3
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 6, 2021
221656e
add unit test
SongGuyang Aug 6, 2021
4eca794
fix
SongGuyang Aug 6, 2021
b26a2db
add test case PopWorkerStatus
SongGuyang Aug 6, 2021
3ddbdbb
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 6, 2021
80b2291
address commit
SongGuyang Aug 7, 2021
a69b9fb
fix lint
SongGuyang Aug 7, 2021
1d4090b
address comments
SongGuyang Aug 9, 2021
fd9016d
add python test
SongGuyang Aug 10, 2021
ac08ff3
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 10, 2021
9421cbd
address comments
SongGuyang Aug 10, 2021
e2b1d96
make an independent function
SongGuyang Aug 10, 2021
f396382
Update test_basic_3.py
raulchen Aug 10, 2021
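
Taken together, these commits convert WorkerPool::PopWorker from a synchronous call that returns a worker (or null) into an asynchronous call that reports its result through a callback. A minimal sketch of the interface change, reconstructed from the diffs below; the exact PopWorkerCallback typedef is an assumption inferred from the callback(worker) / callback(nullptr) call sites in worker_pool.cc:

#include <functional>
#include <memory>

class WorkerInterface;
class TaskSpecification;

// Assumed callback type (not shown verbatim in this diff).
using PopWorkerCallback = std::function<void(std::shared_ptr<WorkerInterface>)>;

class WorkerPool {
 public:
  // Before: blocks the dispatch loop and returns nullptr when no worker
  // process is ready to take the lease.
  // std::shared_ptr<WorkerInterface> PopWorker(const TaskSpecification &task_spec);

  // After: returns immediately; `callback` fires exactly once, either with a
  // worker or with nullptr (e.g. runtime env creation failed, or the new
  // worker process never registered within the timeout).
  void PopWorker(const TaskSpecification &task_spec, const PopWorkerCallback callback);
};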
5 changes: 3 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -1159,8 +1159,9 @@ void NodeManager::ProcessAnnounceWorkerPortMessage(
int port = message->port();
worker->Connect(port);
if (is_worker) {
worker_pool_.OnWorkerStarted(worker);
HandleWorkerAvailable(worker->Connection());
if (worker_pool_.OnWorkerStarted(worker)) {
HandleWorkerAvailable(worker->Connection());
}
}
}

88 changes: 62 additions & 26 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -220,36 +220,66 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers(
// scheduler will make the same decision.
break;
}
if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
Contributor: Can you add a comment explaining what this check is?
Author: I just copied previous code.
}
work_it = dispatch_queue.erase(work_it);
} else {
// The local node has the available resources to run the task, so we should run
// it.
std::shared_ptr<WorkerInterface> worker = worker_pool_.PopWorker(spec);
if (!worker) {
RAY_LOG(DEBUG) << "This node has available resources, but no worker processes "
"to grant the lease.";
// We've already acquired resources so we need to release them to avoid
// double-acquiring when the next invocation of this function tries to schedule
// this task.
cluster_resource_scheduler_->ReleaseWorkerResources(allocated_instances);
// No worker available, we won't be able to schedule any kind of task.
// Worker processes spin up pretty quickly, so it's not worth trying to spill
// this task.
ReleaseTaskArgs(task_id);
return;
if (tasks_waiting_workers_popped_.find(task_id) !=
tasks_waiting_workers_popped_.end()) {
// Skip tasks that are already waiting for a worker to be popped.
continue;
}

RAY_LOG(DEBUG) << "Dispatching task " << task_id << " to worker "
<< worker->WorkerId();
auto reply = std::get<1>(*work_it);
auto callback = std::get<2>(*work_it);
Dispatch(worker, leased_workers_, allocated_instances, task, reply, callback);
}

if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
tasks_waiting_workers_popped_.emplace(
task_id, std::make_tuple(false, shapes_it->first, work));
worker_pool_.PopWorker(spec, [this, allocated_instances, task_id](
const std::shared_ptr<WorkerInterface> worker) {
auto it = tasks_waiting_workers_popped_.find(task_id);
RAY_CHECK(it != tasks_waiting_workers_popped_.end());
auto canceled = std::get<0>(it->second);
auto schedulingClass = std::get<1>(it->second);
auto &work = std::get<2>(it->second);
auto reply = std::get<1>(work);
auto callback = std::get<2>(work);
if (canceled) {
RAY_LOG(WARNING) << "Task " << task_id
<< " not found yet when worker popped, maybe the task has "
"been cancelled.";
reply->set_canceled(true);
callback();
} else if (!worker) {
RAY_LOG(DEBUG)
<< "This node has available resources, but no worker processes "
"to grant the lease "
<< task_id << ".";
// We've already acquired resources so we need to release them to avoid
// double-acquiring when the next invocation of this function tries to
// schedule this task.
cluster_resource_scheduler_->ReleaseWorkerResources(allocated_instances);
// No worker available, we won't be able to schedule any kind of task.
// Worker processes spin up pretty quickly, so it's not worth trying to spill
// this task.
ReleaseTaskArgs(task_id);
// Push this task back to the dispatch queue so that it can be re-dispatched.
tasks_to_dispatch_[schedulingClass].push_back(work);
} else {
RAY_LOG(DEBUG) << "Dispatching task " << task_id << " to worker "
<< worker->WorkerId();
const auto &task = std::get<0>(work);
const auto &spec = task.GetTaskSpecification();
Dispatch(worker, leased_workers_, allocated_instances, task, reply, callback);
if (!spec.GetDependencies().empty()) {
task_dependency_manager_.RemoveTaskDependencies(
task.GetTaskSpecification().TaskId());
}
}
tasks_waiting_workers_popped_.erase(task_id);
});
work_it = dispatch_queue.erase(work_it);
}
work_it = dispatch_queue.erase(work_it);
}
if (is_infeasible) {
infeasible_tasks_[shapes_it->first] = std::move(shapes_it->second);
@@ -488,6 +518,12 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
}
}

auto it = tasks_waiting_workers_popped_.find(task_id);
if (it != tasks_waiting_workers_popped_.end()) {
std::get<0>(it->second) = true;
return true;
}

for (auto shapes_it = infeasible_tasks_.begin(); shapes_it != infeasible_tasks_.end();
shapes_it++) {
auto &work_queue = shapes_it->second;
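
The two hunks above cooperate: CancelTask can no longer find a task that has already left the dispatch queue, so it only flips the `canceled` flag stored in tasks_waiting_workers_popped_, and the PopWorker callback in DispatchScheduledTasksToWorkers checks that flag before dispatching. A toy, self-contained sketch of this handshake, with stand-in types rather than Ray's:

#include <cassert>
#include <iostream>
#include <string>
#include <tuple>
#include <unordered_map>

using SchedulingClass = int;               // stand-in
struct Work { std::string description; };  // stand-in
using WorkWaitingWorkerPopped = std::tuple<bool /*canceled*/, SchedulingClass, Work>;

std::unordered_map<std::string, WorkWaitingWorkerPopped> tasks_waiting_workers_popped;

bool CancelTask(const std::string &task_id) {
  auto it = tasks_waiting_workers_popped.find(task_id);
  if (it == tasks_waiting_workers_popped.end()) return false;
  std::get<0>(it->second) = true;  // Flag only; the pop callback cleans up.
  return true;
}

void OnWorkerPopped(const std::string &task_id) {
  auto it = tasks_waiting_workers_popped.find(task_id);
  assert(it != tasks_waiting_workers_popped.end());
  if (std::get<0>(it->second)) {
    std::cout << task_id << ": canceled while waiting, reply canceled\n";
  } else {
    std::cout << task_id << ": dispatch to the popped worker\n";
  }
  tasks_waiting_workers_popped.erase(it);
}

int main() {
  tasks_waiting_workers_popped.emplace("task-1",
                                       std::make_tuple(false, 0, Work{"f()"}));
  CancelTask("task-1");      // the task is no longer in the dispatch queue
  OnWorkerPopped("task-1");  // the callback observes the flag
}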
@@ -845,7 +881,7 @@ void ClusterTaskManager::TryLocalInfeasibleTaskScheduling() {
void ClusterTaskManager::Dispatch(
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers,
std::shared_ptr<TaskResourceInstances> &allocated_instances, const Task &task,
std::shared_ptr<TaskResourceInstances> allocated_instances, const Task &task,
rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> send_reply_callback) {
metric_tasks_dispatched_++;
const auto &task_spec = task.GetTaskSpecification();
10 changes: 9 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager.h
@@ -22,6 +22,9 @@ namespace raylet {
/// dispatch/spillback and the callback to trigger it.
typedef std::tuple<Task, rpc::RequestWorkerLeaseReply *, std::function<void(void)>> Work;

// The information of a Work instance that is waiting for a worker to be popped.
typedef std::tuple<bool, SchedulingClass, Work> WorkWaitingWorkerPopped;

typedef std::function<boost::optional<rpc::GcsNodeInfo>(const NodeID &node_id)>
NodeInfoGetter;

@@ -236,6 +239,11 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
/// is still queued.
std::unordered_map<SchedulingClass, std::deque<Work>> tasks_to_dispatch_;

/// Map of lease requests that are waiting for workers to be popped.
/// Tasks move from the dispatch queue to this map.
/// Tasks can also move back from this map to the dispatch queue if a worker cannot be popped.
std::unordered_map<TaskID, WorkWaitingWorkerPopped> tasks_waiting_workers_popped_;

/// Tasks waiting for arguments to be transferred locally.
/// Tasks move from waiting -> dispatch.
/// Tasks can also move from dispatch -> waiting if one of their arguments is
@@ -308,7 +316,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
void Dispatch(
std::shared_ptr<WorkerInterface> worker,
std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
std::shared_ptr<TaskResourceInstances> &allocated_instances, const Task &task,
std::shared_ptr<TaskResourceInstances> allocated_instances, const Task &task,
rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> send_reply_callback);

void Spillback(const NodeID &spillback_to, const Work &work);
81 changes: 57 additions & 24 deletions src/ray/raylet/worker_pool.cc
@@ -151,19 +151,27 @@ void WorkerPool::SetAgentManager(std::shared_ptr<AgentManager> agent_manager) {
agent_manager_ = agent_manager;
}

void WorkerPool::PopWorkerCallbackAsync(const PopWorkerCallback callback,
Member: I think this helper method is unnecessary. I saw that some callback invocations do not leverage this method.
std::shared_ptr<WorkerInterface> worker) {
if (callback) {
io_service_->post([callback, worker]() { callback(worker); });
}
}

Process WorkerPool::StartWorkerProcess(
const Language &language, const rpc::WorkerType worker_type, const JobID &job_id,
const std::vector<std::string> &dynamic_options, const int runtime_env_hash,
const std::string &serialized_runtime_env,
const TaskID &task_id, const std::vector<std::string> &dynamic_options,
const int runtime_env_hash, const std::string &serialized_runtime_env,
std::unordered_map<std::string, std::string> override_environment_variables,
const std::string &serialized_runtime_env_context) {
const std::string &serialized_runtime_env_context, const PopWorkerCallback callback) {
rpc::JobConfig *job_config = nullptr;
if (!IsIOWorkerType(worker_type)) {
RAY_CHECK(!job_id.IsNil());
auto it = all_jobs_.find(job_id);
if (it == all_jobs_.end()) {
RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet.";
// Will reschedule ready tasks in `NodeManager::HandleJobStarted`.
PopWorkerCallbackAsync(callback);
return Process();
}
job_config = &it->second;
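
PopWorkerCallbackAsync above posts the callback onto the pool's io_service rather than invoking it inline, so the caller's stack unwinds before the callback runs and synchronous early returns (such as the missing-job-config case) cannot re-enter the dispatch path. A self-contained boost::asio sketch of the same deferral pattern; this is a toy, not Ray's code:

#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <memory>

struct Worker {};
using PopWorkerCallback = std::function<void(std::shared_ptr<Worker>)>;

// Defer the callback to the event loop so it never runs inside the caller.
void PopWorkerCallbackAsync(boost::asio::io_service &io_service,
                            const PopWorkerCallback callback,
                            std::shared_ptr<Worker> worker = nullptr) {
  if (callback) {
    io_service.post([callback, worker]() { callback(worker); });
  }
}

int main() {
  boost::asio::io_service io;
  PopWorkerCallbackAsync(io, [](std::shared_ptr<Worker> worker) {
    std::cout << (worker ? "got a worker\n" : "pop failed\n");
  });
  std::cout << "PopWorkerCallbackAsync returned first\n";
  io.run();  // The deferred callback fires here.
}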
@@ -185,6 +193,7 @@ Process WorkerPool::StartWorkerProcess(
RAY_LOG(DEBUG) << "Worker not started, " << starting_workers
<< " workers of language type " << static_cast<int>(language)
<< " pending registration";
PopWorkerCallbackAsync(callback);
return Process();
}
// Either there are no workers pending registration or the worker start is being forced.
@@ -340,7 +349,8 @@ Process WorkerPool::StartWorkerProcess(
<< " worker(s) with pid " << proc.GetId();
MonitorStartingWorkerProcess(proc, language, worker_type);
state.starting_worker_processes.emplace(
proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type});
proc, StartingWorkerProcessInfo{workers_to_start, workers_to_start, worker_type,
task_id, callback});
if (IsIOWorkerType(worker_type)) {
auto &io_worker_state = GetIOWorkerStateFromWorkerType(worker_type, state);
io_worker_state.num_starting_io_workers++;
@@ -365,6 +375,13 @@ void WorkerPool::MonitorStartingWorkerProcess(const Process &proc,
if (it != state.starting_worker_processes.end()) {
RAY_LOG(INFO) << "Some workers of the worker process(" << proc.GetId()
<< ") have not registered to raylet within timeout.";
auto task_id = it->second.task_id;
auto callback = it->second.callback;
if (!task_id.IsNil() && callback) {
callback(nullptr);
it->second.task_id = TaskID::Nil();
it->second.callback = PopWorkerCallback();
}
state.starting_worker_processes.erase(it);
if (IsIOWorkerType(worker_type)) {
// Mark the I/O worker as failed.
@@ -497,13 +514,22 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
return Status::OK();
}

void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
bool WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker) {
auto &state = GetStateForLanguage(worker->GetLanguage());
const auto &shim_process = worker->GetShimProcess();
RAY_CHECK(shim_process.IsValid());

bool worker_available = true;
auto it = state.starting_worker_processes.find(shim_process);
if (it != state.starting_worker_processes.end()) {
auto task_id = it->second.task_id;
auto callback = it->second.callback;
if (!task_id.IsNil() && callback) {
callback(worker);
worker_available = false;
it->second.task_id = TaskID::Nil();
it->second.callback = PopWorkerCallback();
}
it->second.num_starting_workers--;
if (it->second.num_starting_workers == 0) {
state.starting_worker_processes.erase(it);
@@ -531,6 +557,7 @@ void WorkerPool::OnWorkerStarted(const std::shared_ptr<WorkerInterface> &worker)
}
}
}
return worker_available;
}

Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver,
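
The boolean now returned by OnWorkerStarted is what the node_manager.cc hunk at the top consumes: true means the freshly registered worker is still unclaimed and may be made idle via HandleWorkerAvailable; false means a stored PopWorker callback already took it. A toy illustration of that one-shot handoff, with stand-in types:

#include <functional>
#include <iostream>
#include <memory>

struct Worker {};
using PopWorkerCallback = std::function<void(std::shared_ptr<Worker>)>;

// Returns true if the worker is still available to be marked idle, false if
// a pending PopWorker callback consumed it.
bool OnWorkerStarted(const std::shared_ptr<Worker> &worker,
                     PopWorkerCallback &pending_callback) {
  if (pending_callback) {
    pending_callback(worker);    // Hand the worker straight to the lease.
    pending_callback = nullptr;  // One-shot: clear the stored callback.
    return false;
  }
  return true;
}

int main() {
  PopWorkerCallback pending = [](std::shared_ptr<Worker>) {
    std::cout << "worker handed to the waiting lease\n";
  };
  auto worker = std::make_shared<Worker>();
  if (OnWorkerStarted(worker, pending)) {
    std::cout << "worker goes to the idle pool\n";  // Not reached here.
  }
}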
@@ -862,21 +889,24 @@ int GetRuntimeEnvHash(const TaskSpecification &task_spec) {
return env.IntHash();
}

std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
const TaskSpecification &task_spec) {
void WorkerPool::PopWorker(const TaskSpecification &task_spec,
const PopWorkerCallback callback) {
RAY_LOG(DEBUG) << "Pop worker for task " << task_spec.TaskId();
auto &state = GetStateForLanguage(task_spec.GetLanguage());

std::shared_ptr<WorkerInterface> worker = nullptr;
Process proc;
auto start_worker_process_fn =
[this](const TaskSpecification &task_spec, State &state,
std::vector<std::string> dynamic_options, bool dedicated,
const int runtime_env_hash, const std::string &serialized_runtime_env,
const std::string &serialized_runtime_env_context) -> Process {
auto start_worker_process_fn = [this](const TaskSpecification &task_spec, State &state,
std::vector<std::string> dynamic_options,
bool dedicated, const int runtime_env_hash,
const std::string &serialized_runtime_env,
const std::string &serialized_runtime_env_context,
const PopWorkerCallback callback) -> Process {
Process proc = StartWorkerProcess(
task_spec.GetLanguage(), rpc::WorkerType::WORKER, task_spec.JobId(),
dynamic_options, runtime_env_hash, serialized_runtime_env,
task_spec.OverrideEnvironmentVariables(), serialized_runtime_env_context);
task_spec.TaskId(), dynamic_options, runtime_env_hash, serialized_runtime_env,
task_spec.OverrideEnvironmentVariables(), serialized_runtime_env_context,
callback);
if (proc.IsValid()) {
WarnAboutSize();
if (dedicated) {
@@ -917,23 +947,25 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
state.tasks_with_pending_runtime_envs.emplace(task_spec.TaskId());
agent_manager_->CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(),
[start_worker_process_fn, &state, task_spec, dynamic_options](
[start_worker_process_fn, callback, &state, task_spec, dynamic_options](
bool done, const std::string &serialized_runtime_env_context) {
state.tasks_with_pending_runtime_envs.erase(task_spec.TaskId());
if (!done) {
// TODO(guyang.sgy): Reschedule to other nodes when create runtime env
// failed.
RAY_LOG(ERROR) << "Create runtime env(for dedicated actor) rpc failed. "
"Wait for next time to retry or reschedule.";
callback(nullptr);
return;
}
start_worker_process_fn(
task_spec, state, dynamic_options, true, GetRuntimeEnvHash(task_spec),
task_spec.SerializedRuntimeEnv(), serialized_runtime_env_context);
start_worker_process_fn(task_spec, state, dynamic_options, true,
GetRuntimeEnvHash(task_spec),
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context, callback);
});
} else {
proc =
start_worker_process_fn(task_spec, state, dynamic_options, true, 0, "", "");
proc = start_worker_process_fn(task_spec, state, dynamic_options, true, 0, "", "",
callback);
}
}
} else {
@@ -976,20 +1008,21 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
// create runtime env.
agent_manager_->CreateRuntimeEnv(
task_spec.SerializedRuntimeEnv(),
[start_worker_process_fn, &state, task_spec, runtime_env_hash](
[start_worker_process_fn, callback, &state, task_spec, runtime_env_hash](
bool successful, const std::string &serialized_runtime_env_context) {
if (!successful) {
// TODO(guyang.sgy): Reschedule to other nodes when create runtime env
// failed.
callback(nullptr);
return;
}
start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash,
task_spec.SerializedRuntimeEnv(),
serialized_runtime_env_context);
serialized_runtime_env_context, callback);
});
} else {
proc = start_worker_process_fn(task_spec, state, {}, false, runtime_env_hash, "",
"");
"", callback);
}
}
}
@@ -1000,8 +1033,8 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(

if (worker) {
RAY_CHECK(worker->GetAssignedJobId() == task_spec.JobId());
PopWorkerCallbackAsync(callback, worker);
}
return worker;
}

void WorkerPool::PrestartWorkers(const TaskSpecification &task_spec,
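
Read end to end, the asynchronous PopWorker can now complete along a few distinct paths, all funneled through the same callback: immediately with a cached idle worker, later from OnWorkerStarted once a newly started process registers, or with nullptr when runtime env creation fails or the process never registers in time. A condensed, self-contained model of that control flow; the types are stand-ins, not Ray's implementation:

#include <functional>
#include <iostream>
#include <memory>

struct Worker {};
using PopWorkerCallback = std::function<void(std::shared_ptr<Worker>)>;

enum class Path { kIdleWorker, kNewProcess, kRuntimeEnvFailed, kRegistrationTimeout };

// Condensed model of the completion paths implemented across the hunks above.
void PopWorker(Path path, const PopWorkerCallback &callback) {
  switch (path) {
    case Path::kIdleWorker:
      callback(std::make_shared<Worker>());  // cached worker, fires right away
      break;
    case Path::kNewProcess:
      // The real code stores the callback in StartingWorkerProcessInfo and
      // fires it from OnWorkerStarted when the process registers.
      callback(std::make_shared<Worker>());
      break;
    case Path::kRuntimeEnvFailed:      // CreateRuntimeEnv reported failure
    case Path::kRegistrationTimeout:   // MonitorStartingWorkerProcess timed out
      callback(nullptr);               // caller releases resources and re-queues
      break;
  }
}

int main() {
  auto report = [](std::shared_ptr<Worker> worker) {
    std::cout << (worker ? "dispatch the task\n" : "release resources, re-queue\n");
  };
  PopWorker(Path::kIdleWorker, report);
  PopWorker(Path::kRuntimeEnvFailed, report);
}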