Revert "[core] make 'PopWorker' to be an async function (#17202)"
This reverts commit 63c15d7.
wuisawesome authored Aug 17, 2021
1 parent 880797d commit 74fe7c0
Showing 12 changed files with 624 additions and 1,101 deletions.
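For context on what is being reverted: per the commit title, PR #17202 made the raylet's PopWorker asynchronous, and the revert restores the synchronous behavior. A common way to make such a call asynchronous is a completion callback, and the sketch below contrasts a blocking shape with a callback-based shape of a worker pool. This is a conceptual illustration only; Worker, SyncPool, AsyncPool, PopWorkerCallback, and OnWorkerStarted are made-up stand-ins, not Ray's actual WorkerPool declarations.

// Illustrative only: not the actual Ray WorkerPool interface.
#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>

struct Worker {
  std::string id;
};

// Blocking shape: the caller immediately gets a worker or nullptr and must
// retry later if the pool is empty.
class SyncPool {
 public:
  std::shared_ptr<Worker> PopWorker() {
    if (idle_.empty()) return nullptr;
    auto w = idle_.front();
    idle_.pop();
    return w;
  }
  void Push(std::shared_ptr<Worker> w) { idle_.push(std::move(w)); }

 private:
  std::queue<std::shared_ptr<Worker>> idle_;
};

// Callback shape: the caller hands in a callback that fires once a worker is
// available (or its startup fails), so the pool can start workers in the
// background instead of forcing the caller to poll.
class AsyncPool {
 public:
  using PopWorkerCallback =
      std::function<void(std::shared_ptr<Worker> worker, bool startup_failed)>;

  void PopWorker(PopWorkerCallback callback) {
    if (!idle_.empty()) {
      auto w = idle_.front();
      idle_.pop();
      callback(std::move(w), /*startup_failed=*/false);
    } else {
      // A real pool would launch a worker process here and invoke the
      // callback from its registration (or failure) handler.
      pending_.push(std::move(callback));
    }
  }

  void OnWorkerStarted(std::shared_ptr<Worker> w) {
    if (pending_.empty()) {
      idle_.push(std::move(w));
      return;
    }
    auto cb = std::move(pending_.front());
    pending_.pop();
    cb(std::move(w), /*startup_failed=*/false);
  }

 private:
  std::queue<std::shared_ptr<Worker>> idle_;
  std::queue<PopWorkerCallback> pending_;
};

int main() {
  // Blocking pop: result is returned in place.
  SyncPool sync_pool;
  sync_pool.Push(std::make_shared<Worker>(Worker{"w0"}));
  auto w0 = sync_pool.PopWorker();
  std::cout << "sync pop: " << (w0 ? w0->id : std::string("none")) << "\n";

  // Callback pop: result arrives later, when a worker finishes starting.
  AsyncPool pool;
  pool.PopWorker([](std::shared_ptr<Worker> w, bool startup_failed) {
    std::cout << (startup_failed ? std::string("startup failed")
                                 : "got worker " + w->id)
              << "\n";
  });
  pool.OnWorkerStarted(std::make_shared<Worker>(Worker{"w1"}));
  return 0;
}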
@@ -32,13 +32,9 @@ def test_init_twice_2(call_ray_start, reset_workflow, tmp_path):
}], indirect=True)
def test_step_resources(workflow_start_regular, tmp_path):
lock_path = str(tmp_path / "lock")
# We use signal actor here because we can't guarantee the order of tasks
# sent from worker to raylet.
signal_actor = ray.test_utils.SignalActor.remote()

@workflow.step
def step_run():
ray.wait([signal_actor.send.remote()])
with FileLock(lock_path):
return None

@@ -49,7 +45,6 @@ def remote_run():
lock = FileLock(lock_path)
lock.acquire()
ret = step_run.options(num_cpus=2).step().run_async()
ray.wait([signal_actor.wait.remote()])
obj = remote_run.remote()
with pytest.raises(ray.exceptions.GetTimeoutError):
ray.get(obj, timeout=2)
60 changes: 0 additions & 60 deletions python/ray/tests/test_basic_3.py
@@ -11,9 +11,7 @@
from ray.test_utils import (
dicts_equal,
wait_for_pid_to_exit,
wait_for_condition,
)
from pathlib import Path

import ray

@@ -228,63 +226,5 @@ def get(self):
ray.get([a.get.remote()])


@pytest.mark.parametrize(
"ray_start_cluster", [{
"_system_config": {
"event_stats_print_interval_ms": 100,
"debug_dump_period_milliseconds": 100,
"event_stats": True
}
}],
indirect=True)
def test_worker_startup_count(ray_start_cluster):
"""Test that no extra workers started while no available cpu resources
in cluster."""

cluster = ray_start_cluster
# Cluster total cpu resources is 4.
cluster.add_node(num_cpus=4, )
ray.init(address=cluster.address)

# A slow function never returns. It will hold cpu resources all the way.
@ray.remote
def slow_function():
while True:
time.sleep(1000)

# Flood a large scale lease worker requests.
for i in range(10000):
# Use random cpu resources to make sure that all tasks are sent
# to the raylet. Because core worker will cache tasks with the
# same resource shape.
num_cpus = 0.24 + np.random.uniform(0, 0.01)
slow_function.options(num_cpus=num_cpus).remote()

# Check "debug_state.txt" to ensure no extra workers were started.
session_dir = ray.worker.global_worker.node.address_info["session_dir"]
session_path = Path(session_dir)
debug_state_path = session_path / "debug_state.txt"

def get_num_workers():
with open(debug_state_path) as f:
for line in f.readlines():
num_workers_prefix = "- num PYTHON workers: "
if num_workers_prefix in line:
return int(line[len(num_workers_prefix):])
return None

# Wait for "debug_state.txt" to be updated to reflect the started worker.
start = time.time()
wait_for_condition(lambda: get_num_workers() == 16)
time_waited = time.time() - start
print(f"Waited {time_waited} for debug_state.txt to be updated")

# Check that no more workers started for a while.
for i in range(100):
num = get_num_workers()
assert num == 16
time.sleep(0.1)


if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/gcs_actor_scheduler.cc
@@ -130,7 +130,6 @@ void GcsActorScheduler::CancelOnLeasing(const NodeID &node_id, const ActorID &ac
const TaskID &task_id) {
// NOTE: This method will cancel the outstanding lease request and remove leasing
// information from the internal state.
RAY_LOG(DEBUG) << "Canceling worker leasing of task " << task_id;
auto node_it = node_to_actors_when_leasing_.find(node_id);
RAY_CHECK(node_it != node_to_actors_when_leasing_.end());
node_it->second.erase(actor_id);
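The hunk above shows only the top of CancelOnLeasing; the NOTE comment describes its intent: drop the in-flight lease record for the actor and cancel the outstanding lease request. A minimal sketch of that bookkeeping pattern follows; the NodeId/ActorId aliases and the cancel_lease_on_raylet hook are hypothetical stand-ins, not the actual Ray implementation.

// Sketch of a cancel-on-leasing path, assuming illustrative types.
#include <cassert>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>

using NodeId = std::string;
using ActorId = std::string;

// Which actors currently have an outstanding worker-lease request, per node.
std::unordered_map<NodeId, std::unordered_set<ActorId>> node_to_actors_when_leasing_;

// Hypothetical hook standing in for the RPC that tells the remote raylet to
// drop the outstanding lease request.
std::function<void(const NodeId &, const ActorId &)> cancel_lease_on_raylet;

void CancelOnLeasing(const NodeId &node_id, const ActorId &actor_id) {
  // Remove the in-flight lease record for this actor...
  auto node_it = node_to_actors_when_leasing_.find(node_id);
  assert(node_it != node_to_actors_when_leasing_.end());
  node_it->second.erase(actor_id);
  if (node_it->second.empty()) {
    node_to_actors_when_leasing_.erase(node_it);
  }
  // ...and ask the remote raylet to cancel the outstanding request.
  if (cancel_lease_on_raylet) {
    cancel_lease_on_raylet(node_id, actor_id);
  }
}

int main() {
  node_to_actors_when_leasing_["node-1"].insert("actor-A");
  cancel_lease_on_raylet = [](const NodeId &n, const ActorId &a) {
    std::cout << "cancel lease of " << a << " on " << n << "\n";
  };
  CancelOnLeasing("node-1", "actor-A");
  return 0;
}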
15 changes: 10 additions & 5 deletions src/ray/raylet/node_manager.cc
@@ -183,6 +183,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
config.worker_commands,
/*starting_worker_timeout_callback=*/
[this] { cluster_task_manager_->ScheduleAndDispatchTasks(); },
/*runtime_env_setup_failed_callback=*/
[this](const TaskID &task_id) {
RAY_CHECK(cluster_task_manager_->CancelTask(
task_id, /*runtime_env_setup_failed=*/true));
},
config.ray_debugger_external,
/*get_time=*/[]() { return absl::GetCurrentTimeNanos() / 1e6; }),
client_call_manager_(io_service),
@@ -294,6 +299,11 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
auto get_node_info_func = [this](const NodeID &node_id) {
return gcs_client_->Nodes().Get(node_id);
};
auto is_owner_alive = [this](const WorkerID &owner_worker_id,
const NodeID &owner_node_id) {
return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0);
};
auto announce_infeasible_task = [this](const RayTask &task) {
PublishInfeasibleTaskError(task);
};
@@ -310,11 +320,6 @@ NodeManager::NodeManager(instrumented_io_context &io_service, const NodeID &self
"return values are greater than the remaining capacity.";
max_task_args_memory = 0;
}
auto is_owner_alive = [this](const WorkerID &owner_worker_id,
const NodeID &owner_node_id) {
return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
failed_nodes_cache_.count(owner_node_id) > 0);
};
cluster_task_manager_ = std::shared_ptr<ClusterTaskManager>(new ClusterTaskManager(
self_node_id_,
std::dynamic_pointer_cast<ClusterResourceScheduler>(cluster_resource_scheduler_),
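The is_owner_alive lambda in the node_manager.cc hunks above is only relocated by this revert; its body is unchanged. The sketch below shows the kind of check such a predicate enables: before dispatching a task, a scheduler can drop work whose owner's worker or node is already known to have failed, since nobody is left to consume the result. Only the predicate body mirrors the diff; Task, WorkerId, NodeId, and the main() driver are illustrative assumptions, not ClusterTaskManager's real interface.

// Sketch of consulting an is_owner_alive predicate before dispatch.
#include <iostream>
#include <string>
#include <unordered_set>

using WorkerId = std::string;
using NodeId = std::string;

struct Task {
  std::string id;
  WorkerId owner_worker_id;
  NodeId owner_node_id;
};

// Caches of known-dead workers and nodes, as maintained by the node manager.
std::unordered_set<WorkerId> failed_workers_cache_;
std::unordered_set<NodeId> failed_nodes_cache_;

int main() {
  // Same shape as the lambda in the diff: an owner is alive only if neither
  // its worker nor its node appears in a failure cache.
  auto is_owner_alive = [](const WorkerId &owner_worker_id,
                           const NodeId &owner_node_id) {
    return !(failed_workers_cache_.count(owner_worker_id) > 0 ||
             failed_nodes_cache_.count(owner_node_id) > 0);
  };

  failed_nodes_cache_.insert("node-2");
  Task task{"t1", "worker-7", "node-2"};

  // A scheduler can skip tasks whose owner is gone instead of leasing a
  // worker for them.
  if (!is_owner_alive(task.owner_worker_id, task.owner_node_id)) {
    std::cout << "dropping task " << task.id << ": owner is dead\n";
  }
  return 0;
}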