Skip to content

Commit

Permalink
[core] Task backend - Add worker died info to failed tasks when job e…
Browse files Browse the repository at this point in the history
…xits. (#34166)

This adds the additional error_type + error_message info to non-terminal tasks (not finished and not failed) when a job exits.
  • Loading branch information
rickyyx authored Apr 11, 2023
1 parent 3c22ad6 commit d1e7629
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 16 deletions.
3 changes: 3 additions & 0 deletions python/ray/tests/test_task_events_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,9 @@ def verify():
task["state"] == "FAILED"
), f"task {task['func_or_class_name']} has wrong state"

assert task["error_type"] == "WORKER_DIED"
assert "Job finishes" in task["error_message"]

duration_ms = task["end_time_ms"] - task["start_time_ms"]
assert (
# It takes time for the job to run
Expand Down
32 changes: 22 additions & 10 deletions src/ray/gcs/gcs_server/gcs_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,15 @@ const rpc::TaskEvents &GcsTaskManager::GcsTaskManagerStorage::GetTaskEvent(
}

void GcsTaskManager::GcsTaskManagerStorage::MarkTaskAttemptFailed(
const TaskAttempt &task_attempt, int64_t failed_ts) {
const TaskAttempt &task_attempt,
int64_t failed_ts,
const rpc::RayErrorInfo &error_info) {
auto &task_event = GetTaskEvent(task_attempt);
if (!task_event.has_state_updates()) {
return;
}
task_event.mutable_state_updates()->set_failed_ts(failed_ts);
// We could mark the task as failed even if might not have state updates yet (i.e. only
// profiling events are reported).
auto state_updates = task_event.mutable_state_updates();
state_updates->set_failed_ts(failed_ts);
state_updates->mutable_error_info()->CopyFrom(error_info);
}

bool GcsTaskManager::GcsTaskManagerStorage::IsTaskTerminated(
Expand All @@ -159,18 +162,25 @@ absl::optional<int64_t> GcsTaskManager::GcsTaskManagerStorage::GetTaskStatusUpda
: absl::nullopt;
}

void GcsTaskManager::GcsTaskManagerStorage::MarkTasksFailed(const JobID &job_id,
int64_t job_finish_time_ns) {
void GcsTaskManager::GcsTaskManagerStorage::MarkTasksFailedOnJobEnds(
const JobID &job_id, int64_t job_finish_time_ns) {
auto task_attempts_itr = job_to_task_attempt_index_.find(job_id);
if (task_attempts_itr == job_to_task_attempt_index_.end()) {
// No tasks in the job.
return;
}

rpc::RayErrorInfo error_info;
error_info.set_error_type(rpc::ErrorType::WORKER_DIED);
std::stringstream error_message;
error_message << "Job finishes (" << job_id.Hex()
<< ") as driver exits. Marking all non-terminal tasks as failed.";
error_info.set_error_message(error_message.str());

// Iterate all task attempts from the job.
for (const auto &task_attempt : task_attempts_itr->second) {
if (!IsTaskTerminated(task_attempt.first)) {
MarkTaskAttemptFailed(task_attempt, job_finish_time_ns);
MarkTaskAttemptFailed(task_attempt, job_finish_time_ns, error_info);
}
}
}
Expand All @@ -181,7 +191,8 @@ void GcsTaskManager::GcsTaskManagerStorage::MarkTaskFailed(const TaskID &task_id
if (!latest_task_attempt.has_value()) {
return;
}
MarkTaskAttemptFailed(*latest_task_attempt, failed_ts);
// TODO(rickyx): we will fix it in the next PR.
MarkTaskAttemptFailed(*latest_task_attempt, failed_ts, rpc::RayErrorInfo());
}

void GcsTaskManager::GcsTaskManagerStorage::MarkTaskTreeFailedIfNeeded(
Expand Down Expand Up @@ -515,7 +526,8 @@ void GcsTaskManager::OnJobFinished(const JobID &job_id, int64_t job_finish_time_
}
// If there are any non-terminated tasks from the job, mark them failed since all
// workers associated with the job will be killed.
task_event_storage_->MarkTasksFailed(job_id, job_finish_time_ms * 1000 * 1000);
task_event_storage_->MarkTasksFailedOnJobEnds(job_id,
job_finish_time_ms * 1000 * 1000);
});
}

