
[core] make 'PopWorker' to be an async function (ray-project#17202)
* make 'PopWorker' to be an async function

* pop worker async works

* fix

* address comments

* bugfix

* fix cluster_task_manager_test

* fix

* bugfix of detached actor

* address comments

* fix

* address comments

* fix aioredis

* Revert "fix aioredis"

This reverts commit 041b983.

* bug fix

* fix

* fix test_step_resources test

* format

* add unit test

* fix

* add test case PopWorkerStatus

* address commit

* fix lint

* address comments

* add python test

* address comments

* make an independent function

* Update test_basic_3.py

Co-authored-by: Hao Chen <chenh1024@gmail.com>
2 people authored and clarkzinzow committed Aug 12, 2021
1 parent 879a954 commit b3b5016
Showing 12 changed files with 1,143 additions and 624 deletions.
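
For orientation: per the commit message, the core of this change turns the raylet's PopWorker from a blocking call into an asynchronous one that reports its result through a callback (the commit also adds a PopWorkerStatus test case). The snippet below is only a loose Python analogy of that sync-to-callback refactor, meant to illustrate the control-flow change; the names (WorkerPool, PopWorkerStatus, the callback signature) are placeholders and do not mirror the real C++ interface touched by this PR.

from enum import Enum
from typing import Callable, List, Optional


class PopWorkerStatus(Enum):
    # Illustrative status values only; the real enum in the C++ code differs.
    OK = 0
    PENDING_REGISTRATION = 1
    RUNTIME_ENV_CREATION_FAILED = 2


PopWorkerCallback = Callable[[Optional[str], PopWorkerStatus], None]


class WorkerPool:
    """Toy pool showing the sync -> callback-based PopWorker refactor."""

    def __init__(self) -> None:
        self._idle_workers: List[str] = []
        self._pending_callbacks: List[PopWorkerCallback] = []

    def pop_worker_sync(self) -> Optional[str]:
        # Old shape: return a worker or None; the caller has to retry later.
        return self._idle_workers.pop() if self._idle_workers else None

    def pop_worker(self, callback: PopWorkerCallback) -> None:
        # New shape: keep the callback and invoke it once a worker is ready,
        # passing a status alongside the worker.
        if self._idle_workers:
            callback(self._idle_workers.pop(), PopWorkerStatus.OK)
        else:
            # The real code would also start a new worker process here.
            self._pending_callbacks.append(callback)

    def on_worker_registered(self, worker: str) -> None:
        # A freshly started worker registered with the pool: serve a waiter.
        if self._pending_callbacks:
            self._pending_callbacks.pop(0)(worker, PopWorkerStatus.OK)
        else:
            self._idle_workers.append(worker)


if __name__ == "__main__":
    pool = WorkerPool()
    pool.pop_worker(lambda w, s: print("got", w, s))  # deferred until a worker registers
    pool.on_worker_registered("worker-1")             # the callback fires here
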
@@ -32,9 +32,13 @@ def test_init_twice_2(call_ray_start, reset_workflow, tmp_path):
}], indirect=True)
def test_step_resources(workflow_start_regular, tmp_path):
lock_path = str(tmp_path / "lock")
# We use a signal actor here because we can't guarantee the order of the
# tasks sent from the worker to the raylet.
signal_actor = ray.test_utils.SignalActor.remote()

@workflow.step
def step_run():
ray.wait([signal_actor.send.remote()])
with FileLock(lock_path):
return None

@@ -45,6 +49,7 @@ def remote_run():
lock = FileLock(lock_path)
lock.acquire()
ret = step_run.options(num_cpus=2).step().run_async()
ray.wait([signal_actor.wait.remote()])
obj = remote_run.remote()
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=2)
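
The hunk above relies on ray.test_utils.SignalActor so that remote_run is only submitted after step_run has actually started. As a rough sketch (the real helper in ray.test_utils may differ in details), such a signal actor can be built from an async actor wrapping an asyncio.Event:

import asyncio

import ray


@ray.remote(num_cpus=0)
class SignalActor:
    """Minimal signal actor: send() sets an event, wait() blocks until it is set."""

    def __init__(self):
        self.ready_event = asyncio.Event()

    def send(self, clear=False):
        self.ready_event.set()
        if clear:
            self.ready_event.clear()

    async def wait(self, should_wait=True):
        if should_wait:
            await self.ready_event.wait()
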
102 changes: 102 additions & 0 deletions python/ray/tests/test_basic_3.py
@@ -11,7 +11,9 @@
from ray.test_utils import (
dicts_equal,
wait_for_pid_to_exit,
wait_for_condition,
)
from pathlib import Path

import ray

@@ -184,5 +186,105 @@ def f():
assert len(ready) == 1000, len(ready)


def test_actor_killing(shutdown_only):
# Test creating and then immediately killing an actor.
import ray
ray.init(num_cpus=1)

@ray.remote(num_cpus=1)
class Actor:
def foo(self):
return None

worker_1 = Actor.remote()
ray.kill(worker_1)
worker_2 = Actor.remote()
assert ray.get(worker_2.foo.remote()) is None
ray.kill(worker_2)

worker_1 = Actor.options(max_restarts=1).remote()
ray.kill(worker_1, no_restart=False)
assert ray.get(worker_1.foo.remote()) is None

ray.kill(worker_1, no_restart=False)
worker_2 = Actor.remote()
assert ray.get(worker_2.foo.remote()) is None


def test_actor_scheduling(shutdown_only):
ray.init()

@ray.remote
class A:
def run_fail(self):
ray.actor.exit_actor()

def get(self):
return 1

a = A.remote()
a.run_fail.remote()
with pytest.raises(Exception):
ray.get([a.get.remote()])


@pytest.mark.parametrize(
"ray_start_cluster", [{
"_system_config": {
"event_stats_print_interval_ms": 100,
"debug_dump_period_milliseconds": 100,
"event_stats": True
}
}],
indirect=True)
def test_worker_startup_count(ray_start_cluster):
"""Test that no extra workers started while no available cpu resources
in cluster."""

cluster = ray_start_cluster
# The cluster has 4 CPUs in total.
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)

# A slow function that never returns; it holds its CPU resources indefinitely.
@ray.remote
def slow_function():
while True:
time.sleep(1000)

# Flood the raylet with a large number of worker lease requests.
for i in range(10000):
# Use random cpu resources to make sure that all tasks are sent
# to the raylet, because the core worker caches tasks with the
# same resource shape.
num_cpus = 0.24 + np.random.uniform(0, 0.01)
slow_function.options(num_cpus=num_cpus).remote()

# Check "debug_state.txt" to ensure no extra workers were started.
session_dir = ray.worker.global_worker.node.address_info["session_dir"]
session_path = Path(session_dir)
debug_state_path = session_path / "debug_state.txt"

def get_num_workers():
with open(debug_state_path) as f:
for line in f.readlines():
num_workers_prefix = "- num PYTHON workers: "
if num_workers_prefix in line:
return int(line[len(num_workers_prefix):])
return None

# Wait for "debug_state.txt" to be updated to reflect the started worker.
start = time.time()
wait_for_condition(lambda: get_num_workers() == 16)
time_waited = time.time() - start
print(f"Waited {time_waited} for debug_state.txt to be updated")

# Check that no more workers are started for a while.
for i in range(100):
num = get_num_workers()
assert num == 16
time.sleep(0.1)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
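
test_worker_startup_count above polls debug_state.txt through ray.test_utils.wait_for_condition. Roughly, such a helper just retries a predicate until it returns True or a timeout expires; the sketch below captures that shape (the real utility's signature and defaults may differ):

import time


def wait_for_condition(condition_predictor, timeout=10, retry_interval_ms=100):
    """Poll condition_predictor until it returns True or timeout seconds pass."""
    start = time.time()
    while time.time() - start <= timeout:
        if condition_predictor():
            return
        time.sleep(retry_interval_ms / 1000.0)
    raise RuntimeError("The condition wasn't met before the timeout expired.")
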
1 change: 1 addition & 0 deletions src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -130,6 +130,7 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &ac
const TaskID &task_id) {
// NOTE: This method will cancel the outstanding lease request and remove leasing
// information from the internal state.
RAY_LOG(DEBUG) << "Canceling worker leasing of task " << task_id;
auto node_it = node_to_actors_when_leasing_.find(node_id);
RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
node_it->second.erase(actor_id);
15 changes: 5 additions & 10 deletions src/ray/raylet/node_manager.cc
@@ -183,11 +183,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
config.worker_commands,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
/*runtime_env_setup_failed_callback=*/
[this](const TaskID &task_id) {
RAY_CHECK(cluster_task_manager_->CancelTask(
task_id, /*runtime_env_setup_failed=*/true));
},
config.ray_debugger_external,
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
client_call_manager_(io_service),
@@ -299,11 +294,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
auto get_node_info_func = [this](const NodeID &node_id) {
return gcs_client_->Nodes().Get(node_id);
};
auto is_owner_alive = [this](const WorkerID &owner_worker_id,
const NodeID &owner_node_id) {
return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0);
};
auto announce_infeasible_task = [this](const RayTask &task) {
PublishInfeasibleTaskError(task);
};
@@ -320,6 +310,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
"return values are greater than the remaining capacity.";
max_task_args_memory = 0;
}
auto is_owner_alive = [this](const WorkerID &owner_worker_id,
const NodeID &owner_node_id) {
return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0);
};
cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
