storage: high memory usage of compactor #3921
After some offline discussion with Arne, here are some ideas:
The ideas look good to me. cc @hzxa21
What's "slack"? 🤔
Well, it's an idea (credits to @ALeitert): if we are merging one small SST (RHS) into a bunch of SSTs with non-overlapping ranges (LHS) and |RHS| << |LHS|, then as each LHS SST fills up, the leftover values are pushed onto the next SST, resulting in a cascade of writes across the LHS. If we instead did not keep the SSTs completely full, the new writes could be absorbed by the "slack" region at the end of each LHS SST. I guess the idea is like a B-Tree. However, the reason this may not be so effective is that the RHS is a big batch, so we will likely touch most of the LHS SSTs anyway, assuming some uniform distribution of keys; and that is the point of an LSM tree: we batch inserts to postpone non-sequential writes to the compaction stage.
It was an idea I had when thinking about the tables and doing some tests on my machine. I noticed that the compactor is often loading, rebuilding, and compacting lots of tables even if there is only a small change. And, as mentioned, a small change at the beginning has a cascading effect on everything that follows. An extreme case would be adding one KV pair to the very first table. The idea is not fully developed yet, and I would not give it priority either. I also do not know how likely such cases are for real-life data.
In most cases, we use dedicated compactor nodes for compaction, so I think we can assume there is no memory resource competition between the compactor and other components. The goal of bounding compactor memory consumption is to avoid OOM. Ideally, we should bound the working set of the compactor to avoid OOM while prefetching as much as possible.
In terms of implementation complexity, @ALeitert points out that 4 is as simple as organizing the SSTables to be compacted in a given task into groups of non-overlapping ranges, then merging over each of these groups. If we have a small number of groups relative to the total number of tables being merged, we reduce the memory usage at any given point in time while improving our yield for a large number of SSTables merged into a collection of non-overlapping SSTs. So I would disagree about added complexity for 4. However, we do need to validate whether this helps our workloads. If our workload looks like merging a large number of non-overlapping SSTs with a small number of overlapping SSTs, then it will help. If it looks like merging a large number of overlapping ranges, then it will not help, and we need to rely on:
Arne is working on validating what kind of workload we have by measuring whether interval scheduling can produce a small number of groups relative to the number of SSTables in the task, and whether the maximum memory used for storing SSTables in a task at any point in time is reduced. Generally, I think that 2 and 3 are poor compromises. We want a higher yield (large collections of SSTs with non-overlapping ranges per second) for a fixed amount of memory. 2 yields the same number of SSTs but results in more than one collection of non-overlapping SSTs. 3 yields fewer SSTs per second. As for 1, I'm not sure how good an idea it is, as we save costs by fetching a large contiguous SST into memory with a single S3 get request. So I would be hesitant about the prefetch/stream approach. Edit: this is erroneous. We can issue a single get from S3 and stream the resulting bytes. HTTP handles flow control, so we only pull data as it needs to be buffered for consumption.
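For concreteness, here is a minimal sketch of the grouping step behind idea 4 and this validation, using the classic greedy interval-partitioning algorithm. The SstRange type and u64 key bounds are hypothetical simplifications, not Hummock's actual SstableInfo:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Clone, Debug)]
struct SstRange {
    left: u64,  // simplified encoded key bounds
    right: u64,
}

/// Greedy interval partitioning: sort by left bound, then reuse the group whose
/// latest SST ends earliest; open a new group only when every group overlaps.
/// The number of groups equals the maximum overlap depth of the input ranges.
fn group_non_overlapping(mut ssts: Vec<SstRange>) -> Vec<Vec<SstRange>> {
    ssts.sort_by_key(|s| s.left);
    let mut groups: Vec<Vec<SstRange>> = Vec::new();
    // Min-heap of (right bound of the last SST in the group, group index).
    let mut heap: BinaryHeap<Reverse<(u64, usize)>> = BinaryHeap::new();
    for sst in ssts {
        let right = sst.right;
        match heap.peek().copied() {
            // The earliest-ending group finishes before this SST starts: extend it.
            Some(Reverse((end, idx))) if end < sst.left => {
                heap.pop();
                groups[idx].push(sst);
                heap.push(Reverse((right, idx)));
            }
            // Every existing group overlaps this SST: open a new group.
            _ => {
                let idx = groups.len();
                groups.push(vec![sst]);
                heap.push(Reverse((right, idx)));
            }
        }
    }
    groups
}

fn main() {
    let ssts = vec![
        SstRange { left: 0, right: 10 },
        SstRange { left: 11, right: 20 },
        SstRange { left: 5, right: 15 }, // overlaps both of the above
    ];
    // Two groups: one concat-able pair plus one group for the overlapping SST.
    println!("{} groups", group_non_overlapping(ssts).len());
}
```

If the measurement shows the number of groups is usually small relative to the number of input SSTables, each group can be driven by a concat-style iterator and idea 4 should pay off.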
A small number N of … This is complementary to idea 4: we require idea 4 to make this idea work, and this idea makes applying idea 4 more likely to pay off. 5 can be implemented by performing interval scheduling at the meta node. We add groups to a task randomly to get a more uniform workload across tasks (each task has a balance of groups with many SSTs, few SSTs, and those in between), up to a max_overlapping_threshold. So we should investigate as:
We checked the compactor log and found that the compaction task itself is not huge, but the compactor node is running 6-7 compaction tasks concurrently. I think we should limit the number of in-flight compaction tasks per compactor node first.
But I think in the approaches above, we are trying to make each compaction task itself more efficient, allowing more compaction tasks to be run concurrently on the same memory budget, resulting in higher SST yield. Reducing the number of tasks can prevent OOM but reduces yield.
"Not huge" does not mean it is not higher than it needs to be.
Only Level0 has overlapping SSTs in our design, so we should always use … FYI, the sub-level introduced in #3111 can also help make compaction more efficient by reducing overlapping SSTs in a compaction task. Back to the problem described in this issue, I still think controlling compaction parallelism (i.e. the number of running compaction tasks) in a compactor node is a simpler and more effective solution, because 1) compaction tasks are preferred to be kept small in many cases; 2) the root cause of this issue is too many not-too-big compaction tasks running in one compactor node at the same time. We should try to scale compactor nodes if all available compactor nodes hit the per-node compaction parallelism limit.
@jon-chuang
I think rather than limiting parallelism per se, we may be better off limiting the expected memory consumption. This way, we can set the target memory usage based on the node's memory, whether total or available (where for the latter it is assumed we are sharing the node with other processes). We can assume that N SSTs from L0 in the task result in N * L0 SST size bytes consumed, and any SST from L1 and above results in 1 * L? SST size bytes consumed. Since we store the resultant SST in memory until upload, we can also add the target level's SST size bytes to the budget consumed. Or, I guess, parallelism might be fixed (by the number of cores) and we adjust the max number of L0 SSTs we try to merge in one task according to the memory budget.
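As a rough sketch of such admission control — the struct, field names, and numbers below are illustrative assumptions, not values from the codebase:

```rust
// Estimate a task's peak memory so the compactor can admit tasks against a
// memory budget instead of a fixed parallelism limit.
struct TaskShape {
    l0_sst_count: usize,
    l0_sst_size: u64,          // typical size of an L0 SST in bytes
    lower_level_sst_size: u64, // size of the single SST held at a time from L1+
    target_file_size: u64,     // size of the in-memory output SST before upload
}

/// All L0 inputs are held at once (they overlap), plus one SST from the lower
/// level at a time, plus the output SST buffered until upload.
fn estimated_task_memory(t: &TaskShape) -> u64 {
    t.l0_sst_count as u64 * t.l0_sst_size + t.lower_level_sst_size + t.target_file_size
}

fn main() {
    let t = TaskShape {
        l0_sst_count: 6,
        l0_sst_size: 32 * 1024 * 1024,
        lower_level_sst_size: 128 * 1024 * 1024,
        target_file_size: 128 * 1024 * 1024,
    };
    // Admit the task only if it fits into the remaining budget.
    let budget: u64 = 4 * 1024 * 1024 * 1024; // e.g. 4 GiB reserved for compaction
    let need = estimated_task_memory(&t);
    println!("need {} MiB, admit = {}", need >> 20, need <= budget);
}
```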
Indeed, 1 would only benefit L0->L0 or L0->L1 compaction involving more than one SST from L0. I guess to improve things in light of sub-level compaction, we should try to use concat iterators in each non-overlapping sub-level, so that only the highest L0 sub-level will need an SSTableIterator.
Actually, considering that currently the largest SST file can be 2GB, the streaming SSTable iterator approach does seem reasonable. Perhaps fetching blocks up to a buffer of 32MB makes sense. However, note that those would be from levels with non-overlapping SST files, so we would have at most one large SST at a time. On the other hand, we store the SSTs resulting from compaction in memory for the duration of creation and upload as well. Since the target level is >= any of the inputs, the memory usage from keeping these files in memory instead of streaming the upload is already at least the size of the largest input SST file. In other words, we may not be able to significantly reduce memory usage unless we also allow streaming upload of blocks of an SST prior to it being sealed... Details: the S3 SDK requires a ByteStream-implementing object, not necessarily … In our case, our object store interface could have an … Likewise, the object store should have a …
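A hypothetical sketch, only to illustrate the shape such an interface might take — none of these trait or method names are claimed to exist in the object store code:

```rust
use bytes::Bytes;
use futures::stream::BoxStream;

type ObjectResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;
type ByteChunkStream = BoxStream<'static, ObjectResult<Bytes>>;

#[async_trait::async_trait]
trait StreamingObjectStore {
    /// One GET per SST; the caller pulls chunks as the compactor consumes blocks,
    /// so only a bounded buffer is resident at any time (TCP/HTTP flow control
    /// keeps the sender from running ahead).
    async fn streaming_read(&self, path: &str) -> ObjectResult<ByteChunkStream>;

    /// Accept a stream of chunks and upload the object portion by portion
    /// (e.g. via multipart upload), so a full SST never has to be materialized
    /// in memory before being sealed.
    async fn streaming_upload(&self, path: &str, payload: ByteChunkStream) -> ObjectResult<()>;
}
```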
In the original implementation we read only one block at a time and hold it in the block cache, which means we can control the memory of the compactor precisely. But there is a problem: AWS charges per I/O request. If one object is 32MB and each block is 64KB, it means we must pay about 500 times more than on the main branch.
Exactly, it can only reach 256MB, and I will change it to 128MB. So I do not think reducing memory usage is the most important thing for us. We only need to keep the memory from exceeding the upper limit, and that is enough.
🤔 In fact, I should prioritize identifying the various components of the memory usage.
@wenym1 has investigated streaming upload to S3 (see #1368) before. In short, it is not possible to achieve streaming upload by providing a ByteStream to the S3 SDK, because S3 only supports HTTP/1. Yiming plans to investigate multipart upload this quarter, and we will see how much it can help.
+1
Well, I was thinking that a single get request would be streamed over TCP, which has flow control, allowing us to control the application buffer size and hence memory usage.
Oh, hold on, I thought that

    const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; // 32MB

while due to

    input.target_file_size = self.config.target_file_size_base
        << (input.target_level.level_idx as usize - base_level);

the L6 file size is 32MB * 2^6 = 2GB?
If we can achieve streaming without additional S3 get requests, I think we get a zero-cost memory usage reduction. But it may not be worth it if we cannot do streamed or multipart upload to S3, as then we get at most a factor-of-2 reduction in memory usage, instead of possibly a factor > 10. Because of the potential for quite good memory efficiency, allowing us to run a lot more compaction tasks on a single node (up to the limit imposed by the compute requirement of each task), or on a node with a smaller amount of memory, thus saving users cost, I believe it may be worth continuing the investigation of #1368 via S3's multipart upload API. The Rust SDK actually already seems to support this: awslabs/aws-sdk-rust#494. Btw, to my understanding, there is no extra cost to multiple uploads to S3, whether via the multipart upload API or otherwise (https://aws.amazon.com/s3/pricing/).
This requires us to provide the body size in advance. So we still need to generate the full SST before uploading.
Yes, so I'm suggesting to use the multipart upload API.
Sure, I think we can do multipart upload. I had a rough idea before about providing a new upload API for the object store that accepts a stream-like input and uploads the whole SST portion by portion (ideally block by block). A similar idea would also help save memory for spill-to-disk, since a file handle can be written like a stream instead of writing the whole object. I plan to support this feature soon after I do some experiments with multipart upload on both S3 and MinIO. Besides, it seems that there is a limit on the minimum part size for multipart upload, so we need to handle the upload in a tricky way.
I guess one requirement is that parts are uploaded sequentially, each after the previous one is complete. I think from our perspective, the object store interface implemented for S3 should pull bytes from the input stream. If the input stream is not exhausted, add to a buffer of fixed size (the minimum multipart part size, or the first multiple of the block size greater than that). Once full, upload the part. Otherwise, if the stream is exhausted, just upload the remainder. So it's probably not too tricky.
No. In src/storage/src/hummock/compactor.rs we can limit it by

    let target_file_size = std::cmp::min(
        self.compact_task.target_file_size as usize,
        max_target_file_size,
    );
So I do not support uploading data via a streaming API, because it is not necessary. Hummock is an LSM database engine, not a file system. We can limit the file size to no more than 100MB, and according to the suggestion from AWS, we only need multipart uploading for large files of more than 100MB.
It's not as simple as this solution. When we upload the remainder, the remainder is also a part, but we cannot ensure that this part is greater than the minimum part size, which may cause the last part upload to fail (not sure about this yet; we may need some experiments on S3 and MinIO). A solution is to always ensure the multipart upload buffer holds more than one part size of data, which requires a buffer of twice the minimum part size. When the buffer size exceeds two times the minimum part size, we upload the first minimum-part-size chunk, and the remaining buffer size is still greater than the minimum part size. In this way, we can make sure the last part is greater than the minimum part size. If the whole data is smaller than the minimum part size, we just do a simple upload.
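A minimal sketch of that buffering rule. The uploader type is hypothetical, 5 MiB is used as S3's documented minimum part size, and the flushed part goes into a Vec where the real implementation would issue an upload-part request:

```rust
const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

struct MultipartBuffer {
    buf: Vec<u8>,
    parts: Vec<Vec<u8>>, // stand-in for actual part uploads
}

impl MultipartBuffer {
    fn new() -> Self {
        Self { buf: Vec::new(), parts: Vec::new() }
    }

    /// Append compactor output; flush a MIN_PART_SIZE part whenever the buffer
    /// exceeds 2 * MIN_PART_SIZE, leaving at least MIN_PART_SIZE behind so the
    /// final remainder is never too small.
    fn write(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
        while self.buf.len() > 2 * MIN_PART_SIZE {
            let part: Vec<u8> = self.buf.drain(..MIN_PART_SIZE).collect();
            self.parts.push(part); // in reality: upload the part here
        }
    }

    /// Seal the SST: whatever remains is >= MIN_PART_SIZE, unless the whole
    /// object was small enough that a plain PUT would be used instead.
    fn finish(mut self) -> Vec<Vec<u8>> {
        if !self.buf.is_empty() {
            self.parts.push(std::mem::take(&mut self.buf));
        }
        self.parts
    }
}

fn main() {
    let mut up = MultipartBuffer::new();
    up.write(&vec![0u8; 12 * 1024 * 1024]); // 12 MiB; arrives incrementally in practice
    // One 5 MiB part flushed, then a 7 MiB remainder as the last part.
    for (i, p) in up.finish().iter().enumerate() {
        println!("part {} = {} MiB", i, p.len() >> 20);
    }
}
```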
Indeed, if we limit the size to ~100 MB, there is a reduced incentive to do streaming download or multipart upload, at least with saving memory as the motivation. However, I guess we don't want file sizes that are too small either, since then we have to do more get requests for both metadata and data, and maintain more metadata. Further, there are other advantages to streaming:
If we can reduce memory cost significantly for 128MB * 10 input files, we can save ~1GB per task. So even with reduced incentive, there may still be a strong incentive. In terms of prioritizing streaming download vs. streaming upload, given that when the file size is large (~100MB), input file size == output file size, it may be worth spending more time optimizing the download rather than the upload. Since for a level multiplier of 10, input SSTs would be 10x the output, the effectiveness of streaming the download is 10x that of streaming the upload. So perhaps we should deprioritize streaming upload, as it is also significantly more complicated (including storing output in the block cache for local compaction).
@jon-chuang I do not understand why you think the speed of download could be improved by 10x by streaming.
The total size of the input would be close to the output in most cases.
Actually, I meant the memory consumption saved by streaming the download would be 10x that of streaming the upload. Details: in our case, the download thread pool maintains an open TCP (S3) connection for each SST, and then downloads the next block (calls …). To optimize latency even further, we can have a buffer to fetch up to N blocks per SST, as we assume compaction speed >> download speed. The tradeoff is memory vs. a potential bubble waiting for the next block, which is not yet fully downloaded. As an optimization, we have a high queue depth only for the SSTs in the concat iterator, but not in the independent SST iterators. We can have a flag or argument in the iterator that indicates the prefetch queue depth for that particular SST.
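A sketch of what such a per-SST prefetcher could look like, using a bounded tokio channel as the N-block buffer. fetch_block, the block sizes, and the queue depth here are placeholders, not the actual iterator code:

```rust
use tokio::sync::mpsc;

type Block = Vec<u8>;

async fn fetch_block(sst_id: u64, idx: usize) -> Option<Block> {
    // Placeholder for reading the next block off the streaming GET response.
    let _ = sst_id;
    if idx < 4 { Some(vec![0u8; 64 * 1024]) } else { None }
}

/// Spawn a prefetcher with the given queue depth; a deeper queue hides download
/// latency for SSTs inside a concat iterator, depth 1 suffices elsewhere.
fn spawn_prefetcher(sst_id: u64, queue_depth: usize) -> mpsc::Receiver<Block> {
    let (tx, rx) = mpsc::channel(queue_depth);
    tokio::spawn(async move {
        let mut idx = 0;
        while let Some(block) = fetch_block(sst_id, idx).await {
            // `send` waits when the buffer is full, bounding memory per SST.
            if tx.send(block).await.is_err() {
                break; // iterator dropped, stop prefetching
            }
            idx += 1;
        }
    });
    rx
}

#[tokio::main]
async fn main() {
    let mut rx = spawn_prefetcher(42, 4);
    while let Some(block) = rx.recv().await {
        // The compactor iterator would decode and merge entries here.
        println!("got block of {} bytes", block.len());
    }
}
```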
Yes, but the size of the output SST == the size of each input SST. Total SST held in memory = 10 × input + 1 × output.
The high memory usage of the compactor seems to be caused by some bug. I think we should fix the bug before any read optimization.
No. We only merge overlapping L0 of no more than 128MB. It means that in most cases, the input data only uses several tens of MB.
Oh yes, I was mistaken: if k > 0 then it's always 1 SST from Lk and 1 from Lk+1 needed at a time, not 10 from Lk. So yes, we wouldn't see the factor-of-10 improvement I talked about.
Perhaps the issue is … Another possibility: load_table returns a CachableEntry, which holds an Arc reference to the LRU cache: https://github.com/singularity-data/risingwave/blob/main/src/common/src/cache.rs Btw, why does the meta cache in SSTableStore not cache just the meta but the actual SSTable? https://github.com/singularity-data/risingwave/blob/6d3183927831ea9077499e01470e63f857337301/src/storage/src/hummock/sstable_store.rs#L48 Also, load_table is a particularly confusing piece of code. I have no idea what it is doing! https://github.com/singularity-data/risingwave/blob/6d3183927831ea9077499e01470e63f857337301/src/storage/src/hummock/sstable_store.rs#L232
You can see from an old implementation of load_table https://github.com/singularity-data/risingwave/blob/fadff1446325761deb2f06c3e187777b93a8f07e/src/storage/src/hummock/sstable_store.rs#L290 that the current usage is an abuse of the original intent. meta_cache is only meant to store SSTables without data blocks, but here we are abusing it by storing data blocks. I believe this is the cause of the high memory usage. This change was introduced in this PR: https://github.com/singularity-data/risingwave/pull/3276/files# The PR attempts to piggyback deduplication of data fetches on deduplication of meta fetches. To my mind, this does not seem like a best practice. If the load_data parameter is not uniform, the requests cannot even be meaningfully deduplicated. I believe we should never insert an SSTable with any blocks, and we should add a sanity check which ensures as much. If we want to dedup data block requests, I think it should proceed via a separate mechanism (i.e. via the object store, e.g. the S3 object store implementation). Also, I think it is a bad practice to have the SSTable struct sometimes hold data and sometimes not. We should use separate structs or enums for each case.
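The suggested sanity check could be as simple as the following sketch; the Sstable shape here is a simplified stand-in for the real struct, not its actual fields:

```rust
struct Block(Vec<u8>);

struct Sstable {
    id: u64,
    meta: Vec<u8>,      // serialized metadata, stand-in
    blocks: Vec<Block>, // should always be empty for meta-cache entries
}

fn insert_into_meta_cache(sst: &Sstable) -> Result<(), String> {
    // Sanity check: the meta cache must only ever hold block-less SSTables,
    // otherwise its charged size wildly underestimates real memory usage.
    if !sst.blocks.is_empty() {
        return Err(format!(
            "refusing to cache sstable {} with {} data blocks in meta cache",
            sst.id,
            sst.blocks.len()
        ));
    }
    let _ = &sst.meta; // placeholder for the real insert, charging only meta size
    Ok(())
}
```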
To fix this, I believe we should post-fill a copy of the SSTable only after the block-less one has been inserted into the cache. We should also return an owned value of SSTable if possible. Finally, we may want to consider reducing the use of Arc, to reduce the chance of a memory leak (unrelated).
The size of the blocks in …
Holding an Arc of the LruCache is necessary, since we have to make sure that the LruCache is not freed while the CachableEntry is accessing the pointer of the LruHandle.
The reason why @Little-Wallace chose to store the whole prefetched SST in … How we used to use …
A memory leak only happens with Arc when a cyclic dependency appears. The current Arc dependency graph is quite clear and acyclic, so I don't think it's a good idea to reduce the current use of Arc.
Our current implementation of LruCache is not a strict limitation on memory usage. Externally referenced CacheEntries cannot be evicted. Therefore, when we insert a new entry and the cache is about to exceed the memory capacity, the memory usage can still exceed the capacity even after we have evicted all evictable entries. In this scenario, insertion into the LruCache is still allowed instead of blocking or failing the insertion. The total memory usage will exceed the capacity for a while, until some entries are no longer externally referenced and their data is evicted from the LruCache. I am not sure whether this increases the real memory usage of the LruCache. We may focus on the metrics of the compactor's LruCache usage to see whether the LruCache is using more memory than its capacity.
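A toy model of that behavior, deliberately much simpler than the real LruCache in src/common/src/cache.rs: pinned (externally referenced) entries are skipped by eviction, so insertions can push usage past capacity until the handles are dropped.

```rust
use std::collections::HashMap;

struct ToyCache {
    capacity: usize,
    usage: usize,
    // entry id -> (charge, pinned by an external handle?)
    entries: HashMap<u64, (usize, bool)>,
}

impl ToyCache {
    fn insert(&mut self, id: u64, charge: usize, pinned: bool) {
        self.entries.insert(id, (charge, pinned));
        self.usage += charge;
        // Evict only unpinned entries; if everything is pinned, usage stays above capacity.
        while self.usage > self.capacity {
            let victim = self
                .entries
                .iter()
                .find(|(_, (_, pinned))| !*pinned)
                .map(|(&id, &(charge, _))| (id, charge));
            match victim {
                Some((id, charge)) => {
                    self.entries.remove(&id);
                    self.usage -= charge;
                }
                None => break, // nothing evictable: overshoot until handles are released
            }
        }
    }
}

fn main() {
    let mut cache = ToyCache { capacity: 100, usage: 0, entries: HashMap::new() };
    cache.insert(1, 80, true); // pinned by a live CachableEntry
    cache.insert(2, 80, true); // insertion is still allowed
    println!("usage {} > capacity {}", cache.usage, cache.capacity); // 160 > 100
}
```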
This bug is caused by the incorrect charge for …
Ok, but we should not pollute the meta cache to achieve this…?
Anyway, meta_cache is not really that useful in the context of the compactor, since the compactor and compute nodes do not share the same sstable_store, and the compactor will in general fetch a single SST once and then never again, as a new SST will have replaced it. For this reason, perhaps we can reduce the meta and block cache capacity significantly for the compactor.
The compactor only uses the meta cache, because the compactor always needs the whole file rather than some blocks.
But do we even need a cache? We can just stream the blocks and have the stream do the prefetching. Cache seems like the wrong tool here.
The compactor still uses a lot more memory than our statistics indicate.
I'm working on this issue in #4590.
I did some experiments and think there may be a memory leak in the compactor. I'm currently investigating it.
In TPC-H Q12 3-node benchmarks.