Expand Down
9 changes: 6 additions & 3 deletions src/ray/gcs/gcs_server/gcs_task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
std::vector<rpc::TaskEvents> GetTaskEvents(
const absl::flat_hash_set<TaskAttempt> &task_attempts) const;

/// Mark tasks from a job as failed.
/// Mark tasks from a job as failed as job ends with a delay.
///
/// \param job_id Job ID
/// \param job_finish_time_ns job finished time in nanoseconds, which will be the task
/// failed time.
void MarkTasksFailed(const JobID &job_id, int64_t job_finish_time_ns);
void MarkTasksFailedOnJobEnds(const JobID &job_id, int64_t job_finish_time_ns);

private:
/// Mark the task tree containing this task attempt as failure if necessary.
Expand Down Expand Up @@ -254,7 +254,10 @@ class GcsTaskManager : public rpc::TaskInfoHandler {
///
/// \param task_attempt Task attempt.
/// \param failed_ts The failure timestamp.
void MarkTaskAttemptFailed(const TaskAttempt &task_attempt, int64_t failed_ts);
/// \param error_info The error info.
void MarkTaskAttemptFailed(const TaskAttempt &task_attempt,
int64_t failed_ts,
const rpc::RayErrorInfo &error_info);

/// Get the latest task attempt for the task.
///
Expand Down
18 changes: 15 additions & 3 deletions src/ray/gcs/gcs_server/test/gcs_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ class GcsTaskManagerTest : public ::testing::Test {
const std::vector<TaskID> &tasks,
const std::vector<std::pair<rpc::TaskStatus, int64_t>> &status_timestamps,
const TaskID &parent_task_id = TaskID::Nil(),
int job_id = 0) {
int job_id = 0,
absl::optional<rpc::RayErrorInfo> error_info = absl::nullopt) {
auto events = GenTaskEvents(tasks,
/* attempt_number */ 0,
/* job_id */ job_id,
/* profile event */ absl::nullopt,
GenStateUpdate(status_timestamps),
GenTaskInfo(JobID::FromInt(job_id), parent_task_id));
GenTaskInfo(JobID::FromInt(job_id), parent_task_id),
error_info);
auto events_data = Mocker::GenTaskEventsData(events);
SyncAddTaskEventData(events_data);
}
Expand Down Expand Up @@ -183,7 +185,8 @@ class GcsTaskManagerTest : public ::testing::Test {
int32_t job_id = 0,
absl::optional<rpc::ProfileEvents> profile_events = absl::nullopt,
absl::optional<rpc::TaskStateUpdate> state_update = absl::nullopt,
absl::optional<rpc::TaskInfoEntry> task_info = absl::nullopt) {
absl::optional<rpc::TaskInfoEntry> task_info = absl::nullopt,
absl::optional<rpc::RayErrorInfo> error_info = absl::nullopt) {
std::vector<rpc::TaskEvents> result;
for (auto const &task_id : task_ids) {
rpc::TaskEvents events;
Expand All @@ -195,6 +198,10 @@ class GcsTaskManagerTest : public ::testing::Test {
events.mutable_state_updates()->CopyFrom(*state_update);
}

if (error_info.has_value()) {
events.mutable_state_updates()->mutable_error_info()->CopyFrom(*error_info);
}

if (profile_events.has_value()) {
auto new_events = events.mutable_profile_events();
new_events->CopyFrom(*profile_events);
Expand Down Expand Up @@ -571,6 +578,11 @@ TEST_F(GcsTaskManagerTest, TestJobFinishesFailAllRunningTasks) {
EXPECT_EQ(reply.events_by_task_size(), 10);
for (const auto &task_event : reply.events_by_task()) {
EXPECT_EQ(task_event.state_updates().failed_ts(), /* 5 ms to ns */ 5 * 1000 * 1000);
EXPECT_TRUE(task_event.state_updates().has_error_info());
EXPECT_TRUE(task_event.state_updates().error_info().error_type() ==
rpc::ErrorType::WORKER_DIED);
EXPECT_TRUE(task_event.state_updates().error_info().error_message().find(
"Job finishes") != std::string::npos);
}
}

Expand Down

0 comments on commit d1e7629

Please sign in to comment.