Add option to track RMM allocations #842

Merged · 14 commits · Feb 25, 2022
10 changes: 10 additions & 0 deletions dask_cuda/cli/dask_cuda_worker.py
@@ -119,6 +119,14 @@
Logging will only be enabled if ``--rmm-pool-size`` or ``--rmm-managed-memory``
are specified.""",
)
@click.option(
"--rmm-track-allocations/--no-rmm-track-allocations",
default=False,
show_default=True,
help="""Track memory allocations made by RMM. If ``True``, wraps the memory
resource of each worker with a ``rmm.mr.TrackingResourceAdaptor`` that
allows querying the amount of memory allocated by RMM.""",
)
@click.option(
"--pid-file", type=str, default="", help="File to write the process PID.",
)
@@ -293,6 +301,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
@@ -344,6 +353,7 @@ def main(
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
pid_file,
resources,
dashboard,
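As an aside (not part of the diff), here is a minimal sketch of how the tracked allocations could be queried from a client once workers are started with the new flag. It assumes a scheduler already running at 127.0.0.1:8786 with dask-cuda-worker processes launched using --rmm-pool-size and --rmm-track-allocations:

```python
# Illustrative sketch only, not part of this PR: with --rmm-track-allocations,
# each worker's current device resource is a TrackingResourceAdaptor, so the
# bytes currently allocated through RMM can be queried per worker.
# Assumes a scheduler at 127.0.0.1:8786 with dask-cuda-worker processes attached.
from dask.distributed import Client


def allocated_bytes():
    import rmm

    return rmm.mr.get_current_device_resource().get_allocated_bytes()


client = Client("127.0.0.1:8786")
print(client.run(allocated_bytes))  # {worker_address: bytes allocated, ...}
```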
2 changes: 2 additions & 0 deletions dask_cuda/cuda_worker.py
@@ -62,6 +62,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
pid_file=None,
resources=None,
dashboard=True,
@@ -248,6 +249,7 @@ def del_pid_file():
rmm_managed_memory,
rmm_async,
rmm_log_directory,
rmm_track_allocations,
),
},
name=name if nprocs == 1 or name is None else str(name) + "-" + str(i),
3 changes: 3 additions & 0 deletions dask_cuda/local_cuda_cluster.py
@@ -210,6 +210,7 @@ def __init__(
rmm_managed_memory=False,
rmm_async=False,
rmm_log_directory=None,
rmm_track_allocations=False,
Review comment (Member): Could you also add docstrings for the new parameter in here?

Reply (Contributor Author): Done!

jit_unspill=None,
log_spilling=False,
worker_class=None,
@@ -272,6 +273,7 @@ def __init__(
)

self.rmm_log_directory = rmm_log_directory
self.rmm_track_allocations = rmm_track_allocations

if not kwargs.pop("processes", True):
raise ValueError(
@@ -415,6 +417,7 @@ def new_worker_spec(self):
self.rmm_managed_memory,
self.rmm_async,
self.rmm_log_directory,
self.rmm_track_allocations,
),
},
}
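For completeness, a hedged usage sketch of the same option through the Python API (LocalCUDACluster); it assumes a GPU is available and that dask-cuda includes this PR's rmm_track_allocations keyword:

```python
# Illustrative sketch only: enable allocation tracking when building the
# cluster, then inspect the resource stack on each worker. The tracking
# adaptor wraps the pool resource, which remains reachable via upstream_mr.
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


def resource_stack():
    import rmm

    mr = rmm.mr.get_current_device_resource()
    return type(mr).__name__, type(mr.upstream_mr).__name__


if __name__ == "__main__":
    with LocalCUDACluster(rmm_pool_size="2GB", rmm_track_allocations=True) as cluster:
        with Client(cluster) as client:
            # Expected per worker: ("TrackingResourceAdaptor", "PoolMemoryResource")
            print(client.run(resource_stack))
```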
31 changes: 31 additions & 0 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -253,3 +253,34 @@ def test_cuda_visible_devices_uuid(loop): # noqa: F811

result = client.run(lambda: os.environ["CUDA_VISIBLE_DEVICES"])
assert list(result.values())[0] == gpu_uuid


def test_rmm_track_allocations(loop): # noqa: F811
rmm = pytest.importorskip("rmm")
with popen(["dask-scheduler", "--port", "9369", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9369",
"--host",
"127.0.0.1",
"--rmm-pool-size",
"2 GB",
"--no-dashboard",
"--rmm-track-allocations",
]
):
with Client("127.0.0.1:9369", loop=loop) as client:
assert wait_workers(client, n_gpus=get_n_gpus())

memory_resource_type = client.run(
rmm.mr.get_current_device_resource_type
)
for v in memory_resource_type.values():
assert v is rmm.mr.TrackingResourceAdaptor

memory_resource_upstream_type = client.run(
lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
)
for v in memory_resource_upstream_type.values():
assert v is rmm.mr.PoolMemoryResource
Review comment (Member) on the new test: Given dask-cuda-worker and LocalCUDACluster have somewhat different control paths, could you add the same test for LocalCUDACluster in https://github.com/rapidsai/dask-cuda/blob/branch-22.04/dask_cuda/tests/test_local_cuda_cluster.py ? It should be fairly straightforward; the inner logic will be the same, just the cluster setup will be slightly different.

Reply (Contributor Author): Thanks! I added that.
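For reference, the LocalCUDACluster counterpart could look roughly like the sketch below; the test actually added in the PR may differ (test_local_cuda_cluster.py tends to use async fixtures), so treat this only as an outline of the same inner logic:

```python
# Rough outline only; the real test added to test_local_cuda_cluster.py may
# use gen_test/async style. The assertions mirror the dask-cuda-worker test.
import pytest
from dask.distributed import Client
from dask_cuda import LocalCUDACluster


def test_rmm_track_allocations():
    rmm = pytest.importorskip("rmm")
    with LocalCUDACluster(rmm_pool_size="2GB", rmm_track_allocations=True) as cluster:
        with Client(cluster) as client:
            # Each worker's current resource should be the tracking adaptor...
            mr_types = client.run(rmm.mr.get_current_device_resource_type)
            for v in mr_types.values():
                assert v is rmm.mr.TrackingResourceAdaptor

            # ...wrapping the pool resource configured via rmm_pool_size.
            upstream_types = client.run(
                lambda: type(rmm.mr.get_current_device_resource().upstream_mr)
            )
            for v in upstream_types.values():
                assert v is rmm.mr.PoolMemoryResource
```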

9 changes: 7 additions & 2 deletions dask_cuda/utils.py
@@ -52,6 +52,7 @@ def __init__(
managed_memory,
async_alloc,
log_directory,
track_allocations,
):
if initial_pool_size is None and maximum_pool_size is not None:
raise ValueError(
@@ -65,10 +66,12 @@
self.async_alloc = async_alloc
self.logging = log_directory is not None
self.log_directory = log_directory
self.rmm_track_allocations = track_allocations

def setup(self, worker=None):
import rmm
Review comment (Member): Are we safe to always import RMM here? If this function is always run as part of the RMMSetup plugin, wouldn't that make RMM an implicit dependency for starting a CUDA cluster?

Reply (Member): Great catch @charlesbluca, indeed I failed to realize that. Yes, we should remove it from the "main" context of setup and leave it solely within the local contexts that exist today, iff one of those options to enable RMM is set by the user.

Reply (Contributor Author): Thanks both! I've accepted @pentschev's suggestions here.

if self.async_alloc:
import rmm

rmm.mr.set_current_device_resource(rmm.mr.CudaAsyncMemoryResource())
if self.logging:
@@ -78,7 +81,6 @@
)
)
elif self.initial_pool_size is not None or self.managed_memory:
import rmm

pool_allocator = False if self.initial_pool_size is None else True

@@ -92,6 +94,9 @@
worker, self.logging, self.log_directory
),
)
if self.rmm_track_allocations:
mr = rmm.mr.get_current_device_resource()
rmm.mr.set_current_device_resource(rmm.mr.TrackingResourceAdaptor(mr))


def unpack_bitmask(x, mask_bits=64):