Reconstruct failed actors without sending tasks. #5161

Merged: 8 commits merged on Jul 15, 2019
Changes from 2 commits
36 changes: 36 additions & 0 deletions python/ray/tests/test_actor.py
@@ -11,6 +11,7 @@
import signal
import sys
import time
from pyarrow import plasma

import ray
import ray.ray_constants as ray_constants
@@ -19,6 +20,7 @@
from ray.tests.conftest import generate_internal_config_map
from ray.tests.utils import (
relevant_errors,
wait_for_contition,
wait_for_errors,
)

@@ -2162,6 +2164,40 @@ def get_pid(self):
ray.get(actor.increase.remote())


def test_actor_reconstruction_without_task(ray_start_regular):
"""Test a dead actor can be reconstructed without sending task to it."""

def object_exists(obj_id):
"""Check wether an object exists in plasma store."""
plasma_client = ray.worker.global_worker.plasma_client
plasma_id = plasma.ObjectID(obj_id.binary())
return plasma_client.get(
plasma_id, timeout_ms=10) != plasma.ObjectNotAvailable

@ray.remote(max_reconstructions=1)
class ReconstructableActor(object):
def __init__(self, obj_ids):
for obj_id in obj_ids:
# Every time the actor gets constructed,
# put a new object in plasma store.
if not object_exists(obj_id):
print("put", obj_id)
ray.worker.global_worker.put_object(obj_id, 1)
break

def get_pid(self):
return os.getpid()

obj_ids = [ray.ObjectID.from_random() for _ in range(2)]
actor = ReconstructableActor.remote(obj_ids)
# Kill the actor.
pid = ray.get(actor.get_pid.remote())
os.kill(pid, signal.SIGKILL)
# Wait until the actor is reconstructed.
assert wait_for_contition(
lambda: object_exists(obj_ids[1]), timeout_ms=2000)


def test_actor_reconstruction_on_node_failure(ray_start_cluster_head):
"""Test actor reconstruction when node dies unexpectedly."""
cluster = ray_start_cluster_head
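For context, a minimal standalone sketch of the user-facing behavior this test exercises (illustrative only, not part of the diff; the Counter actor below is hypothetical, and it assumes a running Ray session with the same max_reconstructions API used above). Before this change a dead actor was only reconstructed when a new task was submitted to it; with this change the raylet resubmits the actor creation task on its own, so the actor comes back even if the driver never calls it again.

import os
import signal

import ray

ray.init()


@ray.remote(max_reconstructions=1)
class Counter(object):
    """Hypothetical actor that should survive one process failure."""

    def __init__(self):
        self.value = 0

    def increase(self):
        self.value += 1
        return self.value

    def get_pid(self):
        return os.getpid()


counter = Counter.remote()
pid = ray.get(counter.get_pid.remote())
# Kill the actor process. The raylet should now reconstruct the actor in the
# background, even though no further task is submitted to it. Note that the
# reconstructed actor re-runs __init__, so its state starts over.
os.kill(pid, signal.SIGKILL)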
22 changes: 22 additions & 0 deletions python/ray/tests/utils.py
@@ -94,3 +94,25 @@ def wait_for_errors(error_type, num_errors, timeout=10):
return
time.sleep(0.1)
raise Exception("Timing out of wait.")


def wait_for_contition(condition_predictor,
Reviewer comment (Contributor), suggested change:
- def wait_for_contition(condition_predictor,
+ def wait_for_condition(condition_predictor,

timeout_ms=1000,
retry_interval_ms=100):
"""A helper function that wait until a conition is met.
Reviewer comment (Contributor), suggested change:
-     """A helper function that wait until a conition is met.
+     """A helper function that waits until a condition is met.


Args:
condition_predictor: A function that predicts the condition.
timeout_ms: Maximum timeout in milliseconds.
retry_interval_ms: Retry interval in milliseconds.

Return:
Whether the condition is met within the timeout.
"""
time_elapsed = 0
while time_elapsed <= timeout_ms:
if condition_predictor():
return True
time_elapsed += retry_interval_ms
time.sleep(retry_interval_ms / 1000.0)
return False
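A minimal usage sketch for this helper (illustrative only; it assumes the rename to wait_for_condition suggested in the review above, and the /tmp/marker path is made up):

import os

from ray.tests.utils import wait_for_condition

# Poll every 100 ms until the marker file appears, giving up after 2 seconds.
assert wait_for_condition(
    lambda: os.path.exists("/tmp/marker"), timeout_ms=2000, retry_interval_ms=100)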
12 changes: 10 additions & 2 deletions src/ray/raylet/node_manager.cc
@@ -641,6 +641,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
<< actor_registration.GetRemainingReconstructions();

if (actor_registration.GetState() == ActorTableData::ALIVE) {
// The actor is now alive (created for the first time or reconstructed). We can
// stop listening for the actor creation task. This is needed because we use
// `ListenAndMaybeReconstruct` to reconstruct the actor.
reconstruction_policy_.Cancel(actor_registration.GetActorCreationDependency());
// The actor's location is now known. Dequeue any methods that were
// submitted before the actor's location was known.
// (See design_docs/task_states.rst for the state transition diagram.)
@@ -678,6 +682,10 @@ void NodeManager::HandleActorStateTransition(const ActorID &actor_id,
} else {
RAY_CHECK(actor_registration.GetState() == ActorTableData::RECONSTRUCTING);
RAY_LOG(DEBUG) << "Actor is being reconstructed: " << actor_id;
// The actor is dead and needs reconstruction. Attempt to reconstruct its
// creation task.
reconstruction_policy_.ListenAndMaybeReconstruct(
actor_registration.GetActorCreationDependency());
// When an actor fails but can be reconstructed, resubmit all of the queued
// tasks for that actor. This will mark the tasks as waiting for actor
// creation.
Expand Down Expand Up @@ -2026,8 +2034,8 @@ void NodeManager::HandleTaskReconstruction(const TaskID &task_id) {
// The task was not in the GCS task table. It must therefore be in the
// lineage cache.
RAY_CHECK(lineage_cache_.ContainsTask(task_id))
<< "Task metadata not found in either GCS or lineage cache. It may have been "
"evicted "
<< "Metadata of task " << task_id
<< " not found in either GCS or lineage cache. It may have been evicted "
<< "by the redis LRU configuration. Consider increasing the memory "
"allocation via "
<< "ray.init(redis_max_memory=<max_memory_bytes>).";
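To summarize the raylet-side change, here is a rough Python model of the control flow in HandleActorStateTransition above. It is only a sketch, not the actual implementation (which is the C++ in node_manager.cc); the reconstruction_policy and actor_registration objects and the snake_case method names stand in for their C++ counterparts.

def handle_actor_state_transition(actor_id, actor_registration, reconstruction_policy):
    # Simplified model of NodeManager::HandleActorStateTransition.
    creation_dependency = actor_registration.get_actor_creation_dependency()
    state = actor_registration.get_state()
    if state == "ALIVE":
        # The actor was just created or reconstructed, so stop listening for
        # its creation task.
        reconstruction_policy.cancel(creation_dependency)
        # ... dequeue any methods that were waiting for the actor's location ...
    elif state == "RECONSTRUCTING":
        # The actor died but still has reconstructions left. Listen for the
        # creation task so it gets resubmitted even if no new task is ever
        # sent to the actor, which is the point of this PR.
        reconstruction_policy.listen_and_maybe_reconstruct(creation_dependency)
        # ... resubmit the queued tasks for this actor ...
    else:
        # DEAD: no reconstructions remaining.
        pass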