
Efficient scalable shuffle - P2P shuffle extension #7507 (Open)

fjetter opened this issue Jan 27, 2023 · 33 comments

fjetter (Member) commented Jan 27, 2023

Motivation

Shuffles are an integral part of many distributed data manipulation algorithms. Common DataFrame operations relying on shuffling include sort, merge, set_index, and various groupby operations (e.g. groupby().apply(), groupby(split_out>1)), whereas the most typical array workload is a rechunk. There are many other applications for an efficient shuffle implementation, which justifies a dedicated approach to this problem.
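For illustration, a few dask.DataFrame calls of the kind listed above that trigger a shuffle (a minimal sketch; the column names and sizes are made up):

import pandas as pd
import dask.dataframe as dd

left = dd.from_pandas(pd.DataFrame({"key": range(100), "x": 1.0}), npartitions=10)
right = dd.from_pandas(pd.DataFrame({"key": range(100), "y": 2.0}), npartitions=5)

left.set_index("key")                   # repartitioning by a new index requires a shuffle
left.merge(right, on="key")             # an all-to-all join requires a shuffle
left.groupby("key").x.sum(split_out=2)  # groupby with multi-partition output requires a shuffle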

Shuffling is a poor fit for centralized graph-based scheduling, since the graph is all-to-all (naively O(N²) in size, O(N log N) with dask's task-based shuffle, where N is the number of partitions), yet the core logic of a shuffle is so simple that it benefits little from centralized coordination while suffering significant overhead from it. With task-based shuffles, the amount of data we can shuffle effectively (before workers run out of memory, or the scheduler crashes or becomes a bottleneck) is severely limited. Allowing workers to autonomously exchange data with their peers and manage disk and memory usage in a more granular way lets us push that limit significantly higher.

See https://coiled.io/blog/better-shuffling-in-dask-a-proof-of-concept/ for more background.

This issue tracks the current implementation progress and highlights various milestones. We intend to update the top-level description of this issue continuously such that this issue can serve as an always up-to-date overview of the current efforts.

Goals

  • Can reliably shuffle orders-of-magnitude larger datasets (in total size and number of partitions) than the current task-based shuffle
  • Can shuffle larger-than-memory datasets by spilling to disk
  • Constant, predictable memory footprint per worker, which scales linearly with partition size, not total number of partitions
  • Just works, without users needing to tune parameters (buffer sizes, etc.)
  • Graceful restarting when possible, and quick failure when not
  • All state is cleaned up on success, failure, or cancellation
  • Shuffle performance is IO-bound (network, disk)
  • Resilience to worker failures via restart of computation

Roadmap

1 - Foundations and dask.DataFrame ✅

The implementation effort so far has focused on creating a stable foundation for the things to come and derives from the early prototype. This stage mostly concerned a consistent concurrency model that supports out-of-band, direct peer-to-peer communication between workers and integrates well with the existing task-based scheduling logic.

This was developed using a DataFrame-based shuffle as the driving example, and we consider it ready to use!

For detailed instructions, known issues, and feedback, please see #7509. We encourage all users of dask.DataFrame to try this and report back with feedback.
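As a quick orientation (a minimal sketch; see #7509 for the authoritative, version-specific instructions, and note that the exact config key has changed across releases), the P2P backend is selected through dask's configuration and requires a distributed cluster:

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client

client = Client()  # P2P shuffles run on the distributed scheduler

# opt into the P2P backend for DataFrame shuffles
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
    df = dd.from_pandas(pd.DataFrame({"key": range(10_000), "x": 1.0}), npartitions=100)
    df.set_index("key").persist()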

2 - dask.Array rechunking

The new shuffle extension is currently built to handle pandas DataFrames and uses pyarrow behind the scenes. Its architecture is designed with generic types in mind and is just as well suited for array workloads. One of the most popular many-to-many problems is array rechunking, which we will implement next using this extension.

Basic functionality is being set up in #7534

This approach already provides constant-memory array rechunking but sometimes falls short in terms of walltime performance compared to the old-style task-based shuffle.
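For reference, P2P rechunking is opted into via the array.rechunk.method config value, the same way it is exercised later in this thread (a minimal sketch; requires a distributed cluster):

import dask
import dask.array as da
from dask.distributed import Client

client = Client()  # P2P rechunking runs on the distributed scheduler

with dask.config.set({"array.rechunk.method": "p2p"}):
    x = da.random.random((10_000, 10_000), chunks=(10_000, 100))
    y = x.rechunk((100, 10_000))  # all-to-all rechunk handled by the P2P extension
    y.sum().compute()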

3 - Misc

This next stage is not as refined as the initial ones. There are many small to medium-sized issues that will either expand adoption of the P2P algorithm or make it run faster and smoother. This section will become more refined over time.

dcherian commented Feb 24, 2023

Hello, this is very cool to see, and I'm excited to see it being applied to arrays!

In my Xarray-centric experience the shuffle is usually expressed as an out-of-order indexing of a dask.Array (e.g. indexing out all elements belonging to a group in a groupby operation and presenting that to the user as a new array). See this comment and below for some discussion. Currently this just results in an explosion of tasks, and is only workable for a limited set of problems.

For built-in reductions, we can use a dask.array.reduction-style approach (see flox), but we still need this kind of shuffling for GroupBy.map, much like a dataframe.

Is it feasible for the indexing/slicing code to use the shuffle for such cases?

fjetter (Member) commented Feb 27, 2023

> Is it feasible for the indexing/slicing code to use the shuffle for such cases?

So far, we have implemented this only for the API call arr.rechunk and haven't considered more generic slicing. However, the biggest lift was not the rechunk API but figuring out how to chop up the array and piece it back together properly, and that's basically slicing :)

We are currently generating a plan (this is all subject to change) that calculates the "target chunk" for every slice of an input chunk:

from collections import defaultdict
from itertools import product

# ChunkedAxes, NIndex, ShardID and NSlice are type aliases/helpers from the
# P2P rechunk extension in distributed.

def rechunk_slicing(
    old: ChunkedAxes, new: ChunkedAxes
) -> dict[NIndex, list[tuple[ShardID, NSlice]]]:
    """Calculate how to slice the old chunks to create the new chunks

    Returns
    -------
    Mapping of each old chunk to a list of tuples defining where each slice
    of the old chunk belongs. Each tuple consists of the index of the new
    chunk, the index of the slice within the composition of slices creating
    the new chunk, and the slice to be applied to the old chunk.
    """
    from dask.array.rechunk import intersect_chunks

    ndim = len(old)
    intersections = intersect_chunks(old, new)
    new_indices = product(*(range(len(c)) for c in new))

    slicing = defaultdict(list)
    for new_index, new_chunk in zip(new_indices, intersections):
        sub_shape = [len({slice[dim][0] for slice in new_chunk}) for dim in range(ndim)]
        shard_indices = product(*(range(dim) for dim in sub_shape))

        for shard_index, sliced_chunk in zip(shard_indices, new_chunk):
            old_index, nslice = zip(*sliced_chunk)
            slicing[old_index].append((ShardID(new_index, shard_index), nslice))
    return slicing

Effectively this is just a mapping like old_chunk_ix -> list of (new_chunk_ix, slice), i.e. if you can express an operation like this, we can use this P2P backend.
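To make that concrete, here is a rough 1-D illustration (hypothetical numbers; ShardID pairs the new chunk index with the shard index, as in the code above) of the mapping produced when rechunking from chunks (4, 4) to (2, 6):

# old chunks (4, 4) -> new chunks (2, 6)
# old chunk (0,) holds elements 0-3, old chunk (1,) holds elements 4-7
slicing = {
    (0,): [
        (ShardID((0,), (0,)), (slice(0, 2),)),  # first half of old chunk 0 -> new chunk (0,)
        (ShardID((1,), (0,)), (slice(2, 4),)),  # second half -> first shard of new chunk (1,)
    ],
    (1,): [
        (ShardID((1,), (1,)), (slice(0, 4),)),  # all of old chunk 1 -> second shard of new chunk (1,)
    ],
}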

The only difference between rechunk and generic slicing (IIUC) is that rechunk typically slices contiguously. The extension currently doesn't care about (or optimize for) this, so I am pretty confident we can express generic indexing as well using this. There are very likely cases where the old-style indexing is still better, so a challenge will be figuring out which algorithm to choose.

@dcherian is arr.rechunk also useful for you or will you only be able to try this once we implement indexing/slicing?

dcherian commented Feb 27, 2023

> how to chop up the array and piece it back together properly, and that's basically slicing :)

That's what I was thinking ;)

> @dcherian is arr.rechunk also useful for you or will you only be able to try this once we implement indexing/slicing?

Yes, it is useful, but I'll have to make up a use case. It has been such a problem for so long that we try to avoid it as much as possible, or use rechunker to create a second copy of the dataset.

The other common place this bites us is arr.reshape (or stack with xarray). Example: dask/dask#5544 (EDIT: there are so many issues pinging this one!)

cc @TomNicholas @jbusecke @rabernat

mrocklin (Member) commented:
I think that the right approach here is to start to generalize the shuffle system to the point where a sharp and adventurous developer, like @dcherian, could try implementing his own operation. This will also likely come up with map_overlap for the imaging people.

My sense is that there is likely some abstraction that could be designed, similar to Aggregation but more complex, that could enable folks outside of @hendrikmakait and @fjetter to engage effectively here.

TomNicholas commented:
This looks very exciting! I have a test problem I would like to try it out on. What's the easiest way for me to do that? Use Coiled with distributed >2023.3.0?

hendrikmakait (Member) commented Jun 12, 2023

@TomNicholas: Yes, using distributed>=2023.3.1 with a Coiled cluster should be the easiest way for you to go about that, though I suggest using distributed>=2023.5.0. See the instructions on the Pangeo Discourse (https://discourse.pangeo.io/t/rechunking-large-data-at-constant-memory-in-dask-experimental/3266/2) on how to make sure you're using p2p rechunking. Just note that disabling fusion is not necessary anymore if you're using distributed>=2023.5.0.

On a related note, I'm currently improving the P2P rechunking implementation (distributed#7897). I still have to create a range of benchmarks to test these changes and a decision heuristic between tasks and p2p rechunking (dask/dask#10226), so I might add your example to the list.

Please let me know if you have more common rechunking workloads you care about. Creating an issue on https://github.com/coiled/benchmarks/issues might be the best way to do that.

dcherian commented Jun 21, 2023

I'm not sure why P2P rechunking helps with Tom's test problem. Can someone explain that?

import dask.array as da
import xarray as xr
from distributed.diagnostics import MemorySampler

ms = MemorySampler()

ds = xr.Dataset(
    dict(
        anom_u=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
        anom_v=(["time", "face", "j", "i"], da.random.random((5000, 1, 987, 1920), chunks=(10, 1, -1, -1))),
    )
)

quad = ds**2
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")

with ms.sample():
    mean.compute()
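For reference, the sampled data is then typically inspected with the MemorySampler plotting helpers (a short sketch, assuming the API documented in distributed.diagnostics):

ms.plot(align=True)  # or ms.to_pandas() for the raw samples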

hendrikmakait (Member) commented:
@dcherian: I don't think it does. I've tested the problem recently, and it appears to lack rechunking of any sort. Hence P2P rechunking won't help. @TomNicholas: Do you have more information on this? Where would you have expected it to help?

TomNicholas commented:
Okay, hmmm - in that case there must be some other change that caused me to see totally different performance between running an old version of distributed locally and the new version with P2P on Coiled. I'll try both on Coiled instead now to eliminate possible causes.

TomNicholas commented Jun 21, 2023

Okay I take it back, 'array.rechunk.method': 'tasks' and 'array.rechunk.method': 'p2p' behaved indistinguishably on this problem when I tried both with distributed main via Coiled.

fjetter (Member) commented Jun 22, 2023

> Okay I take it back, 'array.rechunk.method': 'tasks' and 'array.rechunk.method': 'p2p' behaved indistinguishably on this problem when I tried both with distributed main via Coiled.

If you ran this on Coiled, can you provide us with the cluster ID?

TomNicholas commented:
> If you ran this on Coiled, can you provide us with the cluster ID?

232904 for without P2P and 232905 for with P2P.

fjetter (Member) commented Jun 22, 2023

These two runs both look pretty much the same. Were you able to reproduce the totally different performance you mentioned here?

TomNicholas commented Jun 22, 2023

The performance I originally saw when I ran with a local cluster without p2p looked like this.

[memory usage plot from the local run]

I'm wondering now whether I actually somehow accidentally switched from measuring total cluster memory usage of all types (including spilled) to just measuring process memory when I moved to Coiled. That would explain the difference in the shape of the graphs.

Now on Coiled I get this plot, where the red is spilling, which is consistent with my first result.

[memory usage plot from Coiled; red indicates spilling]

EDIT: Actually I now realise what happened - I had set 'distributed.worker.memory.spill': False when I originally ran it locally (to force a crash on the large run I expected to fail, instead of having it hang interminably). My bad 🤦

tasansal commented:
Today I tried P2P rechunking to rechunk a 3D array (chunked at 128x128x128) into an optimized, directional (1x1024x1024) variant. The input array is Zarr and the output is also Zarr.

With array-based P2P rechunking, I noticed the shuffle barrier is the primary bottleneck (i.e. it seems like it makes a copy of the entire array on disk? I saw huge disk write usage while it wasn't writing anything to the target array).

Am I doing something incorrectly, or are my expectations wrong? I would expect (N, 1024, 1024) sized pieces to be written out whenever they're ready and freed from the temporary cache. The memory usage is definitely consistent, but it doesn't flush anything to the new Zarr array until the whole dataset is read. Any suggestions on the best way to diagnose what is going on?

[dashboard screenshot]

fjetter (Member) commented Aug 15, 2023

@tasansal What you are describing is indeed expected. Before we can write out any of the output chunks, we have to read the entire dataset in; otherwise an output chunk could be written out while still incomplete.

mrocklin (Member) commented:
That's probably a case where task-based rechunking is better. In general, my guess is that we'll prefer task-based rechunking when the number of stages in the plan is minimal, and prefer P2P-based rechunking when there are more stages in the plan. Those cases involve full copies regardless.
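A hedged sketch of choosing between the two methods per computation, using the config keys exercised elsewhere in this thread (arr and target are hypothetical stand-ins for the Zarr-backed workload above):

import dask

# simple, mostly contiguous rechunks: the task-based implementation is often fine
with dask.config.set({"array.rechunk.method": "tasks"}):
    arr.rechunk((1, 1024, 1024)).to_zarr(target)

# heavily all-to-all rechunks: the P2P extension keeps per-worker memory flat
with dask.config.set({"array.rechunk.method": "p2p"}):
    arr.rechunk((1, 1024, 1024)).to_zarr(target)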

tasansal commented Aug 15, 2023

@fjetter I understand. However, that would reduce the utility of P2P shuffling for many tasks, no? Is it possible to have multiple shuffle barriers based on the chunking? What I mean is:

If I have an array that is shaped (128, 128, 128) and chunked at (16, 16, 16), and I want to rechunk it to (4, 64, 64), the problem can be broken down so that the computation uses the disk for (16, 64, 64) pieces at a time (see the sketch below). Or ideally, some logic like the Rechunker Algorithm. You may ask, why not just use rechunker? I have used it in the past and it has some quirks, and it would be much better if Dask did this for us in a bigger, maintained codebase 😃
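For illustration, a rough sketch of that two-stage idea done manually today (the intermediate chunking is hypothetical; this is not what the extension currently does):

import dask.array as da

x = da.random.random((128, 128, 128), chunks=(16, 16, 16))

# stage 1: rechunk only the trailing axes, keeping each step's working set smaller
intermediate = x.rechunk((16, 64, 64))

# stage 2: finish on the leading axis
y = intermediate.rechunk((4, 64, 64))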

@mrocklin That makes sense. Is there a way to limit/adjust the memory usage of task-based rechunking? I have tried it with the new worker-saturation config and inlining the Zarr arrays, but it still uses much more memory than expected and spills.

Also, probably out of scope, but caching the full float32 array on disk doesn't work well with compressed formats like Zarr. It can potentially take 2-10x more disk space than users expect. Should this be documented? Or could the cache use compression?

hendrikmakait (Member) commented:
> @fjetter I understand. However, that would reduce the utility of P2P shuffling for many tasks, no?

The benefit is indeed situational; that's also why we haven't made it the default (as opposed to P2P shuffles for dataframes). Finding a good heuristic has also proven surprisingly hard and depends on the underlying hardware (dask/dask#10226).

> Is it possible to have multiple shuffle barriers based on the chunking?

We've considered the possibility of using multiple barriers, but it is not a high priority. The way P2P is currently built, it would still force you to round-trip all data through disk even if it fits into memory. So before solving the algorithmic problem, we need to fix this one; otherwise, you likely wouldn't benefit much from the speedups because rechunking would still be disk-bound. #7618 is a first step in that direction. All that being said, let us know if you would like to get involved in the efforts of improving P2P rechunking. There's much to be done :)

> Also, probably out of scope, but caching the full float32 array on disk doesn't work well with compressed formats like Zarr. It can potentially take 2-10x more disk space than users expect. Should this be documented? Or could the cache use compression?

Given the current append-strategy for the files on disk, I doubt that we would gain much benefit from compression. Documenting this might be a good idea; would you be interested in creating a PR for this?

TomNicholas commented:
> Or ideally, some logic like the Rechunker Algorithm.

> Is there a way to limit/adjust the memory usage of task-based re-chunking?

@tasansal if you're not already aware you might be interested in cubed, which is a generalisation of rechunker (and ships a version of rechunker within it).

mrocklin (Member) commented Sep 1, 2023

> @mrocklin That makes sense. Is there a way to limit/adjust the memory usage of task-based re-chunking? I have tried it with the new worker-saturation config and inlining the Zarr arrays but it still does use much more memory than expected and spills.

@tasansal can I ask you for an example problem that people can look at? Is something like the following representative of your problem?

import dask.array as da

x = da.random.random((2000, 2000, 2000), chunks=(128, 128, 128))
x.rechunk(("auto", 1000, 1000)).to_zarr(...)

mrocklin (Member) commented Sep 1, 2023

I'm also curious: what is actually stopping you from using this today? Is it slow performance? (If so, I'm curious what you're using today that has better performance.) Is it running out of memory? (That seems unlikely given the P2P solution.) Is it running out of disk?

What I'm hearing now is "this could be better", and I entirely agree with that. I'm also curious about what specifically is causing you pain, and what, if anything, makes it insufficient.

mrocklin (Member) commented Sep 1, 2023

I ran this both locally and on the cloud, and I think I'm hitting hardware performance limits. Video here in case people are interested.

Probably my version of the problem isn't representative of what you're running into, @tasansal. If you can help by providing a representative example, that would be really useful.

Notebook here

hansmohrmann commented:
Hi all, I'm having the same issue. A small way to reproduce this is

import dask
import dask.array as dsa
dask.config.set({"array.rechunk.method": "p2p", "optimization.fuse.active": False})
shape = (1500, 700, 720*20)
chunks = (1500, 700, 720)
data = dsa.random.random(shape, chunks=chunks)

data_rc = data.rechunk((1500, 700, 300))
data_rc.compute() # or save to disk

The real-world example that this is trying to imitate is opening 20 netCDFs (monthly files of hourly data) and rechunking to fixed length-300 chunks (then saving to Zarr). The resulting dask graph using P2P has the same shuffle block that leads to memory spilling and an eventual crash.
[task graph screenshot]

Context for why I was trying P2P in the first place: the non-P2P dask graph for this task has a different failure mode, where the rechunk-splits and merges are ordered in a way I don't understand. Using the toy code above with P2P disabled, I get what I expect: a very neat dask graph with a well-optimized task order.
[task graph screenshot]

However, in my real-world example, I instead get an unexpected shuffling of task ordering (represented by the diagonals below) that results in tasks being held in memory much longer than would be necessary under the more optimal ordering of the toy example.
[task graph screenshot]

I can't reproduce this exactly without the data, but it's just a bunch of standard netCDFs:

    files = glob.glob(args.data_location)
    ds = xr.open_mfdataset(files[:20], inline_array=True)
    ds = ds.chunk({"latitude": -1, "longitude": -1, "time": 300})
    ds.to_zarr('/data/dummy.zarr')

mrocklin (Member) commented Sep 1, 2023

Interesting. I don't suppose there's a way to reproduce this, even if in a non-minimal way?

My sense is that for rechunking that isn't all-to-all (like (1000, 1) to (1, 1000)), the task-based mechanism will be near-optimal. If there is some ordering issue, then we should try to hunt that down.

hansmohrmann commented:
Annoyingly, my attempt at a reproducer with public data has led to the graph I expect :/

import s3fs
import xarray as xr
from dask.distributed import Client, LocalCluster
local_dir = '/data/era5-test/'
s3 = s3fs.S3FileSystem(anon=False)
for f in s3.glob('s3://era5-pds/1980/*/data/air_temperature_at_2_metres.nc'):
    savename = local_dir + f[9:]
    s3.download(f, savename)
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='28GB')
client = Client(cluster)
ds = xr.open_mfdataset(local_dir+'*/*/data/*.nc', inline_array=True)
ds = ds.chunk({'lat':-1,'lon':-1,'time0':300})
ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
ds.to_zarr(local_dir+'output.zarr')

[task graph from .visualize()]

hansmohrmann commented Sep 1, 2023

Ah, managed to reproduce it by just adding a few more files. Also, I think I'm hijacking @fjetter's thread here, so let me know if I should just open a new issue.

Code is as follows (takes ~5 minutes to download depending on zone, and requires ~20 GiB of disk space). Interesting that this only shows up when I have 24 files instead of the 12 above.

import s3fs
import xarray as xr
from dask.distributed import Client, LocalCluster
local_dir = '/data/era5-test/'
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='28GB')
client = Client(cluster)
s3 = s3fs.S3FileSystem(anon=False)
for year in [1980,1981]:
    for f in s3.glob(f's3://era5-pds/{year}/*/data/air_temperature_at_2_metres.nc'):
        savename = local_dir + f[9:]
        s3.download(f, savename)
ds = xr.open_mfdataset(local_dir+'*/*/data/*.nc', inline_array=True)
ds = ds.chunk({'lat':-1,'lon':-1,'time0':300})
ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')
ds.to_zarr(local_dir+'output.zarr')

[task graph from .visualize()]

So here you can see those funky parallels again. The rechunk-merge at bottom center that's held in memory is a consequence of the funky task ordering (represented by those diagonals); do this for 500 files and those aggregate to crash this otherwise straightforward job.

mrocklin (Member) commented Sep 1, 2023

> ds.air_temperature_at_2_metres.data.visualize(optimize_graph=True, color='order')

Can you share the result of this call? You'll need to include a filename:

ds.air_temperature_at_2_metres.data.visualize("myfile.png", optimize_graph=True, color='order')

The .visualize method gives a much more informative/careful visualization than the Bokeh chart does (the Bokeh chart has to update every 200ms, so it doesn't try as hard).

hansmohrmann commented:
Ah, sorry, I was running in a notebook so that bit was interactive. Here is the resulting chart:
[task graph from .visualize() with ordering colors]

mrocklin (Member) commented Sep 1, 2023

🤔 Someone else should take a look at that, but it's not immediately clear to me that that's a failure case just yet. Looking at the numbers, it seems like maybe the graphviz/dot layout is a bit off?

There are clearly lines swooping around that look off, but when you check the numbers and colors, it seems like Graphviz has placed things a bit oddly anyway (everyone uses heuristics for this problem).

Are you able to make this look more obviously bad by adding more files?

hansmohrmann commented:
[ordering graph from .visualize()]

The swooping lines in the visualize() figure aren't clear issues; I agree that they're mostly just a quirk of plotting. Some issues that I do see: node 200 (bottom, middle right) should be running before 186, which should run before 179; I think this might be whatever led to the allowed failure of this test. I'm wondering if it's in conflict with the desired ordering specified in this test?

hansmohrmann commented:
OK, a clean example, with the ERA5 sample data downloaded as above (2 years' worth):

    import glob

    import s3fs
    import xarray as xr
    import zarr

    files = glob.glob('/data/era5-test/*/*/data/*.nc')
    ds = xr.open_mfdataset(files, inline_array=True)
    ds = ds.chunk({"lat": -1, "lon": -1, "time0": 300})
    s3 = s3fs.S3FileSystem(anon=False)
    store = s3fs.S3Map(root=str(s3_zarr_store_path), s3=s3, check=False)  # s3_zarr_store_path defined elsewhere
    encoding = {vname: {"compressor": zarr.Blosc(cname='zstd', clevel=3)} for vname in ds.data_vars}
    zarr_data_store_out = ds.to_zarr(store=store, encoding=encoding, consolidated=True, compute=False)
    zarr_data_store_out.visualize('/home/ubuntu/gro/vis_graph_full.png', optimize_graph=True, color='order')
    zarr_data_store_out.compute()

Bokeh plot after a few minutes; notice the held rechunk-splits (and there are a lot more of these when scaling up 20x):
[dashboard screenshot]

Matching .visualize() plot:
[task graph from .visualize()]

I'm having a bit of a hard time aligning these two. The 0 node (representing what I understand to be a task resulting from the first open_dataset) shares dependent nodes with the 3 node and the 15 node; by 'root' node ordering, those should be the 2nd and 4th 'root' nodes processed (0, 3, 9, 15). Yet the Bokeh plot shows that the first 'root' node (lowest on the left; the left column is processed monotonically bottom to top) shares a dependent node with the 10th 'root' node (left column, 10 from the top). So where am I misunderstanding the relationship between the static plot and the runtime dask graph?

mrocklin (Member) commented Sep 5, 2023

Just checking in here. I've gotten pulled away this week, but I think that @fjetter plans to think about this soon. It's near the top (but not yet at the top) of his priority list.
