Heterogeneous Computing Design #5201

Open
madsbk opened this issue Aug 11, 2021 · 2 comments

@madsbk
Contributor

madsbk commented Aug 11, 2021

I would like to discuss how we can make it easier to utilize a cluster of mixed architectures -- focusing on mixing GPU and CPU tasks/workers.

Background

Currently, a common setup is to have one Worker per GPU and nothing else. This is how a typical Dask-CUDA cluster looks, and it is a simple setup that works reasonably well: Distributed handles memory transfer between GPUs seamlessly, and GPU tasks are not accidentally scheduled on machines without GPUs.
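For reference, such a setup is typically just a few lines with Dask-CUDA (a sketch; LocalCUDACluster starts one worker process per visible GPU):

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

# One worker per GPU and nothing else -- no extra CPU-only workers.
cluster = LocalCUDACluster()
client = Client(cluster)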

However, it also means that CPU tasks are scheduled on GPU workers and the CPU cores on those machines go unutilized. E.g., a DGX-1 has 8 GPUs and 80 CPU cores, so most of the time the 80 CPU cores are idle.

Distributed supports resource management, which makes it possible to restrict tasks to specific types of Workers. However, it requires manually annotating operations, which is very hard to do efficiently for individual operations. It is easy enough to annotate high-level operations on Dask collections, but annotating each chunk individually is hard. Additionally, it is not possible to annotate tasks dynamically based on task outputs.
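For illustration, today's manual annotation looks roughly like this (a sketch; it assumes a Client connected to workers started with --resources "GPU=1", and annotations can be dropped by graph optimization in some cases):

import dask
import dask.array as da

x = da.random.random((100_000, 1_000), chunks=(10_000, 1_000))

# Annotating the whole high-level operation is easy. Annotating individual
# chunks/tasks, or based on what a task actually returns, is the hard part.
with dask.annotate(resources={"GPU": 1}):
    y = x.sum()

y.compute()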

Currently, mixing different data types within a Dask collection is not possible. E.g., in a Dask dataframe each chunk can be a cuDF or a Pandas dataframe, but not a mix of the two. This limits the use of heterogeneous computing since it forces the user to choose between pure GPU and pure CPU dataframe computations.

Goals

  • Utilize both the available CPUs and GPUs.
  • Avoid CPU tasks taking up GPU slots when CPU slots are available on the Worker.
  • Avoid scheduling GPU tasks on Workers without GPUs.
  • Make it possible for systems like rebalancing and the Active Memory Manager to reason about GPU tasks and memory.
  • Allow mixing cuDF and Pandas in a Dask dataframe, or NumPy and CuPy in a Dask array.
  • Enable computation on spilled data by converting GPU objects to CPU objects when spilling. E.g., spilling a cuDF dataframe converts it to a Pandas dataframe.

I can think of three projects that could achieve the goals.

Dynamic Annotations and Restrictions

Make it possible for a Worker to update the annotations and restrictions of a task based on the task's output. The user specifies a function that the Worker calls after executing a task. The function returns a dict of annotations and restrictions (or None), which the Worker sends back to the Scheduler as part of the "op": "task-finished" message. The Scheduler then updates the task and its dependent tasks with the new annotations and restrictions.

In order to implement Detect GPU tasks as proposed by @mrocklin and restrict GPU tasks to GPU Workers, a function could look something like the following, which makes the task's dependents use the Worker's GPU executor:

def annotate_gpu_tasks(ts: TaskState, value: object):
    """Called by the Worker with a finished task's state and its output value."""
    from dask_cuda.is_device_object import is_device_object

    annotations = {}
    restrictions = {}
    if is_device_object(value):
        # The output lives on the GPU: run dependents on the Worker's
        # "gpu" executor and require a Worker with a "GPU" resource.
        annotations = {"executor": "gpu"}
        restrictions = {"GPU": 1}
    return annotations, restrictions

Mixed-Type Collections

Make it possible to create Dask collections whose underlying chunks have mixed types. This works in many cases already! E.g., calling map_partitions() with a function that returns a cuDF or a Pandas dataframe based on the partition_info argument works. The only operation I have found to fail is concat(), which takes multiple chunks as input. We could implement concat() by first concatenating the cuDF dataframes, then the Pandas dataframes, and finally converting everything to cuDF before concatenating the two results.
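As a minimal sketch of the map_partitions() case (maybe_to_gpu is a hypothetical helper; it assumes cuDF is installed), the resulting collection ends up with cuDF chunks for even-numbered partitions and Pandas chunks for the odd ones:

import pandas as pd
import dask.dataframe as dd

def maybe_to_gpu(df, partition_info=None):
    # map_partitions() passes partition_info automatically when the
    # function accepts it; even-numbered partitions are moved to the GPU.
    import cudf
    number = (partition_info or {}).get("number")
    if number is not None and number % 2 == 0:
        return cudf.from_pandas(df)
    return df

pdf = pd.DataFrame({"x": range(100)})
ddf = dd.from_pandas(pdf, npartitions=4)
mixed = ddf.map_partitions(maybe_to_gpu)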

I have experimented with this approach and it works like a charm, but we probably need a more generic design that makes use of type dispatching and is extendable like the other backend functions.
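A rough sketch of what such a dispatch-based design could look like (to_gpu and mixed_concat are hypothetical names, using dask.utils.Dispatch in the same spirit as the existing backend dispatch functions):

import pandas as pd
from dask.utils import Dispatch

# Hypothetical dispatch for moving a chunk to the GPU; other backends
# could register their own conversions, keeping the design extendable.
to_gpu = Dispatch("to_gpu")

@to_gpu.register(pd.DataFrame)
def _pandas_to_gpu(df):
    import cudf
    return cudf.from_pandas(df)

def mixed_concat(dfs):
    # Concatenate chunks of the same backend first, then promote the CPU
    # result to the GPU (if any GPU chunks exist) before the final concat.
    import cudf
    gpu = [df for df in dfs if isinstance(df, cudf.DataFrame)]
    cpu = [df for df in dfs if not isinstance(df, cudf.DataFrame)]
    if not gpu:
        return pd.concat(cpu)
    parts = [cudf.concat(gpu)]
    if cpu:
        parts.append(to_gpu(pd.concat(cpu)))
    return cudf.concat(parts)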

Spilling by Conversion

Make it possible for extensions like Dask-CUDA's DeviceHostFile to annotate tasks. This way, when DeviceHostFile spills GPU memory by converting to Pandas, it can inform the Worker and the Scheduler that the task and its dependents should be handled as CPU tasks, and the other way around when un-spilling.
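A minimal sketch of the conversion itself (host_spill and host_unspill are hypothetical hooks that something like DeviceHostFile could call, alongside the annotation update described above):

import pandas as pd

def host_spill(obj):
    # "Spill" a GPU object by converting it to its CPU counterpart instead
    # of serializing it; dependents should then be treated as CPU tasks.
    import cudf
    if isinstance(obj, cudf.DataFrame):
        return obj.to_pandas()
    return obj

def host_unspill(obj):
    # The reverse conversion when the data is needed on the GPU again.
    import cudf
    if isinstance(obj, pd.DataFrame):
        return cudf.from_pandas(obj)
    return obj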

cc. @dask/gpu

@mrocklin
Member

Thank you for writing this up @madsbk. It's a good read. I apologize that it took me a couple of days to get to this.

Regarding dynamic annotations, I agree that this seems like a good idea. I've actually done similar things in custom code.

I suggest a slight variation to your approach. Rather than include this information as part of the task-finished message, let's separate this out into a separate worker handler. So instead of sending one message with many fields:

{"op": "task-finished", ..., "annotations": ..., "restrictions": ...}

we might send a few different messages:

{"op": "annotate-task", "key": ..., "annotations": ...}
{"op": "task-finished", ...}

The batched comms will guarantee ordering, and these will almost certainly be sent in the same batch, so I think that this will have the same performance characteristics, but reduce complexity / separate things out a little. As a nice side effect we can set annotations and restrictions and other things outside of the context of a specific task finishing.
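For illustration, the worker-side send could then look roughly like this (a sketch; send_task_done is a hypothetical helper, and the real task-finished message carries more fields):

def send_task_done(worker, key, annotations, restrictions):
    # Both messages go over the Worker's existing batched stream, so they
    # are delivered in order and almost certainly in the same batch.
    if annotations or restrictions:
        worker.batched_stream.send(
            {"op": "annotate-task", "key": key,
             "annotations": annotations, "restrictions": restrictions}
        )
    worker.batched_stream.send({"op": "task-finished", "key": key})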

Thoughts?

Spilling by Conversion

I'll admit to being less personally excited by the idea of changing the type of stored data on the fly, but I can see that it might be helpful. I think that the separated design above might make this easier to implement.

@madsbk
Contributor Author

madsbk commented Aug 16, 2021

I suggest a slight variation to your approach. Rather than include this information as part of the task-finished message, let's separate this out into a separate worker handler. So instead of sending one message with many fields

Good point, #5207 now introduces a dedicated "annotate-task" operation.
