[BUG] Unable to Initialize Comms with Multiple Dask Clients #4089

Status: Closed · Tracked by #3256
alexbarghi-nv opened this issue on Jan 11, 2024 · 2 comments
Labels: bug (Something isn't working)
Milestone: 24.02
Assignees: alexbarghi-nv

alexbarghi-nv (Member) commented:
We are currently unable to have multiple workers access a cuGraph Graph in parallel, such as in a distributed training workflow with PyTorch, because cuGraph comms (really RAFT comms) cannot be initialized when each worker holds its own Dask client. This previously worked; the regression was discovered while debugging a different issue in a cuGraph example training workflow.

We want to support a broad variety of workflows (one of the main goals of the upcoming 24.04 release), so resolving this issue is critical. If possible, it should be resolved in release 24.02 so that the fix makes it into the 24.03 DLFW container, which will add a new round of cuGraph example workflows.

Reproducer:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

import os
# Defer CUDA context creation and enable cuDF spilling before RAPIDS imports.
os.environ['RAPIDS_NO_INITIALIZE'] = '1'
os.environ['CUDF_SPILL'] = '1'

def start_dask_cluster():
    # Stand up a single-node, multi-GPU Dask cluster (one worker per GPU).
    from cugraph.testing.mg_utils import enable_spilling
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(
        protocol="tcp",
        rmm_pool_size=None,
        memory_limit=None,
    )

    from dask.distributed import Client
    client = Client(cluster)
    client.wait_for_workers(n_workers=len(cluster.workers))
    client.run(enable_spilling)

    print("Dask Cluster Setup Complete")
    return client, cluster

def init_pytorch_worker(rank, world_size):
    # Bind this process to its GPU and route cupy allocations through RMM.
    import cupy
    cupy.cuda.Device(rank).use()
    from rmm.allocators.cupy import rmm_cupy_allocator
    cupy.cuda.set_allocator(rmm_cupy_allocator)

    from cugraph.testing.mg_utils import enable_spilling
    enable_spilling()

    torch.cuda.set_device(rank)

    # Join the PyTorch distributed (NCCL) process group.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

def main(rank, world_size, scheduler_address):
    init_pytorch_worker(rank, world_size)

    # Each training rank connects its own Dask client to the shared scheduler.
    from dask.distributed import Client
    client = Client(address=scheduler_address)
    print(f'rank {rank} successfully created a dask client: {str(client)}', flush=True)

    # Sanity check: basic dask-cudf operations work from this per-rank client.
    import cudf
    import dask_cudf

    df = cudf.Series([1, 3, 5, 6, 7])
    ddf = dask_cudf.from_cudf(df, npartitions=3)

    assert ddf.compute().values_host.tolist() == df.values_host.tolist()
    print(f'dask-cudf test successful on rank {rank}')

    # Serialize comms setup across ranks: only one rank at a time attempts
    # to initialize cuGraph (RAFT) comms on the shared cluster.
    from dask.distributed import Lock
    lock = Lock('comms_init')
    dist.barrier()

    if lock.acquire(timeout=100):
        try:
            from cugraph.dask.comms import comms as Comms
            Comms.initialize(p2p=True)  # hangs and times out here (see output below)
            print(f'cugraph comms initialized on rank {rank}')
        finally:
            lock.release()
    else:
        raise TimeoutError("Failed to acquire lock to initialize comms")

if __name__ == '__main__':
    # The parent process starts the Dask cluster, then spawns one training
    # process per GPU; each child connects to the same scheduler.
    client, cluster = start_dask_cluster()
    print(cluster.scheduler_address)

    world_size = torch.cuda.device_count()
    mp.spawn(
        main,
        args=(world_size, cluster.scheduler_address,),
        nprocs=world_size,
        join=True,
    )

    client.close()
    cluster.close()

Output:

Dask Cluster Setup Complete
tcp://127.0.0.1:33305
rank 1 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 0 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 3 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 4 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
rank 2 successfully created a dask client: <Client: 'tcp://127.0.0.1:33305' processes=5 threads=5>
dask-cudf test successful on rank 2
dask-cudf test successful on rank 3
dask-cudf test successful on rank 1
dask-cudf test successful on rank 0
dask-cudf test successful on rank 4
2024-01-11 12:15:59,792 - distributed.worker - WARNING - Run Failed
Function: _func_init_all
args:     (b"\x91\xae\xda\xeb\xfe\xb1A}\x83'\xbdf\xf0\xecm7", b'\xf3\xdd?\xfe(\xb3j|\x02\x00\xcb\xbd\n!\xe1\xa9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x18\x81$B\x7f\x00\x00 f\xc1\x16B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00P\xba\xec\xa5WU\x00\x00\x00\x00\x00\x00V:\xbf]`\xbe\xda\xa5WU\x00\x00@\x18\x81$B\x7f\x00\x00\x00\x90\x915\xdb\x99\xee\x84\xc0\xf5C$B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00', True, {'tcp://127.0.0.1:32793': {'rank': 4, 'port': 52000}, 'tcp://127.0.0.1:35165': {'rank': 1, 'port': 47041}, 'tcp://127.0.0.1:35203': {'rank': 0, 'port': 45080}, 'tcp://127.0.0.1:38909': {'rank': 3, 'port': 42314}, 'tcp://127.0.0.1:41665': {'rank': 2, 'port': 51892}}, False, 0)
kwargs:   {'dask_worker': <Worker 'tcp://127.0.0.1:32793', name: 4, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/worker.py", line 3182, in run
    result = await function(*args, **kwargs)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
    await _func_ucp_create_endpoints(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
    ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
    ep = await self._create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
    ep = await ucp.create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
    await asyncio.wait_for(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
2024-01-11 12:15:59,791 - distributed.worker - WARNING - Run Failed
Function: _func_init_all
args:     (b"\x91\xae\xda\xeb\xfe\xb1A}\x83'\xbdf\xf0\xecm7", b'\xf3\xdd?\xfe(\xb3j|\x02\x00\xcb\xbd\n!\xe1\xa9\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00@\x18\x81$B\x7f\x00\x00 f\xc1\x16B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00P\xba\xec\xa5WU\x00\x00\x00\x00\x00\x00V:\xbf]`\xbe\xda\xa5WU\x00\x00@\x18\x81$B\x7f\x00\x00\x00\x90\x915\xdb\x99\xee\x84\xc0\xf5C$B\x7f\x00\x00\xb0\xd2\xf9\xedB\x7f\x00', True, {'tcp://127.0.0.1:32793': {'rank': 4, 'port': 52000}, 'tcp://127.0.0.1:35165': {'rank': 1, 'port': 47041}, 'tcp://127.0.0.1:35203': {'rank': 0, 'port': 45080}, 'tcp://127.0.0.1:38909': {'rank': 3, 'port': 42314}, 'tcp://127.0.0.1:41665': {'rank': 2, 'port': 51892}}, False, 0)
kwargs:   {'dask_worker': <Worker 'tcp://127.0.0.1:35165', name: 1, status: running, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>}
Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/worker.py", line 3182, in run
    result = await function(*args, **kwargs)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
    await _func_ucp_create_endpoints(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
    ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
    ep = await self._create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
    ep = await ucp.create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
    await asyncio.wait_for(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Task exception was never retrieved
future: <Task finished name='Task-3103' coro=<_listener_handler_coroutine() done, defined at /home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py:140> exception=TimeoutError()>
Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 155, in _listener_handler_coroutine
    peer_info = await exchange_peer_info(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 51, in exchange_peer_info
    await asyncio.wait_for(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Traceback (most recent call last):
  File "/home/nfs/abarghi/timeout_repro.py", line 78, in <module>
    mp.spawn(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 239, in spawn
    return start_processes(fn, args, nprocs, join, daemon, start_method='spawn')
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 197, in start_processes
    while not context.join():
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 160, in join
    raise ProcessRaisedException(msg, error_index, failed_process.pid)
torch.multiprocessing.spawn.ProcessRaisedException: 

-- Process 1 terminated with the following error:
Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/home/nfs/abarghi/timeout_repro.py", line 66, in main
    Comms.initialize(p2p=True)
  File "/home/nfs/abarghi/cugraph6/python/cugraph/cugraph/dask/comms/comms.py", line 152, in initialize
    __instance.init()
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 201, in init
    self.client.run(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2998, in run
    return self.sync(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 358, in sync
    return sync(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 434, in sync
    raise error
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/utils.py", line 408, in f
    result = yield future
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/gen.py", line 767, in run
    value = future.result()
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/client.py", line 2903, in _run
    raise exc
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 463, in _func_init_all
    await _func_ucp_create_endpoints(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/comms.py", line 632, in _func_ucp_create_endpoints
    ep = await get_ucx(dask_worker=dask_worker).get_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 69, in get_endpoint
    ep = await self._create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/raft_dask/common/ucx.py", line 60, in _create_endpoint
    ep = await ucp.create_endpoint(ip, port)
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 1016, in create_endpoint
    return await _get_ctx().create_endpoint(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 328, in create_endpoint
    peer_info = await exchange_peer_info(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/ucp/core.py", line 60, in exchange_peer_info
    await asyncio.wait_for(
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError

2024-01-11 12:16:06,160 - distributed.nanny - WARNING - Worker process still alive after 3.199997253417969 seconds, killing
2024-01-11 12:16:06,160 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.1999995422363288 seconds, killing
2024-01-11 12:16:06,161 - distributed.nanny - WARNING - Worker process still alive after 3.199999389648438 seconds, killing
2024-01-11 12:16:06,497 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:32793 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:33174 remote=tcp://127.0.0.1:32793>: Stream is closed
2024-01-11 12:16:06,533 - distributed.scheduler - ERROR - broadcast to tcp://127.0.0.1:35165 failed: CommClosedError: in <TCP (closed) Scheduler Broadcast local=tcp://127.0.0.1:60924 remote=tcp://127.0.0.1:35165>: Stream is closed
2024-01-11 12:16:06,960 - tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x7f4215483010>>, <Task finished name='Task-12445' coro=<SpecCluster._correct_state_internal() done, defined at /home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/distributed/deploy/spec.py:346> exception=TimeoutError()>)
Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/ioloop.py", line 738, in _run_callback
    ret = callback()
  File "/home/nfs/abarghi/miniconda3/envs/rapids/lib/python3.10/site-packages/tornado/ioloop.py", line 762, in _discard_future_result
    future.result()
asyncio.exceptions.TimeoutError
2024-01-11 12:16:06,998 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 443571 exit status was already read will report exitcode 255
2024-01-11 12:16:07,070 - distributed.process - WARNING - [<AsyncProcess Dask Worker process (from Nanny)>] process 443558 exit status was already read will report exitcode 255
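
For comparison, comms initialize fine when a single process owns a single Dask client. Below is a minimal sketch of that working baseline, following the standard cuGraph multi-GPU setup (the cluster parameters here are illustrative):

# Baseline: one process, one Dask client -- Comms.initialize succeeds here.
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from cugraph.dask.comms import comms as Comms

cluster = LocalCUDACluster(protocol="tcp")
client = Client(cluster)
client.wait_for_workers(n_workers=len(cluster.workers))

Comms.initialize(p2p=True)  # RAFT comms come up across all workers

# ... run multi-GPU cuGraph / dask-cudf work here ...

Comms.destroy()
client.close()
cluster.close()

The failure mode above only appears once multiple spawned processes, each holding its own client to the same scheduler, attempt this initialization.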
alexbarghi-nv added the bug label on Jan 11, 2024
alexbarghi-nv added this to the 24.02 milestone on Jan 11, 2024
alexbarghi-nv self-assigned this on Jan 11, 2024
rlratzel (Contributor) commented:

alexbarghi-nv (Member, Author) commented:

Not planned; no longer using dask in this way in the GNN packages.
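
For context, a minimal sketch of the dask-free direction described above, assuming each spawned training process uses torch.distributed with NCCL directly and no per-rank Dask client is created. The all_reduce is a stand-in for the collectives that RAFT comms provided, not the actual cuGraph code path:

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

def worker(rank, world_size):
    # Each rank binds to its GPU and joins a single NCCL process group.
    torch.cuda.set_device(rank)
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)

    # Illustrative collective: every rank contributes 1, so the sum
    # equals world_size on every rank.
    t = torch.ones(1, device=f'cuda:{rank}')
    dist.all_reduce(t)
    assert t.item() == world_size

    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = torch.cuda.device_count()
    mp.spawn(worker, args=(world_size,), nprocs=world_size, join=True)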

alexbarghi-nv closed this as not planned on Apr 29, 2024