[core] make 'PopWorker' to be an async function #17202

Merged · 36 commits · Aug 11, 2021
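This PR changes the raylet worker pool's PopWorker from a synchronous call into an asynchronous one: rather than immediately returning an idle worker (or nothing), the pool delivers the result through a callback, together with a PopWorkerStatus (see the "add test case PopWorkerStatus" commit), once a worker can actually be handed out. A minimal sketch of that callback shape follows; aside from the PopWorker and PopWorkerStatus names, every type and signature below is an illustrative stand-in, not Ray's actual API.

// Sketch of a callback-based (asynchronous) PopWorker interface.
// Only the PopWorker / PopWorkerStatus names come from this PR; all other
// types, enum values, and signatures here are illustrative assumptions.
#include <functional>
#include <iostream>
#include <memory>

struct TaskSpec {};   // stand-in for the task specification
struct Worker {};     // stand-in for a worker process handle

enum class PopWorkerStatus { OK, WorkerPendingRegistration, RuntimeEnvSetupFailed };

using PopWorkerCallback =
    std::function<void(std::shared_ptr<Worker> worker, PopWorkerStatus status)>;

class WorkerPool {
 public:
  // Asynchronous: the callback may run immediately (an idle worker was cached)
  // or later (a worker process or runtime env is still starting), so the
  // caller never waits on a worker that is not ready yet.
  void PopWorker(const TaskSpec &task_spec, const PopWorkerCallback &callback) {
    // ... reuse or start a worker asynchronously, then report the result:
    callback(std::make_shared<Worker>(), PopWorkerStatus::OK);
  }
};

int main() {
  WorkerPool pool;
  pool.PopWorker(TaskSpec{}, [](std::shared_ptr<Worker> worker, PopWorkerStatus status) {
    if (status == PopWorkerStatus::OK) {
      std::cout << "worker " << worker.get() << " ready, dispatch the task\n";
    } else {
      std::cout << "no worker, retry or cancel depending on the status\n";
    }
  });
  return 0;
}

One consequence visible further down in the diff: the worker pool no longer takes a separate runtime_env_setup_failed_callback constructor argument in node_manager.cc, presumably because such a failure can now be reported as a status through the same callback.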
36 commits (the diff below shows changes from 33 commits)
8ab39aa  make 'PopWorker' to be an async function (SongGuyang, Jul 20, 2021)
dfea937  pop worker async works (SongGuyang, Jul 23, 2021)
f13dd6b  Merge branch 'master' into dev_pop_worker (SongGuyang, Jul 23, 2021)
7378327  fix (SongGuyang, Jul 23, 2021)
d95c92e  address comments (SongGuyang, Jul 27, 2021)
9c582bb  bugfix (SongGuyang, Jul 28, 2021)
639d0fb  Merge branch 'master' into dev_pop_worker (SongGuyang, Jul 28, 2021)
9409d69  fix cluster_task_manager_test (SongGuyang, Jul 29, 2021)
74f08b8  Merge branch 'master' into dev_pop_worker (SongGuyang, Jul 29, 2021)
bbed86f  fix (SongGuyang, Jul 29, 2021)
06c8067  bugfix of detached actor (SongGuyang, Jul 29, 2021)
3702a93  address comments (SongGuyang, Jul 29, 2021)
83c29be  fix (SongGuyang, Jul 29, 2021)
619d15d  address comments (SongGuyang, Jul 30, 2021)
ad33937  Merge branch 'master' into dev_pop_worker (SongGuyang, Jul 30, 2021)
041b983  fix aioredis (SongGuyang, Jul 30, 2021)
de26099  Revert "fix aioredis" (SongGuyang, Jul 31, 2021)
d77af8f  Merge branch 'master' into dev_pop_worker (SongGuyang, Jul 31, 2021)
fa70027  bug fix (SongGuyang, Jul 31, 2021)
10ea16d  fix (SongGuyang, Jul 31, 2021)
8d21bcf  Merge branch 'master' into dev_pop_worker (SongGuyang, Aug 2, 2021)
74d7bc9  fix test_step_resources test (SongGuyang, Aug 6, 2021)
2806900  format (SongGuyang, Aug 6, 2021)
3f030a3  Merge branch 'master' into dev_pop_worker (SongGuyang, Aug 6, 2021)
221656e  add unit test (SongGuyang, Aug 6, 2021)
4eca794  fix (SongGuyang, Aug 6, 2021)
b26a2db  add test case PopWorkerStatus (SongGuyang, Aug 6, 2021)
3ddbdbb  Merge branch 'master' into dev_pop_worker (SongGuyang, Aug 6, 2021)
80b2291  address commit (SongGuyang, Aug 7, 2021)
a69b9fb  fix lint (SongGuyang, Aug 7, 2021)
1d4090b  address comments (SongGuyang, Aug 9, 2021)
fd9016d  add python test (SongGuyang, Aug 10, 2021)
ac08ff3  Merge branch 'master' into dev_pop_worker (SongGuyang, Aug 10, 2021)
9421cbd  address comments (SongGuyang, Aug 10, 2021)
e2b1d96  make an independent function (SongGuyang, Aug 10, 2021)
f396382  Update test_basic_3.py (raulchen, Aug 10, 2021)
@@ -32,9 +32,13 @@ def test_init_twice_2(call_ray_start, reset_workflow, tmp_path):
     }], indirect=True)
 def test_step_resources(workflow_start_regular, tmp_path):
     lock_path = str(tmp_path / "lock")
+    # We use signal actor here because we can't guarantee the order of tasks
+    # sent from worker to raylet.
+    signal_actor = ray.test_utils.SignalActor.remote()
 
     @workflow.step
     def step_run():
+        ray.wait([signal_actor.send.remote()])
         with FileLock(lock_path):
             return None
 
@@ -45,6 +49,7 @@ def remote_run():
     lock = FileLock(lock_path)
     lock.acquire()
     ret = step_run.options(num_cpus=2).step().run_async()
+    ray.wait([signal_actor.wait.remote()])
     obj = remote_run.remote()
     with pytest.raises(ray.exceptions.GetTimeoutError):
         ray.get(obj, timeout=2)
59 changes: 59 additions & 0 deletions python/ray/tests/test_basic_3.py
@@ -11,7 +11,9 @@
 from ray.test_utils import (
     dicts_equal,
     wait_for_pid_to_exit,
+    wait_for_condition,
 )
+from pathlib import Path
 
 import ray
 
@@ -226,5 +228,62 @@ def get(self):
     ray.get([a.get.remote()])
 
 
+@pytest.mark.parametrize(
+    "ray_start_cluster", [{
+        "_system_config": {
+            "event_stats_print_interval_ms": 100,
+            "debug_dump_period_milliseconds": 100,
+            "event_stats": True
+        }
+    }],
+    indirect=True)
+def test_worker_startup_count(ray_start_cluster):
+    """Test that no extra workers are started when the cluster has no
+    available CPU resources."""
+
+    cluster = ray_start_cluster
+    # The cluster has 4 CPUs in total.
+    cluster.add_node(num_cpus=4)
+    ray.init(address=cluster.address)
+
+    # A slow function that never returns, so it holds its CPU resources forever.
+    @ray.remote
+    def slow_function():
+        while True:
+            time.sleep(1000)
+
+    # Flood the raylet with a large number of worker lease requests.
+    for i in range(10000):
+        # Randomize the cpu request so lease requests are not cached and reused.
+        num_cpus = 0.24 + np.random.uniform(0, 0.01)
+        print(num_cpus)
+        slow_function.options(num_cpus=num_cpus).remote()
+
+    # Check "debug_state.txt" to ensure no extra workers were started.
+    session_dir = ray.worker.global_worker.node.address_info["session_dir"]
+    session_path = Path(session_dir)
+    debug_state_path = session_path / "debug_state.txt"
+
+    def get_num_workers():
+        with open(debug_state_path) as f:
+            for line in f.readlines():
+                num_workers_prefix = "- num PYTHON workers: "
+                if num_workers_prefix in line:
+                    return int(line[len(num_workers_prefix):])
+        return None
+
+    # Wait for "debug_state.txt" to be updated to reflect the started workers.
+    start = time.time()
+    wait_for_condition(lambda: get_num_workers() == 16)
+    time_waited = time.time() - start
+    print(f"Waited {time_waited} for debug_state.txt to be updated")
+
+    # Check that no more workers are started for a while.
+    for i in range(100):
+        num = get_num_workers()
+        assert num == 16
+        time.sleep(0.1)
+
+
 if __name__ == "__main__":
     sys.exit(pytest.main(["-v", __file__]))
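Why the test pins the worker count at exactly 16: the node has 4 CPUs and each lease request asks for between 0.24 and 0.25 CPU, so at most

    floor(4 / 0.24) = 16 tasks can hold CPU at once, since 17 * 0.24 = 4.08 > 4.

Each running task occupies one worker, so 16 is the expected steady-state count; any number above 16 would mean the raylet started workers for lease requests it can never dispatch, which is exactly what the asynchronous PopWorker is supposed to prevent.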
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -130,6 +130,7 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &actor_id,
                                         const TaskID &task_id) {
   // NOTE: This method will cancel the outstanding lease request and remove leasing
   // information from the internal state.
+  RAY_LOG(DEBUG) << "Canceling worker leasing of task " << task_id;
   auto node_it = node_to_actors_when_leasing_.find(node_id);
   RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
   node_it->second.erase(actor_id);
15 changes: 5 additions & 10 deletions src/ray/raylet/node_manager.cc
@@ -183,11 +183,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self_node_id,
           config.worker_commands,
           /*starting_worker_timeout_callback=*/
           [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
-          /*runtime_env_setup_failed_callback=*/
-          [this](const TaskID &task_id) {
-            RAY_CHECK(cluster_task_manager_->CancelTask(
-                task_id, /*runtime_env_setup_failed=*/true));
-          },
           config.ray_debugger_external,
           /*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
       client_call_manager_(io_service),
@@ -299,11 +294,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self_node_id,
   auto get_node_info_func = [this](const NodeID &node_id) {
     return gcs_client_->Nodes().Get(node_id);
   };
-  auto is_owner_alive = [this](const WorkerID &owner_worker_id,
-                               const NodeID &owner_node_id) {
-    return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
-             failed_nodes_cache_.count(owner_node_id) > 0);
-  };
   auto announce_infeasible_task = [this](const RayTask &task) {
     PublishInfeasibleTaskError(task);
   };
@@ -320,6 +310,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self_node_id,
         "return values are greater than the remaining capacity.";
     max_task_args_memory = 0;
   }
+  auto is_owner_alive = [this](const WorkerID &owner_worker_id,
+                               const NodeID &owner_node_id) {
+    return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
+             failed_nodes_cache_.count(owner_node_id) > 0);
+  };
   cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
       self_node_id_,
       std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
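The deleted runtime_env_setup_failed_callback above called CancelTask(task_id, runtime_env_setup_failed=true) whenever runtime env setup failed. With PopWorker asynchronous, that cancellation can plausibly live in the PopWorker callback itself, keyed off a failure status. The sketch below shows the idea with stand-in types; only the CancelTask arguments mirror the removed lambda, the rest of the wiring is an assumption rather than the PR's exact code.

#include <iostream>
#include <memory>

struct Worker {};
struct TaskID { int value = 0; };
enum class PopWorkerStatus { OK, RuntimeEnvSetupFailed };

// Stand-in for the cluster task manager; only CancelTask matters here.
struct ClusterTaskManagerStub {
  bool CancelTask(const TaskID &task_id, bool runtime_env_setup_failed) {
    std::cout << "cancel task " << task_id.value
              << " runtime_env_setup_failed=" << runtime_env_setup_failed << "\n";
    return true;
  }
};

// What a PopWorker callback can do once setup failures arrive as a status:
// the cancellation that used to live in the removed constructor lambda moves here.
void OnWorkerPopped(const TaskID &task_id, std::shared_ptr<Worker> worker,
                    PopWorkerStatus status, ClusterTaskManagerStub &manager) {
  if (status == PopWorkerStatus::RuntimeEnvSetupFailed) {
    manager.CancelTask(task_id, /*runtime_env_setup_failed=*/true);
    return;
  }
  // ... otherwise dispatch the task onto `worker`.
  std::cout << "dispatch task " << task_id.value << "\n";
}

int main() {
  ClusterTaskManagerStub manager;
  OnWorkerPopped(TaskID{42}, nullptr, PopWorkerStatus::RuntimeEnvSetupFailed, manager);
  OnWorkerPopped(TaskID{43}, std::make_shared<Worker>(), PopWorkerStatus::OK, manager);
  return 0;
}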