
[core] make 'PopWorker' to be an async function #17202

Merged: 36 commits, Aug 11, 2021
Changes from 28 commits

Commits (36)
8ab39aa
make 'PopWorker' to be an async function
SongGuyang Jul 20, 2021
dfea937
pop worker async works
SongGuyang Jul 23, 2021
f13dd6b
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 23, 2021
7378327
fix
SongGuyang Jul 23, 2021
d95c92e
address comments
SongGuyang Jul 27, 2021
9c582bb
bugfix
SongGuyang Jul 28, 2021
639d0fb
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 28, 2021
9409d69
fix cluster_task_manager_test
SongGuyang Jul 29, 2021
74f08b8
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 29, 2021
bbed86f
fix
SongGuyang Jul 29, 2021
06c8067
bugfix of detached actor
SongGuyang Jul 29, 2021
3702a93
address comments
SongGuyang Jul 29, 2021
83c29be
fix
SongGuyang Jul 29, 2021
619d15d
address comments
SongGuyang Jul 30, 2021
ad33937
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 30, 2021
041b983
fix aioredis
SongGuyang Jul 30, 2021
de26099
Revert "fix aioredis"
SongGuyang Jul 31, 2021
d77af8f
Merge branch 'master' into dev_pop_worker
SongGuyang Jul 31, 2021
fa70027
bug fix
SongGuyang Jul 31, 2021
10ea16d
fix
SongGuyang Jul 31, 2021
8d21bcf
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 2, 2021
74d7bc9
fix test_step_resources test
SongGuyang Aug 6, 2021
2806900
format
SongGuyang Aug 6, 2021
3f030a3
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 6, 2021
221656e
add unit test
SongGuyang Aug 6, 2021
4eca794
fix
SongGuyang Aug 6, 2021
b26a2db
add test case PopWorkerStatus
SongGuyang Aug 6, 2021
3ddbdbb
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 6, 2021
80b2291
address commit
SongGuyang Aug 7, 2021
a69b9fb
fix lint
SongGuyang Aug 7, 2021
1d4090b
address comments
SongGuyang Aug 9, 2021
fd9016d
add python test
SongGuyang Aug 10, 2021
ac08ff3
Merge branch 'master' into dev_pop_worker
SongGuyang Aug 10, 2021
9421cbd
address comments
SongGuyang Aug 10, 2021
e2b1d96
make an independent function
SongGuyang Aug 10, 2021
f396382
Update test_basic_3.py
raulchen Aug 10, 2021
@@ -32,9 +32,13 @@ def test_init_twice_2(call_ray_start, reset_workflow, tmp_path):
     }], indirect=True)
 def test_step_resources(workflow_start_regular, tmp_path):
     lock_path = str(tmp_path / "lock")
+    # We use signal actor here because we can't guarantee the order of tasks
+    # sent from worker to raylet.
+    signal_actor = ray.test_utils.SignalActor.remote()

     @workflow.step
     def step_run():
+        ray.wait([signal_actor.send.remote()])
         with FileLock(lock_path):
             return None

@@ -45,6 +49,7 @@ def remote_run():
     lock = FileLock(lock_path)
     lock.acquire()
     ret = step_run.options(num_cpus=2).step().run_async()
+    ray.wait([signal_actor.wait.remote()])
     obj = remote_run.remote()
     with pytest.raises(ray.exceptions.GetTimeoutError):
         ray.get(obj, timeout=2)

1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -130,6 +130,7 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &ac
                                         const TaskID &task_id) {
   // NOTE: This method will cancel the outstanding lease request and remove leasing
   // information from the internal state.
+  RAY_LOG(DEBUG) << "Canceling worker leasing of task " << task_id;
   auto node_it = node_to_actors_when_leasing_.find(node_id);
   RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
   node_it->second.erase(actor_id);

15 changes: 5 additions & 10 deletions src/ray/raylet/node_manager.cc
@@ -183,11 +183,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
                    config.worker_commands,
                    /*starting_worker_timeout_callback=*/
                    [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
-                   /*runtime_env_setup_failed_callback=*/
-                   [this](const TaskID &task_id) {
-                     RAY_CHECK(cluster_task_manager_->CancelTask(
-                         task_id, /*runtime_env_setup_failed=*/true));
-                   },
                    config.ray_debugger_external,
                    /*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
       client_call_manager_(io_service),
@@ -299,11 +294,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
   auto get_node_info_func = [this](const NodeID &node_id) {
     return gcs_client_->Nodes().Get(node_id);
   };
-  auto is_owner_alive = [this](const WorkerID &owner_worker_id,
-                               const NodeID &owner_node_id) {
-    return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
-             failed_nodes_cache_.count(owner_node_id) > 0);
-  };
   auto announce_infeasible_task = [this](const Task &task) {
     PublishInfeasibleTaskError(task);
   };
@@ -320,6 +310,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
            "return values are greater than the remaining capacity.";
     max_task_args_memory = 0;
   }
+  auto is_owner_alive = [this](const WorkerID &owner_worker_id,
+                               const NodeID &owner_node_id) {
+    return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
+             failed_nodes_cache_.count(owner_node_id) > 0);
+  };
   cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
       self_node_id_,
       std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),

246 changes: 171 additions & 75 deletions src/ray/raylet/scheduling/cluster_task_manager.cc

Large diffs are not rendered by default.
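The cluster_task_manager.cc diff (the bulk of this PR) is not rendered, so here is a minimal, self-contained sketch of the pattern the change moves to: instead of a blocking-style PopWorker that returns a worker (or nullptr) immediately, the dispatch path hands the worker pool a callback, and the pool invokes it once a started worker registers or once startup fails. Worker, WorkerPool, OnWorkerStarted, and the PopWorkerStatus values below are simplified stand-ins, not the actual Ray raylet API.

// Toy model of a callback-based (asynchronous) PopWorker; names are illustrative only.
#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>

enum class PopWorkerStatus { OK, WorkerStartupFailed };  // illustrative values

struct Worker {
  std::string id;
};

using PopWorkerCallback =
    std::function<void(std::shared_ptr<Worker> worker, PopWorkerStatus status)>;

class WorkerPool {
 public:
  // Async PopWorker: remember the callback and fire it when a worker is ready,
  // instead of returning a worker (or nullptr) right away.
  void PopWorker(PopWorkerCallback callback) { pending_.push(std::move(callback)); }

  // Invoked by the pool when a started worker registers, or when startup fails.
  void OnWorkerStarted(std::shared_ptr<Worker> worker, PopWorkerStatus status) {
    if (pending_.empty()) return;
    PopWorkerCallback callback = std::move(pending_.front());
    pending_.pop();
    callback(std::move(worker), status);
  }

 private:
  std::queue<PopWorkerCallback> pending_;
};

int main() {
  WorkerPool pool;
  // Dispatch side: request a worker and finish dispatching inside the callback.
  pool.PopWorker([](std::shared_ptr<Worker> worker, PopWorkerStatus status) {
    if (status != PopWorkerStatus::OK || worker == nullptr) {
      std::cout << "worker startup failed, cancel the lease request\n";
      return;
    }
    std::cout << "dispatching task to worker " << worker->id << "\n";
  });
  // Later (asynchronously in the real code), the pool hands back a worker.
  pool.OnWorkerStarted(std::make_shared<Worker>(Worker{"worker-1"}), PopWorkerStatus::OK);
  return 0;
}

This is presumably also why the separate runtime_env_setup_failed_callback disappears from node_manager.cc above: a worker-startup failure (including runtime env setup) can now be reported back to the dispatch path through the same PopWorker callback.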

51 changes: 35 additions & 16 deletions src/ray/raylet/scheduling/cluster_task_manager.h
@@ -34,7 +34,28 @@ namespace raylet {
 /// Work represents all the information needed to make a scheduling decision.
 /// This includes the task, the information we need to communicate to
 /// dispatch/spillback and the callback to trigger it.
-typedef std::tuple<Task, rpc::RequestWorkerLeaseReply *, std::function<void(void)>> Work;
+enum WorkStatus {
+  WAITING,
+  WAITING_FOR_WORKER,
+  CANCELLED,
+};
+
+struct Work {
+  Task task;
+  rpc::RequestWorkerLeaseReply *reply;
+  std::function<void(void)> callback;
+  std::shared_ptr<TaskResourceInstances> allocated_instances;
+  WorkStatus status = WorkStatus::WAITING;
+  Work(Task task, rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> callback,
+       WorkStatus status = WorkStatus::WAITING)
+      : task(task),
+        reply(reply),
+        callback(callback),
+        allocated_instances(nullptr),
+        status(status){};
+  Work(const Work &Work) = delete;
+  Work &operator=(const Work &work) = delete;
+};
+
 typedef std::function<boost::optional<rpc::GcsNodeInfo>(const NodeID &node_id)>
     NodeInfoGetter;
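One likely reason for turning the Work tuple into a struct is readability at call sites: named fields instead of positional std::get<> access, plus the extra state the async PopWorker path needs (allocated_instances, status). A simplified, self-contained illustration with stand-in Task and Reply types rather than the Ray definitions:

#include <functional>
#include <memory>
#include <tuple>

struct Task { int id; };  // stand-in for ray::Task
struct Reply {};          // stand-in for rpc::RequestWorkerLeaseReply

// Old shape: a tuple, so call sites use positional std::get<> access.
using OldWork = std::tuple<Task, Reply *, std::function<void()>>;

// New shape: named fields plus the state the async dispatch path tracks.
enum WorkStatus { WAITING, WAITING_FOR_WORKER, CANCELLED };
struct Work {
  Task task{};
  Reply *reply = nullptr;
  std::function<void()> callback;
  WorkStatus status = WorkStatus::WAITING;
};

int main() {
  OldWork old_work{Task{1}, nullptr, [] {}};
  int old_id = std::get<0>(old_work).id;  // which element is which? easy to misread

  auto work = std::make_shared<Work>();   // only ever held via shared_ptr
  work->task = Task{1};
  work->status = WorkStatus::WAITING_FOR_WORKER;  // self-describing at call sites
  return old_id == work->task.id ? 0 : 1;
}

Deleting the copy constructor and copy assignment fits the same direction: a Work entry is meant to be passed around by std::shared_ptr, as the queue declarations below show.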
@@ -206,13 +227,7 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   /// \returns true if the task was spilled. The task may not be spilled if the
   /// spillback policy specifies the local node (which may happen if no other nodes have
   /// the available resources).
-  bool TrySpillback(const Work &spec, bool &is_infeasible);
-
-  /// Helper method to try dispatching a single task from the queue to an
-  /// available worker. Returns whether the task should be removed from the
-  /// queue and whether the worker was successfully leased to execute the work.
-  bool AttemptDispatchWork(const Work &work, std::shared_ptr<WorkerInterface> &worker,
-                           bool *worker_leased);
+  bool TrySpillback(const std::shared_ptr<Work> &work, bool &is_infeasible);

   /// Reiterate all local infeasible tasks and register them to task_to_schedule_ if it
   /// becomes feasible to schedule.
@@ -241,7 +256,8 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   /// through queues to cancel tasks, etc.
   /// Queue of lease requests that are waiting for resources to become available.
   /// Tasks move from scheduled -> dispatch | waiting.
-  std::unordered_map<SchedulingClass, std::deque<Work>> tasks_to_schedule_;
+  std::unordered_map<SchedulingClass, std::deque<std::shared_ptr<Work>>>
+      tasks_to_schedule_;

   /// Queue of lease requests that should be scheduled onto workers.
   /// Tasks move from scheduled | waiting -> dispatch.
@@ -250,7 +266,8 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   /// All tasks in this map that have dependencies should be registered with
   /// the dependency manager, in case a dependency gets evicted while the task
   /// is still queued.
-  std::unordered_map<SchedulingClass, std::deque<Work>> tasks_to_dispatch_;
+  std::unordered_map<SchedulingClass, std::deque<std::shared_ptr<Work>>>
+      tasks_to_dispatch_;

   /// Tasks waiting for arguments to be transferred locally.
   /// Tasks move from waiting -> dispatch.
@@ -268,14 +285,16 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   /// in this queue may not match the order in which we initially received the
   /// tasks. This also means that the PullManager may request dependencies for
   /// these tasks in a different order than the waiting task queue.
-  std::list<Work> waiting_task_queue_;
+  std::list<std::shared_ptr<Work>> waiting_task_queue_;

   /// An index for the above queue.
-  absl::flat_hash_map<TaskID, std::list<Work>::iterator> waiting_tasks_index_;
+  absl::flat_hash_map<TaskID, std::list<std::shared_ptr<Work>>::iterator>
+      waiting_tasks_index_;

   /// Queue of lease requests that are infeasible.
   /// Tasks go between scheduling <-> infeasible.
-  std::unordered_map<SchedulingClass, std::deque<Work>> infeasible_tasks_;
+  std::unordered_map<SchedulingClass, std::deque<std::shared_ptr<Work>>>
+      infeasible_tasks_;

   /// Track the cumulative backlog of all workers requesting a lease to this raylet.
   std::unordered_map<SchedulingClass, int> backlog_tracker_;
@@ -319,15 +338,15 @@ class ClusterTaskManager : public ClusterTaskManagerInterface {
   /// or placed on a wait queue.
   ///
   /// \return True if the work can be immediately dispatched.
-  bool WaitForTaskArgsRequests(Work work);
+  bool WaitForTaskArgsRequests(std::shared_ptr<Work> work);

   void Dispatch(
       std::shared_ptr<WorkerInterface> worker,
       std::unordered_map<WorkerID, std::shared_ptr<WorkerInterface>> &leased_workers_,
-      std::shared_ptr<TaskResourceInstances> &allocated_instances, const Task &task,
+      const std::shared_ptr<TaskResourceInstances> &allocated_instances, const Task &task,
       rpc::RequestWorkerLeaseReply *reply, std::function<void(void)> send_reply_callback);

-  void Spillback(const NodeID &spillback_to, const Work &work);
+  void Spillback(const NodeID &spillback_to, const std::shared_ptr<Work> &work);

   void AddToBacklogTracker(const Task &task);
   void RemoveFromBacklogTracker(const Task &task);
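A sketch of what std::shared_ptr<Work> in these queues buys: once PopWorker is asynchronous, the same Work entry can be referenced both by a dispatch queue and by an in-flight PopWorker callback, so cancelling a lease only needs to flip the shared status to CANCELLED instead of pulling the object out from under the callback. The code below is a simplified stand-in, not the real ClusterTaskManager logic.

#include <deque>
#include <functional>
#include <iostream>
#include <memory>

enum WorkStatus { WAITING, WAITING_FOR_WORKER, CANCELLED };

struct Work {
  int task_id = 0;
  WorkStatus status = WorkStatus::WAITING;
};

int main() {
  std::deque<std::shared_ptr<Work>> tasks_to_dispatch;
  auto work = std::make_shared<Work>();
  work->task_id = 1;
  tasks_to_dispatch.push_back(work);

  // Dispatch loop: the entry stays queued while a worker is being started.
  work->status = WorkStatus::WAITING_FOR_WORKER;

  // Simulated in-flight PopWorker callback holding its own reference to the work.
  std::function<void()> pop_worker_callback = [work]() {
    if (work->status == WorkStatus::CANCELLED) {
      std::cout << "lease " << work->task_id << " cancelled before a worker arrived\n";
      return;
    }
    std::cout << "dispatching lease " << work->task_id << "\n";
  };

  // A cancellation arrives first: mark the shared entry and drop it from the queue.
  work->status = WorkStatus::CANCELLED;
  tasks_to_dispatch.clear();

  // When the worker finally starts, the callback sees CANCELLED and bails out.
  pop_worker_callback();
  return 0;
}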