
Add option to track RMM allocations #842

Merged (14 commits) on Feb 25, 2022
10 changes: 10 additions & 0 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -117,6 +117,14 @@
Logging will only be enabled if ``--rmm-pool-size`` or ``--rmm-managed-memory``
are specified.""",
)
@click.option(
"--rmm-track-allocations/--no-rmm-track-allocations",
default=False,
show_default=True,
help="""Track memory allocations made by RMM. If ``True``, wraps the memory
resource of each worker with a ``rmm.mr.TrackingResourceAdaptor`` that
allows querying the amount of memory allocated by RMM.""",
)
@click.option(
"--pid-file", type=str, default="", help="File to write the process PID.",
)
@@ -281,6 +289,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
@@ -332,6 +341,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
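
For context on how the new CLI flag is meant to be used (not part of this diff): once dask-cuda-worker processes are started with ``--rmm-track-allocations``, the tracking adaptor can be queried from the client. The sketch below assumes a scheduler at ``127.0.0.1:8786`` and that ``rmm.mr.TrackingResourceAdaptor`` exposes ``get_allocated_bytes()`` for the query.

from dask.distributed import Client


def rmm_allocated_bytes():
    # Runs on each worker. With --rmm-track-allocations the current device
    # resource is a TrackingResourceAdaptor wrapping the pool resource, so it
    # can report how many bytes RMM currently has allocated.
    import rmm

    return rmm.mr.get_current_device_resource().get_allocated_bytes()


client = Client("127.0.0.1:8786")  # scheduler for the dask-cuda-worker processes
print(client.run(rmm_allocated_bytes))  # {worker_address: allocated_bytes, ...}
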
2 changes: 2 additions & 0 deletions dask_cuda/cuda_worker.py
@@ -49,6 +49,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
pid_file=None,
resources=None,
dashboard=True,
@@ -223,6 +224,7 @@ def del_pid_file():
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
),
PreImport(pre_import),
},
13 changes: 13 additions & 0 deletions dask_cuda/local_cuda_cluster.py
@@ -138,6 +138,16 @@ class LocalCUDACluster(LocalCluster):
.. note::
Logging will only be enabled if ``rmm_pool_size`` is specified or
``rmm_managed_memory=True``.
rmm_track_allocations : bool, default False
If True, wraps the memory resource used by each worker with a
``rmm.mr.TrackingResourceAdaptor``, which tracks the amount of
memory allocated.

.. note::
This option enables additional diagnostics to be collected and
reported by the Dask dashboard. However, there is significant overhead
associated with this and it should only be used for debugging and
memory profiling.
jit_unspill : bool or None, default None
Enable just-in-time unspilling. Can be a boolean or ``None`` to fall back on
the value of ``dask.jit-unspill`` in the local Dask configuration, disabling
@@ -195,6 +205,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
Review comment (Member):
    Could you also add docstrings for the new parameter in here?

Reply (Contributor, author):
    Done!

jit_unspill=None,
log_spilling=False,
worker_class=None,
@@ -258,6 +269,7 @@ def __init__(
)

self.rmm_log_directory = rmm_log_directory
self.rmm_track_allocations = rmm_track_allocations

if not kwargs.pop("processes", True):
raise ValueError(
@@ -377,6 +389,7 @@ def new_worker_spec(self):
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
self.rmm_track_allocations,
),
PreImport(self.pre_import),
},
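
As a usage note for the new keyword argument (again, not part of the diff): the sketch below starts a ``LocalCUDACluster`` with tracking enabled, makes an RMM allocation on each worker, and reads back the per-worker byte counts. It assumes ``TrackingResourceAdaptor.get_allocated_bytes()`` is the query method.

from dask.distributed import Client
from dask_cuda import LocalCUDACluster


def allocate_and_report():
    # Allocate 10 MB through RMM on this worker, then ask the tracking
    # adaptor how many bytes are currently outstanding.
    import rmm

    buf = rmm.DeviceBuffer(size=10_000_000)
    allocated = rmm.mr.get_current_device_resource().get_allocated_bytes()
    del buf
    return allocated


if __name__ == "__main__":
    with LocalCUDACluster(rmm_pool_size="2GB", rmm_track_allocations=True) as cluster:
        with Client(cluster) as client:
            # Each worker should report at least the 10 MB allocated above.
            print(client.run(allocate_and_report))
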
31 changes: 31 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -290,3 +290,34 @@ def test_cuda_visible_devices_uuid(loop):  # noqa: F811

result = client.run(lambda: os.environ["CUDA_VISIBLE_DEVICES"])
assert list(result.values())[0] == gpu_uuid


def test_rmm_track_allocations(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9369",
"--host",
"127.0.0.1",
"--rmm-pool-size",
"2 GB",
"--no-dashboard",
"--rmm-track-allocations",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

memory_resource_type = client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.TrackingResourceAdaptor

memory_resource_upstream_type = client.run(
lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
)
for v in memory_resource_upstream_type.values():
assert v is rmm.mr.PoolMemoryResource
Review comment on lines +295 to +323 (Member):
    Given dask-cuda-worker and LocalCUDACluster have somewhat different control paths, could you add the same test for LocalCUDACluster in https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/tests/test_local_cuda_cluster.py? Should be fairly straightforward, the inner logic will be the same, just the cluster setup will be slightly different.

Reply (Contributor, author):
    Thanks! I added that.

20 changes: 20 additions & 0 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -289,3 +289,23 @@ async def test_gpu_uuid():

result = await client.run(lambda: os.environ["CUDA_VISIBLE_DEVICES"])
assert list(result.values())[0] == gpu_uuid


@gen_test(timeout=20)
async def test_rmm_track_allocations():
rmm = pytest.importorskip("rmm")
async with LocalCUDACluster(
rmm_pool_size="2GB", asynchronous=True, rmm_track_allocations=True
) as cluster:
async with Client(cluster, asynchronous=True) as client:
memory_resource_type = await client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.TrackingResourceAdaptor

memory_resource_upstream_type = await client.run(
lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
)
for v in memory_resource_upstream_type.values():
assert v is rmm.mr.PoolMemoryResource
7 changes: 7 additions & 0 deletions dask_cuda/utils.py
@@ -43,6 +43,7 @@ def __init__(
managed_memory,
async_alloc,
log_directory,
track_allocations,
):
if initial_pool_size is None and maximum_pool_size is not None:
raise ValueError(
@@ -56,6 +57,7 @@
self.async_alloc = async_alloc
self.logging = log_directory is not None
self.log_directory = log_directory
self.rmm_track_allocations = track_allocations

def setup(self, worker=None):
if self.async_alloc:
@@ -83,6 +85,11 @@ def setup(self, worker=None):
worker, self.logging, self.log_directory
),
)
if self.rmm_track_allocations:
import rmm

mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))


class PreImport:
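
For reference, the wrapping done in ``RMMSetup.setup()`` above can be reproduced in a standalone RMM session. This is only an illustrative sketch, assuming a 2 GiB pool as the upstream resource and assuming ``TrackingResourceAdaptor.get_allocated_bytes()`` as the query method:

import rmm

# Mirror the worker setup: install a pool resource, then wrap whatever
# resource is current in a TrackingResourceAdaptor.
pool = rmm.mr.PoolMemoryResource(
    rmm.mr.CudaMemoryResource(), initial_pool_size=2 * 1024**3
)
rmm.mr.set_current_device_resource(pool)

mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))

# Any allocation routed through RMM is now counted by the adaptor.
buf = rmm.DeviceBuffer(size=1_000_000)
tracker = rmm.mr.get_current_device_resource()
print(tracker.get_allocated_bytes())  # >= 1_000_000 while buf is alive
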