Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix RayTaskRunner exception handling in Prefect >= 2.6.0 #60

Merged
merged 4 commits into from
Nov 30, 2022

Conversation

rpeden
Copy link
Contributor

@rpeden rpeden commented Nov 30, 2022

In Prefect 2.6.0, exception_to_crashed_state changed from a sync function to an async function that needs to be awaited. RayTaskRunner does not currently await this call, resulting in users seeing coroutine' object has no attribute 'type' when an unhandled exception occurs while a Ray worker node is trying to unpickle and load the task.

Consequently, it is difficult to see and understand what went wrong. Looking at logs from the Ray worker node(s) that tried to execute the failed task(s) will usually show what went wrong, but not all users who can submit work to a Ray cluster will have access to individual worker node logs.

Several users have encountered this issue, as noted in this Discourse post and issue #58.

Changes in this PR:

  • The task runner now unwraps the original exception whenever possible instead of showing a generic RayTaskError
  • The task runner now awaits exception_to_crashed_state so users will see the actual exception instead of coroutine' object has no attribute 'result'
  • Added a new test to help prevent future regression to previous behavior.

Closes #58

Example

Start by reproducing the error:

  • Create a fresh Conda environment or venv
  • run pip install prefect "ray[default]"
  • Create a local Ray cluster by running ray start --head
  • Run the following flow:
from prefect import task, flow
from prefect_ray import RayTaskRunner

task_runner = RayTaskRunner(
    address="ray://localhost:10001"
)

@task()
def test_task():
    raise KeyboardInterrupt()

@flow(task_runner=task_runner)
def test_flow():
    future = test_task.submit()
    future.result(10)

if __name__ == "__main__":
    test_flow()
  • Notice the AttributeError: 'coroutine' object has no attribute 'result' error instead of the error you raised
  • Pull in the changes from this PR
  • Notice that you now see the actual error caused by the exception: ray.exceptions.TaskCancelledError: Task: TaskID(...) was cancelled

Checklist

  • This pull request references any related issue by including "Closes #<ISSUE_NUMBER>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • This pull request includes tests or only affects documentation.
  • Summarized PR's changes in CHANGELOG.md

- Added `await` when calling `exception_to_crashed_state`
- Added `RayTaskError` exception handler that unwraps the original exception. This makes `RayTaskRunner` behave similarly to other task runners and makes it easier to determine the original cause of the exception.
@rpeden rpeden marked this pull request as ready for review November 30, 2022 06:55
@rpeden rpeden added the bug Something isn't working label Nov 30, 2022
@ahuang11 ahuang11 merged commit 180571b into main Nov 30, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Flow crashes after multiple hanged tasks with "Prefect Task run TASK_RUN_ID already finished"
3 participants