Spill to disk may cause data duplication #3756

Closed
fjetter opened this issue Apr 30, 2020 · 13 comments · Fixed by dask/zict#65 or #5936

@fjetter
Member

fjetter commented Apr 30, 2020

In aggressive spill-to-disk scenarios I observed that distributed may spill all the data it has in memory while still complaining, with the following message, that there is no more data to spill:

Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory?
Side note: In our setup every worker runs in an isolated container so the chance of another process interfering with it is virtually zero.

It is true that these workers do indeed still hold on to a lot of memory without it being very transparent about where the data is. GC is useless, i.e. it is not related to dask/zict#19

Investigating this issue made me realise that we may try to spill data which is actually currently in use.
The most common example for this is probably once the worker has collected the dependencies and schedules the execution of the task, see here. Another one would be when data is requested from a worker and we're still serializing/submitting it.
I could prove this assumption by patching the worker code, tracking the keys in-execution/in-spilling with some logging and it turns out that for some jobs I hold on to multiple GBs of memory although it was supposedly already spilled.

If we spill data which is currently still in use, this is not only misleading to the user, since the data is still in memory, but it may also cause heavy data duplication. If the spilled-but-still-in-memory dependency is requested by another worker, the buffer fetches the key from the slow store and materialises it a second time, because it no longer knows of the original object, which is still in memory. If this piece of data is requested by more than one worker, it could even be duplicated multiple times.

In non-aggressive spill-to-disk scenarios we should be protected from this by the LRU in the buffer but if memory pressure is high, the spilling might actually worsen the situation in these cases.

My practical approach to this would be to introduce something like a (un)lock_key_in_store method to the buffer which protects the key from spilling and manually (un)setting this in the distributed code. If there is a smarter approach, I'd be glad to hear about it.
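To make the proposal a bit more concrete, here is a minimal sketch of what such a locking mechanism could look like. It is a toy buffer, not zict.Buffer: the class name, the key-count limit `n_fast`, and the method bodies are assumptions for illustration only.

```python
from collections import OrderedDict


class LockableBuffer:
    """Toy fast/slow buffer; NOT zict.Buffer. All names are hypothetical."""

    def __init__(self, n_fast):
        self.n_fast = n_fast        # max number of keys kept in memory
        self.fast = OrderedDict()   # in-memory values, oldest first
        self.slow = {}              # stands in for the on-disk store
        self.locked = set()         # keys that must not be spilled

    def lock_key_in_store(self, key):
        self.locked.add(key)

    def unlock_key_in_store(self, key):
        self.locked.discard(key)
        self._evict()

    def __setitem__(self, key, value):
        self.fast[key] = value
        self.fast.move_to_end(key)
        self._evict()

    def _evict(self):
        # Spill the oldest unlocked keys until we are back under the limit.
        for key in list(self.fast):
            if len(self.fast) <= self.n_fast:
                break
            if key not in self.locked:
                self.slow[key] = self.fast.pop(key)


buf = LockableBuffer(n_fast=1)
buf["x"] = [1, 2, 3]
buf.lock_key_in_store("x")    # e.g. "x" is an input of a running task
buf["y"] = [4, 5, 6]          # over the limit: "y" is spilled, "x" is kept
spilled_while_locked = sorted(buf.slow)
buf.unlock_key_in_store("x")  # task finished; "x" is eligible for spilling again
```

The worker would have to call the lock/unlock pair around execute and get_data, which is the manual bookkeeping mentioned above.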

Also, if my reasoning is flawed somewhere, I'd appreciate feedback. So far I could only prove that we try to spill data which is currently in use; the duplication is just theory.

Related issues:
dask/dask#2456

@mrocklin
Member

I think that your understanding that we can both spill and hold onto data is correct. I would expect this inconsistency to be short-lived though, only as long as the task execution or data communication.

Given the LRU policy, I would expect that this inconsistency would not happen until all other data had been spilled. At this point my guess is that the impact of this inconsistency is usually small (the workload was probably going to run out of memory anyway) but that's just a guess. It looks like you have more hands-on experience here.

@fjetter
Member Author

fjetter commented Apr 30, 2020

> I would expect this inconsistency to be short-lived though, only as long as the task execution or data communication.

I would also think that it is short-lived. I don't think we actually hold on to references permanently.

> Given the LRU policy, I would expect that this inconsistency would not happen until all other data had been spilled. At this point my guess is that the impact of this inconsistency is usually small (the workload was probably going to run out of memory anyway) but that's just a guess. It looks like you have more hands-on experience here.

Yes, this can only happen once the LRU is essentially depleted.

I'll try to collect a bit more data to see if this actually has a huge impact. From what I've seen, a very busy cluster can communicate/execute for minutes. We may need to decrease task sizes but still, if the network is busy we cannot rely on very fast transfers. A minute or two with strong data duplication may already be enough to drive workers OOM. However, if it is marginal the complexity is hardly worth the effort.

@fjetter
Member Author

fjetter commented May 4, 2020

Following up with a few observations:

Context

The computation I'm running in this example is a huge groupby.apply where the function reduces to essentially no data.
I estimate the total to be about 1.5TB of in-memory pandas dataframe data. The cluster I'm running this on has 50 workers with 1 CPU and 32GB RAM each, which makes 1.6TB RAM in total. Obviously, for this to succeed we need to rely on spill-to-disk.

The dask version I am using is based on dask/dask#6154 since otherwise the spill-to-disk effectively is useless and first workers go OOM after about 10mins.

The distributed version is more or less 2.14.0 with some additional patches to observe and count the keys duplicated by get_data and execute

Observations

The following is a grafana dashboard showing the task states, CPU and memory usage statistics of the workers. The memory usage per worker remains consistently stable due to passive spill-to-disk (result is finished, put into buffer, buffer evicts results if necessary to maintain the threshold). Once the first user functions are evaluated (at around 18:30) we can see that the memory profile is a bit more volatile since the function itself collects huge chunks of data and works on these. This is also the time where we can see first logs about active spill-to-disk by the memory-monitor (70% threshold hit, evict data until back at target)

[image: Grafana dashboard showing task states, CPU, and memory usage of the workers]

The following plot shows all data currently held by the workers which is not part of the self.data.fast dictionary. This is data currently used by executing code or currently being submitted to another worker (based on logs emitted by a PeriodicCallback which compares the keys-in-use and keys-in-store). The left axis (blue) shows the number of keys currently in memory but not tracked, while the right axis shows the memory used by these keys, normalised to the worker memory limit.
We can see that, even early on, up to 18% of a worker's memory is occupied by this untracked memory. At a later point in the computation this peaks at about 40% of the memory limit, with duplication factors of up to 11.

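[image: number of untracked in-memory keys (left axis, blue) and the memory they use, normalised to the worker memory limit (right axis)]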

Summary

I was surprised to see that this duplication kicks in even at early stages of the pipeline and is more or less continuously an issue over the entire run. The impact is mostly below 5% of memory, which is probably not worth the effort, but the peak at 40% is a bit concerning.

In the past I've often seen workers with up to 80% memory usage while reporting "Memory use is high but worker has no data", and now I'm wondering if this is/was the issue (in combination with in-flight data, but that's another topic).

Is this something we should address or am I paranoid?

@jakirkham
Member

I wonder whether this is still the case with PR ( #3849 ). For clarity, that would impact spilling that uses pickle in addition to data transmission. IOW out-of-band buffers would be collected, but not copied. Meaning one would expect the overall memory usage to be lower when pickling objects that support out-of-band buffer extraction. This is because the same memory could be shared between the spilled and non-spilled forms.

@fjetter
Member Author

fjetter commented Jul 9, 2020

Once the data is spilled and the reference to it is lost, there is no way to connect the serialized object to its original, is there? I'm thinking of a scenario like this

  • task finishes; put data of key in buffer on worker A
  • spill key to disk on worker A since memory threshold was hit
  • Lose the reference and free the memory
  • Data for key is requested by worker B from worker A but worker A is at memory limit, i.e. fast data is "full"
  • Worker A still loads data into memory but it doesn't store it in the fast dict since the memory threshold was breached. Therefore we get an entirely new copy without central reference to the object; Let's call this key1
  • In the meantime Worker C asks for the same key and the same thing happens but in a different coroutine. Since the data was never registered in the fast dict, the worker doesn't know of it and loads the data from disk, deserializes it and we have an entirely new copy key2
  • Now worker A wants to execute some code and requires the data of key but the data is still not available (data submission of the above two tasks not completed, yet). It loads data again from disk... now we have key3

In this scenario we have three distinct copies of the same data, don't we? In my naive understanding, pickle5 would help if we were trying to spill/submit/pickle concurrently but I don't believe this is causing this issue.
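The scenario above can be reproduced in miniature. The following is only a sketch under simplifying assumptions: `NaiveSpillStore` is a made-up stand-in for a buffer whose fast memory is permanently full, and a dict of pickled bytes plays the role of the disk.

```python
import pickle


class NaiveSpillStore:
    """Toy model of a full buffer: values live only in `slow` as bytes."""

    def __init__(self):
        self.slow = {}  # stands in for the on-disk store

    def __setitem__(self, key, value):
        self.slow[key] = pickle.dumps(value)

    def __getitem__(self, key):
        # Fast memory is "full", so the freshly loaded object is returned
        # without being registered anywhere.
        return pickle.loads(self.slow[key])


store = NaiveSpillStore()
store["key"] = list(range(1000))

key1 = store["key"]  # requested by worker B
key2 = store["key"]  # requested by worker C
key3 = store["key"]  # needed by a task on worker A

# All three are alive at the same time, equal in content, distinct in memory.
n_copies = len({id(key1), id(key2), id(key3)})
```

Each access deserializes again, so besides the memory there is also repeated disk IO and deserialization work.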

@jakirkham
Member

Just for clarity, what I mean is that data pickled with protocol 5 will take views of the data as part of pickling (as opposed to copying the data into bytes). Here's a code example for context:

import pickle5 as pickle
import numpy as np

a = np.arange(6)
b = []
d = pickle.dumps(a, protocol=5, buffer_callback=b.append)

assert np.may_share_memory(a, b[0])

This would mean the high-water mark would remain a bit lower during the run. Though it's worth noting compression during spilling may affect this as well. So more exploration of exactly how this would affect things is still needed.

That all being said, I've not dug into this issue deeply. Perhaps pickle protocol 5 is not relevant.

@jakirkham
Member

There may also be other explanations for high memory usage. For example merge_frames (part of the serialization/deserialization pipeline) was causing data to get copied in some cases. PR ( #3960 ) fixes that issue.

rapids-bot bot pushed a commit to rapidsai/dask-cuda that referenced this issue Jan 6, 2021
This PR introduces a new _device host file_ that uses `ProxyObject` to implement spilling of individual CUDA objects as opposed to the current host file, which spills entire keys.

- [x] Implement spilling of individual objects
- [x] Handle task level aliasing
- [x] Handle shared device buffers
- [x] Write docs

To use, set `DASK_JIT_UNSPILL=True`

## Motivation  

### Aliases at the task level 

Consider the following two tasks:
```python

def task1():  # Create list of dataframes
    df1 = cudf.DataFrame({"a": range(10)})
    df2 = cudf.DataFrame({"a": range(10)})
    return [df1, df2]

def task2(dfs):  # Get the second item
    return dfs[1]    
```
Running the two tasks on a worker we get something like:
```python

>>> data["k1"] = task1()
>>> data["k2"] = task2(data["k1"])
>>> data
{
    "k1": [df1, df2],
    "k2": df2,
}
```
Since the current implementation of spilling works on keys and handles each key separately, it overestimates the device memory used: `sizeof(df)*3`. Even worse, if it decides to spill `k2`, no device memory is freed since `k1` still holds a reference to `df2`!

The new spilling implementation fixes this issue by wrapping identical CUDA objects in a shared `ProxyObject`; in this case `df2` in both `k1` and `k2` refers to the same `ProxyObject`.
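The overestimate can be illustrated without a GPU. This is only a sketch: `bytearray` objects stand in for the cudf dataframes, `leaves` is a hypothetical helper, and deduplicating by `id()` merely mimics the idea of one shared proxy per object; it is not how `ProxyObject` is implemented.

```python
df1 = bytearray(1000)  # stand-ins for the two cudf dataframes
df2 = bytearray(1000)
data = {"k1": [df1, df2], "k2": df2}


def leaves(value):
    """Yield the buffers held by a value (a buffer or a list of buffers)."""
    if isinstance(value, list):
        for item in value:
            yield from leaves(item)
    else:
        yield value


# Per-key accounting: every key is measured on its own, so df2 counts twice.
per_key = sum(len(buf) for v in data.values() for buf in leaves(v))

# Per-object accounting: each distinct object is counted exactly once.
unique = {id(buf): buf for v in data.values() for buf in leaves(v)}
per_object = sum(len(buf) for buf in unique.values())

print(per_key, per_object)  # 3000 2000
```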


### Sharing device buffers

Consider the following code snippet:
```python

>>> data["df"] = cudf.DataFrame({"a": range(10)})
>>> data["grouped"] = shuffle_group(data["df"], "a", 0, 2, 2, False, 2)
>>> data["v1"] = data["grouped"][0]
>>> data["v2"] = data["grouped"][1]
```
In this case `v1` and `v2` are separate objects and are handled separately both in the current and the new spilling implementation. However, `shuffle_group()` in cudf actually returns a single device memory buffer such that `v1` and `v2` point to the same underlying memory buffer. Thus the current implementation will again overestimate the memory use and spill one of the dataframes without any effect.
The new implementation takes this into account when estimating memory usage and makes sure that either both dataframes are spilled or neither of them is.

cc. @beckernick, @VibhuJawa
xref: dask/distributed#3756

Authors:
  - Mads R. B. Kristensen <madsbk@gmail.com>

Approvers:
  - Peter Andreas Entschev

URL: #451
@jakirkham
Member

@gjoseph92 am wondering if PR ( #5208 ) would help here as well. Do you know? If not, no worries (mostly curious).

@gjoseph92
Collaborator

I have no idea unfortunately. As I understood, this issue was more about a bookkeeping error, so I'd doubt it. It might help make the case where we re-materialize data in order to send it to a worker slightly less bad, since at least there wouldn't be another copy during the frames merge, but there would still be the data re-read from disk either way.

@fjetter
Member Author

fjetter commented Aug 18, 2021

From my understanding the zero-copy fix doesn't help, since the duplication I am referring to here is not caused by memcopies but rather by having distinct Python objects. I'll try to explain with some simplified pseudocode for the worker and the zict buffer:

class ZictSpillData:
    def __getitem__(self, key):
        if key in self.memory:
            return self.memory[key]
        else:
            # Not in memory: deserialize a fresh instance from the slow store
            data = load_from_store(key)
            # Only values below the target are tracked in memory again
            if sizeof(data) < self.memory_target:
                self.memory[key] = data
            return data


class Worker:
    async def execute(self, ts, ...):
        assert isinstance(self.data, ZictSpillData)
        data = {}
        for key in ts.dependencies:
            data[key] = self.data[key]
            assert key not in self.data.memory  # data was too large, the buffer forgot it again
        run_task(ts, data)

    def get_data(self, keys, ...):
        data = {}
        for key in keys:
            data[key] = self.data[key]
            assert key not in self.data.memory  # data was too large, the buffer forgot it again
        return data

Let's assume, for simplicity, the data we're concerned about is large and will be spilled by the buffer immediately because it is beyond a given threshold memory_target = memory_limit * memory_target_fraction. In this case, whenever the data is accessed, the buffer will read it from storage and will create a new data instance without storing a ref to it in its data. This instance is returned to the caller and every subsequent call to the buffer will do the exact same thing: load data, create new instance, return without holding a ref.

In this extreme situation this is hopefully easy to follow. In more realistic scenarios we're spilling data concurrently with other operations. For instance, data is being used as part of an execute but the buffer spills the data. This results in the data no longer being tracked by the buffer (in our new terminology this means the data is "unmanaged memory") but it is still in memory since it is being used by the task execution. If another worker then requests this piece of data we just spilled, the worker will load it from the store, create a new instance and start serialization. Simultaneously X other workers could do the same, resulting in X (get_data) + 1 (executing) copies of the same data because our buffer doesn't know about the keys still being used.

While typing, I'm wondering if we can't fix this by being a bit smart with weakrefs but I currently don't have time to test this idea.
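A rough, untested-against-distributed sketch of the weakref idea, to show what I have in mind. Everything here is hypothetical: `WeakrefSpillStore` and `Frame` are toy names, a dict of pickled bytes stands in for the disk, and the real buffer would additionally keep small values in fast memory.

```python
import pickle
import weakref


class Frame:
    """Stand-in for a large value; plain lists cannot be weakly referenced."""

    def __init__(self, values):
        self.values = values


class WeakrefSpillStore:
    """Toy store that remembers spilled values for as long as they are alive."""

    def __init__(self):
        self.slow = {}                              # key -> pickled payload
        self.alive = weakref.WeakValueDictionary()  # key -> live object

    def __setitem__(self, key, value):
        self.slow[key] = pickle.dumps(value.values)
        self.alive[key] = value

    def __getitem__(self, key):
        value = self.alive.get(key)
        if value is None:
            # Nobody holds the object anymore: load it and track it weakly.
            value = Frame(pickle.loads(self.slow[key]))
            self.alive[key] = value
        return value


store = WeakrefSpillStore()
store["key"] = Frame(list(range(1000)))

a = store["key"]  # e.g. held by a running task
b = store["key"]  # e.g. requested by get_data at the same time
same_object = a is b

del a, b                   # last users are done, the object can be freed
reloaded = store["key"]    # loaded from the slow store again
```

The weak reference does not keep the value in memory, so it costs nothing once all users are done, but it only works for types that support weak references.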

Edit: Just in case I haven't made myself clear yet: I believe this issue is caused by the zict.Buffer, not by our serialization protocol or anything else distributed is doing. This issue might be better off in the zict repo, but I believe the visibility here is more important (and by now we have "our own" Buffer subclass anyway, class SpillBuffer(Buffer)).

@fjetter
Member Author

fjetter commented Jan 31, 2022

A very naive worst case calculation about how severe this issue is.

We're targeting get_data requests of about 50MB (Worker.target_message_size). Let's take this as the reference size of a key. In realistic workloads I wouldn't be surprised to see much larger keys. The original report here described a shuffle for which 50MB splits are reasonable (input 2GB / 32 splits ~ 62MB).

The worker limits concurrent incoming get_data requests to Worker.total_in_connections == 10 (default) but allows this to double for same-host connections, i.e. #key_size * #total_in_connections * #same_host_mult

That yields a 500-1000MB data duplication for get_data assuming the LRU is missed consistently which I assume is approximately true for a shuffle.

The duplication on the Worker.execute side scales linearly with the number of threads and the number of input dependencies, i.e. #key_size * #dependencies * #num_threads. Again using 50MB per key and assuming 32 dependencies for a shuffle, that yields 1600MB per thread.

Let's have a look at a worst case: all workers on the same machine, max branching of 32 in a shuffle, key size at 50MB and 2 threads per worker (the two is pretty random, I have to admit). That's up to 84 duplications (10 * 2 + 32 * 2) of a key's data, or 4200MB of unmanaged memory caused by our spillage layer. Various data copies and the overhead of the UDF come on top of that.

I guess this worst case is not too far off from a LocalCluster "deployment" on a big VM. However, most of the numbers are mere guessing and this can easily be scaled up or down so please take this with a healthy pinch of salt.
Either way, duplicates also mean duplicate effort in deserialization and disk IO so there is a case to be made to avoid these regardless of the memory footprint.
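For reference, the worst-case numbers above written out as arithmetic. The variable names are made up for this snippet; the values are the ones assumed in this comment.

```python
key_size_mb = 50           # Worker.target_message_size, used as the key size
total_in_connections = 10  # default limit for incoming get_data requests
same_host_mult = 2         # the limit may double for same-host connections
dependencies = 32          # max branching assumed for the shuffle
num_threads = 2            # threads per worker (arbitrary choice)

get_data_copies = total_in_connections * same_host_mult  # 20
execute_copies = dependencies * num_threads              # 64
total_copies = get_data_copies + execute_copies          # 84

get_data_mb = get_data_copies * key_size_mb              # 1000
execute_mb_per_thread = dependencies * key_size_mb       # 1600
unmanaged_mb = total_copies * key_size_mb                # 4200

print(total_copies, unmanaged_mb)  # 84 4200
```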

@crusaderky
Collaborator

This looks like something that could be solved with weakref and a few extra lines in spill.py. I'll take a stab at it after #5543 is merged.

@fjetter
Member Author

fjetter commented Jan 31, 2022

> This looks like something that could be solved with weakref and a few extra lines in spill.py.

That's what I thought as well.
