Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENH] Smarter portable defaults for CPU<>GPU dask interop #540

Open
lmeyerov opened this issue Feb 23, 2021 · 10 comments
Open

[ENH] Smarter portable defaults for CPU<>GPU dask interop #540

lmeyerov opened this issue Feb 23, 2021 · 10 comments

Comments

@lmeyerov
Copy link

lmeyerov commented Feb 23, 2021

This got complicated enough that wanted to move from Slack --

Configuring and using dask cpu with dask gpu has been confusing, and is a reality both wrt hw (GPU hw is generally balanced with CPU core counts) and sw (a lot of code isn't GPU, despite our efforts ;-) ). It'd be good to get clarity in docs + defaults here:

Automating nthreads for cpu vs. gpu

We want a simple and portable config for our typical GPU case of launching on a single node with 1-8 GPUs and 4-128 cores, and write size-agnostic dask + dask-cuda code over it. That means never hard-coding the above in the config nor app and still getting reasonable resource utilization. I suspect this is true of most user. It'd also be good to predictably override this. I can't speak to multi-node clusters nor heterogenous ones.

  • Ideally, Psuedo-coding, dask-scheduler 123 & dask-cuda-worker dask-scheduler:123 & should "just work"

    • Or explicit recommendation to do dask-scheduler 123 & dask-cuda-worker dask-scheduler:123 & dask-worker dask-scheduler:123
  • Code should just work (~balanced) for things like dgdf = ddf.map_partitions(cudf.to_pandas), and the reverse

  • Users can optionally override the defaults at the level of worker config or tasks

"Just working" means:

  • number of GPU workers (threads or processes) should default to ~= # visible cuda devices <- current dask-cuda-worker behavior

  • number of CPU workers (threads) should default to ~= # cores < dask-worker's default, but not dask-cuda-worker's

  • Behind the scenes, if this turns into an equiv of dask-cuda-worker-existing && dask-worker, or that's the recommended invocation, so be it. However, I am a bit worried there may be unforeseen buggy / awkward behavior here, like having to use Dataset Publish instead map_partitions for the interop

Automating resource tagging

Separately, here's experimental discussion of using logical resources & annotations, though PyTorch experiences may not carry over to Dask-CPU <> Dask-CUDF. One idea is autotag GPU/CPU workers with # Physical vs # Logical units, and letting app devs use those.

Ex:

8 x A100 $ dask-cuda-worker 
# => {'GPU_NODES': 1, 'GPUS_PHYSICAL': 8, 'GPUS_LOGICAL': 8000}
#       'CPU_NODES': 1, 'CPUS_PHYSICAL': 128, 'CPUS_LOGICAL': 128000}

From there, code can use annotations based on hard-coded physical or more scale-free / agnostic logical styles

@quasiben
Copy link
Member

@lmeyerov thanks for moving the discussion here. If you have time, I think devs would also appreciate some motivating use cases or your thoughts on where mixing GPU/CPU workers is applicable. Pseudo would also be good

@lmeyerov
Copy link
Author

lmeyerov commented Feb 25, 2021

Some examples we see:

  1. Data ingestion: We get some annoying formats like excel files and more niche ones that we do dask jobs to turn into dataframes, and then send to dask-cudf for actual use

  2. Data cleaning: We do some cleaning/enrichment tasks that end up being cpu sometimes, and can be tricky to turn into dgdf b/c the whole point is to infer the metadata, and dgdf assumes it is known

  3. RAPIDS won't do sparse data for awhile, so we need to do dask cpu for those steps

  4. We basically do interactive data dashboards, so viewers will load data of different size and every interaction kicks off pipelines of tasks on them (filter, histogram, ML, ...). We're now switching to default-dask just to keep the web server responsive. Tasks can technically be sized for pandas vs cudf vs dask-cudf, but that can be a lot of work, so we often just run as dask-cudf with 1 partition, and maybe the kernel is cudf. (We mentally put bsql in the dask bucket of writing any-scale).

Maybe also helpful are some code patterns:

  • We have a pretty gnarly and somewhat buggy table_to_dgdf :: Client * DataFrameLike -> DaskCudfDataFrame convertor that handles some funny cases:

    • usual: pandas, arrow
    • blazingsql: needs a sync client, while our app is written async
    • dask: when we used LocalClusters, this was getting confused (maybe also b/c bsql) so we had to roundtrip through Shared Data, though I think that went away with a shared remote dask-cuda-worker
  • we run some sort of asyncio web framework, so pretty early in a handling a response, we generate a Client for the authorized user (1-10ms?). Some reason bsql contexts are even heavier to start so we experimented with caching those. We actually carry around a gpu_client and cpu_client as part of the context as we thought we needed separate ones, but may be getting rid of that.

  • a lot of our requests are bursty and cacheable, so we had some basic LRU logic for cudf. we're investigating a good pattern for that for dask-cudf. imagine loading a single dashboard that makes some histograms over the same overall data

    • /dataset/123/column/345/histogram
    • /dataset/123/column/345/histogram?filter=...
    • /dataset/123/column/678/histogram

    This is currently awkward in dask in general. If memory is idling, we should cache, and allow it to be forced out. The policy is probably LRU. For sanity, ok for (us) to auto-expire every 10min. dask has the combo of 'persist' -> 'publishing' for saving between requests, and cudf has universal memory.... but still need to figure out a basic LRU on top. In cudf, we risk not spilling and just decorate those requests with @lru... but time to do it right.

@quasiben
Copy link
Member

I don't have recommendations yet (first try attempting this). Naively I setup a cluster in the following manner:

Scheduler

dask-scheduler

CPU Workers

dask-worker tcp://...:8786 --nprocs 1 --resources CPU=1.0 --name cpu-worker

GPU Workers

dask-cuda-worker tcp://...:8786 --resources GPU=1 --name gpu-worker

Client

import dask
import cudf
from dask.distributed import Client, wait

client = Client('localhost:8786')

with dask.annotate(resources={'CPU': 1}):
    ddf = dask.datasets.timeseries()
    ddf = ddf.persist()
    wait(ddf)

assert isinstance(ddf, dask.dataframe.core.DataFrame)
with dask.annotate(resources={'GPU': 1}):
    cdf = ddf.map_partitions(cudf.DataFrame)
    cdf = cdf.persist()

assert isinstance(cdf, dask_cudf.core.DataFrame)

The above worked but I can see this being brittle in that missing an annotation can to lead to problems. I agree that automatic resource tagging for dask-cuda-workers could make sense especially in these scenarios. For dask-workers, I don't think we can ask to auto tag workers.

Are you asking for LocalCUDACluster to optionally spin up additional CPU workers with tags ? I could see this being very convenient but also highly prone mistakes. Probably need to think about this more (cc @pentschev )

I'm also curious what you mean by:

RAPIDS won't do sparse data for awhile, so we need to do dask cpu for those steps

Do you mean sparse data frames or sparse arrays ? If the latter, CuPy has grown some what recently to meet these needs

@lmeyerov
Copy link
Author

lmeyerov commented Feb 27, 2021

Thanks. I was starting to get to that conclusion as the current happy path based on the slack conv. Doing some reworking here to be closer to that, I'm guessing it'll take a few weeks to see how this phase ends up. Been a lot of surprises up to here (ex: diff exn's mixing diff clients to same worker), and a sane dask Resource model that's concurrency friendly.

Re:sparse, sparse Series. Typically dense when indexed by another col. Ex: df['stick'] might be sparse, while df[ df['type'] == 'car' ]['stick'] is dense. Related but a bit more there, esp. when doing ML, we're starting to see many-column datasets. But that's all for another ticket..

Will share feedback as it gets clearer.

@lmeyerov
Copy link
Author

lmeyerov commented Feb 28, 2021

Interim update: blazingsql context creation is getting wedged when combining dask-worker --resources CPU=1 and dask-cuda-worker --resources GPU=1 (despite the happy case of single node / single gpu / sequential test). Will help bsql folks track down and once that's unblocked, return here..

@chrisroat
Copy link

I am also curious if this could interact with cluster scaling (and things like dask-gateway). Could each type of worker need to be tracked by some key and scaled independently?

(In my case, I have a workflow with "big disk", "gpu" and "cpu" workers.)

@github-actions
Copy link

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

@quasiben
Copy link
Member

quasiben commented Jun 2, 2021

Hi folks, you might be interested in @madsbk recent work dask/distributed#4869 allowing workers to have multiple executors

@github-actions
Copy link

This issue has been labeled inactive-90d due to no recent activity in the past 90 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed.

@github-actions
Copy link

This issue has been labeled inactive-30d due to no recent activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with a comment indicating any updates or changes to the original issue and/or confirm this issue still needs to be addressed. This issue will be labeled inactive-90d if there is no activity in the next 60 days.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
Status: No status
Development

No branches or pull requests

3 participants