
[core][aDAG] Hang when using ray before using adag #47864

Closed
rkooo567 opened this issue Sep 30, 2024 · 7 comments
Labels: beta (Beta release feature), bug (Something that is supposed to be working; but isn't), compiled-graphs, core (Issues that should be addressed in Ray Core), P0 (Issues that should be fixed in short order)

Comments

@rkooo567 (Contributor)

What happened + What you expected to happen

from time import perf_counter
from time import sleep
from contextlib import contextmanager
from typing import Callable

STATIC_SHAPE = False
NCCL = True

@contextmanager
def catchtime() -> Callable[[], float]:
    # Yields a callable that returns the elapsed time of the `with` block
    # once the block has exited.
    t1 = t2 = perf_counter()
    yield lambda: t2 - t1
    t2 = perf_counter()

import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType

import torch

@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.rand(shape, device="cuda", dtype=torch.float32)

@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape

shape = (1000,10000)


def test_basic():
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()

    # warmup
    for _ in range(5):
        obj = sender.send.remote(shape)
        result = receiver.recv.remote(obj)
        assert ray.get(result) == shape

    with catchtime() as time:
        for _ in range(10):
            obj = sender.send.remote(shape)
            result = receiver.recv.remote(obj)
            assert ray.get(result) == shape
    print(f"Basic: {time()}")
    del sender
    del receiver

def test_dag():
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()
    with ray.dag.InputNode() as inp:
        dag = sender.send.bind(inp)
        if STATIC_SHAPE:
            assert NCCL
            dag = dag.with_type_hint(TorchTensorType(transport="nccl", _shape=shape, _dtype=torch.float32))

        else:
            if NCCL:
                dag = dag.with_type_hint(TorchTensorType(transport="nccl"))
            else:
                dag = dag.with_type_hint(TorchTensorType())
        dag = receiver.recv.bind(dag)

    # Creates a NCCL group across the participating actors. The group is destroyed during dag.teardown().
    adag = dag.experimental_compile()
    # warmup
    for _ in range(5):
        assert ray.get(adag.execute(shape)) == shape
    # Execute the DAG. Ray aDAG will orchestrate any NCCL ops.
    with catchtime() as time:
        for _ in range(10):
            assert ray.get(adag.execute(shape)) == shape
    print(f"DAG: {time()}")

if __name__ == "__main__":
    ray.init()
    test_basic()
    test_dag()
    ray.shutdown()

This hangs on an A100. If I change the order of test_basic() and test_dag(), it works.

Versions / Dependencies

master

Reproduction script

n/a

Issue Severity

None

@rkooo567 rkooo567 added bug Something that is supposed to be working; but isn't triage Needs triage (eg: priority, bug/not-bug, and owning component) P0 Issues that should be fixed in short order compiled-graphs labels Sep 30, 2024
@anyscalesam anyscalesam added the core Issues that should be addressed in Ray Core label Oct 4, 2024
@jjyao jjyao removed the triage Needs triage (eg: priority, bug/not-bug, and owning component) label Oct 7, 2024
@kevin85421 (Member) commented Oct 10, 2024

Simplified reproduction:

import ray
import ray.dag
from ray.experimental.channel.torch_tensor_type import TorchTensorType

import torch

@ray.remote(num_gpus=1)
class GPUSender:
    def send(self, shape):
        return torch.rand(shape, device="cuda", dtype=torch.float32)

@ray.remote(num_gpus=1)
class GPUReceiver:
    def recv(self, tensor: torch.Tensor):
        assert tensor.device.type == "cuda"
        return tensor.shape

shape = (1000,10000)


def test_basic():
    print("Basic start")
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()

    obj = sender.send.remote(shape)
    result = receiver.recv.remote(obj)
    assert ray.get(result) == shape
    print("Basic end")

def test_dag():
    print("DAG start")
    sender = GPUSender.remote()
    receiver = GPUReceiver.remote()
    with ray.dag.InputNode() as inp:
        dag = sender.send.bind(inp)
        dag = dag.with_type_hint(TorchTensorType(transport="nccl", _shape=shape, _dtype=torch.float32))
        dag = receiver.recv.bind(dag)

    # Creates a NCCL group across the participating actors. The group is destroyed during dag.teardown().
    adag = dag.experimental_compile()
    assert ray.get(adag.execute(shape)) == shape
    print("DAG end")

if __name__ == "__main__":
    ray.init()
    test_basic()
    test_dag()
    ray.shutdown()
  • I ran the script 10 times; it failed 3 times with [1] 3991255 segmentation fault (core dumped) python3 test.py.
  • I changed the order from test_basic -> test_dag to test_dag -> test_basic, ran it 10 times, and all runs passed.
  • I ran only test_dag 10 times; there was 1 failure due to a segmentation fault.
  • I removed ray.shutdown, ran test_basic -> test_dag 10 times, and all runs passed.
  • I added del sender and del receiver to test_basic, ran test_basic -> test_dag 10 times, and 7 runs failed.

All failures happened after "DAG end".

Update: after syncing my branch with master, I need to run the script tens of times to reproduce the segmentation fault.

@kevin85421 (Member)

I used valgrind to profile it. The SIGSEGV comes from the following:

==3950== Process terminating with default action of signal 11 (SIGSEGV): dumping core
==3950==  Access not within mapped region at address 0x10
==3950==    at 0x2DD5DD: new_threadstate (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x1AD0A9: PyGILState_Ensure.cold (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x6726CE9: __pyx_f_3ray_7_raylet_check_signals() (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x6682042: std::_Function_handler<ray::Status (), ray::Status (*)()>::_M_invoke(std::_Any_data const&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A049E: ray::core::CoreWorkerMemoryStore::GetImpl(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, int, long, ray::core::WorkerContext const&, bool, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*, bool) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A0F7F: ray::core::CoreWorkerMemoryStore::Get(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, int, long, ray::core::WorkerContext const&, bool, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x69A1168: ray::core::CoreWorkerMemoryStore::Get(absl::lts_20230125::flat_hash_set<ray::ObjectID, absl::lts_20230125::hash_internal::Hash<ray::ObjectID>, std::equal_to<ray::ObjectID>, std::allocator<ray::ObjectID> > const&, long, ray::core::WorkerContext const&, absl::lts_20230125::flat_hash_map<ray::ObjectID, std::shared_ptr<ray::RayObject>, absl::lts_20230125::hash_internal::Hash<ray::ObjectID>, std::equal_to<ray::ObjectID>, std::allocator<std::pair<ray::ObjectID const, std::shared_ptr<ray::RayObject> > > >*, bool*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x68AD641: ray::core::CoreWorker::GetObjects(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, long, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x68B5C3B: ray::core::CoreWorker::Get(std::vector<ray::ObjectID, std::allocator<ray::ObjectID> > const&, long, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > >&) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x67ABF6E: __pyx_pw_3ray_7_raylet_10CoreWorker_41get_objects(_object*, _object*, _object*) (in /home/ubuntu/ray/python/ray/_raylet.so)
==3950==    by 0x249344: method_vectorcall_VARARGS_KEYWORDS (in /opt/conda/envs/ray-dev/bin/python3.9)
==3950==    by 0x237605: _PyEval_EvalFrameDefault (in /opt/conda/envs/ray-dev/bin/python3.9)

@kevin85421 (Member) commented Oct 11, 2024

I checked the core dump and found that the segmentation fault comes from ~RayLog(). I will chat with @jjyao tomorrow.

[screenshot: core dump backtrace pointing at ~RayLog()]

@kevin85421 (Member)

I found another issue, which is the same as the one valgrind found:

[screenshot: backtrace of the second issue]

@rynewang suggested that I use Py_IsFinalizing() to check whether the interpreter is in the process of being finalized.
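
For illustration, here is a minimal C sketch of that guard (hypothetical function name, not Ray's actual check_signals implementation), assuming the signal-check callback runs in native code as in the valgrind trace above: if the interpreter is already finalizing, skip acquiring the GIL so the callback never reaches new_threadstate.

#include <Python.h>

/* Hypothetical sketch, not Ray's code: guard a signal-check callback so it
 * does not create a new thread state while the interpreter is finalizing
 * (the crash path above: check_signals -> PyGILState_Ensure -> new_threadstate). */
static int CheckSignalsSafely(void) {
    /* Py_IsFinalizing() is public C API in CPython 3.13+; older versions
     * expose it as _Py_IsFinalizing(). */
    if (Py_IsFinalizing()) {
        return 0;  /* Interpreter is shutting down; skip the signal check. */
    }
    PyGILState_STATE gstate = PyGILState_Ensure();
    int interrupted = PyErr_CheckSignals();  /* -1 if a signal handler raised an exception. */
    PyGILState_Release(gstate);
    return interrupted;
}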

@jjyao jjyao added the beta Beta release feature label Nov 12, 2024
@kevin85421 (Member)

After giving it a second thought, I think I may have misunderstood the issue. I ran the script and didn't observe any "hanging" behavior as described in #47864 (comment). I ran it on my GPU devbox, as shown in the screenshot below.

The segmentation fault always occurs at ray.shutdown(), which may be resolved or alleviated by #48808.

[screenshot: script run on the GPU devbox]

I will close this issue after double-checking with @rkooo567 tomorrow.

@kevin85421 (Member)

Chatted with @rkooo567 offline. Closing this issue.

@kevin85421 (Member)

cc @dayshah
