
storage: high memory usage of compactor #3921

Closed
skyzh opened this issue Jul 15, 2022 · 49 comments · Fixed by #3268

Comments

@skyzh
Contributor

skyzh commented Jul 15, 2022

[screenshot: compactor memory usage]

In TPC-H Q12 3-node benchmarks.

@jon-chuang
Contributor

jon-chuang commented Jul 20, 2022

After some offline discussion with Arne, here are some ideas:

  1. Do not load all SST blocks into memory at once. We load just one block per SST and prefetch the next few blocks in a fixed-size queue. (Isn't this streaming merge already implemented? If so, we would have misidentified the issue as trying to merge all SSTs at once, since memory would already be constant for a fixed number of SSTs merged.)
  2. Memory-limited k-way merge sort (funnel sort?). Limit each merge to K SSTs at once. This needs multiple stages where intermediate results are written back to storage.
  3. Add some "slack" to each SST so that merging into it does not result in a cascade of writes (this probably does not help much, as we can expect quite a lot of cross-SST merges).
  4. Interval scheduling of SSTs so that we do not take an all-or-nothing approach to merging when there are non-overlapping ranges.

@ALeitert

@skyzh
Contributor Author

skyzh commented Jul 20, 2022

The ideas look good to me. cc @hzxa21

@lmatz
Contributor

lmatz commented Jul 20, 2022

What's "slack"? 🤔

@jon-chuang
Contributor

jon-chuang commented Jul 20, 2022

Well, it's an idea (credits to @ALeitert) for the case where we merge one small SST (RHS) into a bunch of SSTs with non-overlapping ranges (LHS). If |RHS| << |LHS|, then as each LHS SST fills up, the leftover values are pushed onto the next SST, resulting in a cascade of writes across the LHS. If the LHS SSTs were instead not completely full, the new writes could be absorbed by the "slack" region at the end of each LHS SST.

I guess the idea is like a B-tree. However, the reason this may not be very effective is that the RHS is a big batch, so we will likely touch most of the LHS SSTs anyway, assuming a roughly uniform key distribution; and that is the point of an LSM tree - we batch inserts to postpone non-sequential writes to the compaction stage.

@ALeitert
Contributor

It was an idea I had when thinking about the tables and running some tests on my machine. I noticed that the compactor often loads and rebuilds lots of tables and compacts them even if there is only a small change. And, as mentioned, a small change at the beginning has a cascading effect on all the following tables. An extreme case would be adding a single KV pair to the very first table. The idea is not fully developed yet, and I would not give it priority. I also do not know how likely such cases are for real-life data.

@hzxa21
Collaborator

hzxa21 commented Jul 20, 2022

After some offline discussion with Arne, here are some ideas:

  1. Do not load all SST blocks into memory at once. We load just one block per SST and prefetch the next few blocks in a fixed-size queue. (Isn't this streaming merge already implemented? If so, we would have misidentified the issue as trying to merge all SSTs at once, since memory would already be constant for a fixed number of SSTs merged.)
  2. Memory-limited k-way merge sort (funnel sort?). Limit each merge to K SSTs at once. This needs multiple stages where intermediate results are written back to storage.
  3. Add some "slack" to each SST so that merging into it does not result in a cascade of writes (this probably does not help much, as we can expect quite a lot of cross-SST merges).
  4. Interval scheduling of SSTs so that we do not take an all-or-nothing approach to merging when there are non-overlapping ranges.

@ALeitert

Idea 1 sounds good to me. For ideas 2)-4), I am not sure how much we can gain for the added complexity, so we need to benchmark first.

In most cases, we use dedicated compactor nodes for compaction, so I think we can assume there is no memory resource competition between the compactor and other components. The goal of bounding compactor memory consumption is to avoid OOM. Ideally, we should bound the working set of the compactor to avoid OOM while trying to prefetch as much as possible.

@jon-chuang
Contributor

jon-chuang commented Jul 20, 2022

In terms of implementation complexity, @ALeitert points out that idea 4 is as simple as organizing the SSTables to be compacted in a given task into groups of non-overlapping ranges. Then we merge over each of these ConcatIterators (which lazily load SSTables into memory) in the MergeIterator.

If the number of groups is small relative to the total number of tables being merged, we reduce memory usage at any given point in time while keeping a high yield of SSTables merged into collections of non-overlapping SSTs.

So I would disagree about the added complexity of idea 4.
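As a rough illustration of the grouping step (a hypothetical SstInfo type with inclusive key ranges, not the actual Hummock structs), a first-fit partition into non-overlapping groups could look like this:

    /// Hypothetical SST descriptor with an inclusive key range (not the actual Hummock struct).
    #[derive(Clone)]
    struct SstInfo {
        smallest_key: Vec<u8>,
        largest_key: Vec<u8>,
    }

    /// First-fit partition of SSTs into groups of non-overlapping key ranges.
    /// Each group can then be scanned with one ConcatIterator under a MergeIterator,
    /// so memory scales with the number of groups rather than the number of SSTs.
    fn group_non_overlapping(mut ssts: Vec<SstInfo>) -> Vec<Vec<SstInfo>> {
        // Sorting by start key means a new SST only has to be compared with the
        // tail (largest range so far) of each existing group.
        ssts.sort_by(|a, b| a.smallest_key.cmp(&b.smallest_key));
        let mut groups: Vec<Vec<SstInfo>> = Vec::new();
        for sst in ssts {
            // Place the SST in the first group whose last range ends before this one starts.
            let slot = groups
                .iter()
                .position(|g| g.last().map_or(true, |t| t.largest_key < sst.smallest_key));
            match slot {
                Some(i) => groups[i].push(sst),
                None => groups.push(vec![sst]),
            }
        }
        groups
    }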


However, we do need to validate if this helps our workloads. If our workload looks like merging a large number of non-overlapping SSTs with a small number of overlapping SSTs, then it will help. If it looks like merging a large number of overlapping ranges, then it will not help, and we need to rely on:

  1. Idea 1 (stream blocks for SSTableIterator instead of loading them eagerly)
  2. Limiting number of SSTs we merge at once
  3. Reducing parallelism for compaction (not ideal?)

Arne is working on validating what kind of workload we have by measuring whether interval scheduling results in a small number of groups relative to the number of SSTables in a task, and whether the maximum memory used for storing SSTables in a task at any point in time is reduced.


Generally, I think ideas 2 and 3 are poor compromises. We want a higher yield (large collections of SSTs with non-overlapping ranges per second) for a fixed amount of memory. Idea 2 yields the same number of SSTs but produces more than one collection of non-overlapping SSTs; idea 3 yields fewer SSTs per second.


As for idea 1, I'm not sure how good an idea it is, since fetching a large contiguous SST in a single S3 GET request saves request costs. So I would be hesitant about the prefetch/stream approach.

Edit: this is erroneous. We can issue a single GET to S3 and stream the resulting bytes. HTTP handles flow control, so we only pull data as it needs to be buffered for consumption.


  5. The final idea, not covered so far, is improving the way scheduling is performed. To my understanding, one possible ideal scenario is:

A small number N of SSTables with large ranges (RHS) are assigned to one task, to be merged with a large number M of non-overlapping SSTables, each with a small range (LHS). Then the memory usage is N + 1 SSTables, which is << N + M.

This is complementary to idea 4: idea 5 requires idea 4 to work, and in turn makes idea 4 more likely to apply.

Idea 5 can be implemented by performing interval scheduling at the meta node. We add groups to a task randomly so that the workload is more uniform across tasks (each task gets a balance of groups with many SSTs, few SSTs, and in between).

We add groups up to a max_overlapping_threshold.


So we should investigate:

  • Idea 4 (interval scheduling at the task level + lazy loading for each group of non-overlapping SSTs) together with idea 5 (interval scheduling at the meta level)
  • Idea 1 (lazy loading of blocks at the SST level + prefetching)

@hzxa21
Collaborator

hzxa21 commented Jul 20, 2022

We checked the compactor log and found that each compaction task itself is not huge, but the compactor node is running 6-7 compaction tasks concurrently. I think we should limit the number of in-flight compaction tasks per compactor node first.
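A minimal sketch of such a per-node limit using a Tokio semaphore (run_compaction_task and task_id are placeholders, not the actual compactor API):

    use std::sync::Arc;
    use tokio::sync::Semaphore;

    // Placeholder for the actual compaction task execution.
    async fn run_compaction_task(task_id: u64) {
        println!("running compaction task {task_id}");
    }

    // Acquire a permit before spawning; the permit is released when the task finishes,
    // so at most `n` compaction tasks run concurrently on this node,
    // e.g. let limit = Arc::new(Semaphore::new(4));
    async fn spawn_with_limit(limit: Arc<Semaphore>, task_id: u64) {
        let permit = limit.acquire_owned().await.expect("semaphore closed");
        tokio::spawn(async move {
            let _permit = permit; // held for the lifetime of the task
            run_compaction_task(task_id).await;
        });
    }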

@jon-chuang
Contributor

jon-chuang commented Jul 20, 2022

each compaction task itself is not huge, but the compactor node is running 6-7 compaction tasks concurrently.

But in the approaches above, we are trying to make each compaction task itself more efficient, allowing more compaction tasks to run concurrently on the same memory budget, resulting in a higher SST yield.

Reducing the number of tasks can prevent OOM but reduces yield.

each compaction task itself is not huge

Not huge does not mean it is not higher than it needs to be.

@hzxa21
Collaborator

hzxa21 commented Jul 20, 2022

@ALeitert points out that idea 4 is as simple as organizing the SSTables to be compacted in a given task into groups of non-overlapping ranges. Then we merge over each of these ConcatIterators (which lazily load SSTables into memory) in the MergeIterator.

Only Level0 has overlapping SSTs in our design, so we should always use ConcatIterators for the compaction input SSTs from L1-N. IIUC, this proposal is trying to divide n L0 SSTs into k groups, each containing non-overlapping SSTs, so that we can have a MergeIterator on top of k ConcatIterators instead of a MergeIterator on top of n SSTableIterators. This is a valid optimization and worth trying. My guess is we won't benefit much from it because 1) it only applies to L0->L1 compaction; 2) SSTs in L0 normally have a very wide key range.

FYI, the sub-level introduced in #3111 can also help make compaction more efficient by reducing the number of overlapping SSTs in a compaction task.

Back to the problem described in this issue, I still think controlling compaction parallelism (i.e. the number of running compaction tasks) on a compactor node is a simpler and more effective solution because 1) compaction tasks are preferably kept small in many cases; 2) the root cause of this issue is too many not-too-big compaction tasks running in one compactor node at the same time. We should scale out compactor nodes if all available compactor nodes hit the per-node compaction parallelism limit.

@Little-Wallace
Contributor

Little-Wallace commented Jul 20, 2022

Interval scheduling of SSTs so that we do not take an all-or-nothing approach to merging when there are non-overlapping ranges.

@jon-chuang
I have implemented this as a "trivial move" in https://github.com/singularity-data/risingwave/pull/3111/files. If an SST does not overlap with any SST in the target level, it is moved to its position in the target level without running a compaction task on the compactor.

@jon-chuang
Contributor

jon-chuang commented Jul 20, 2022

the root cause of this issue is too many not-too-big compaction tasks running in one compactor node at the same time. We should scale out compactor nodes if all available compactor nodes hit the per-node compaction parallelism limit.

I think that rather than limiting parallelism per se, we may be better off limiting the expected memory consumption. This way, we can set the target memory usage based on the node's memory, whether total or available (for the latter, we assume we share the node with other processes).

We can assume that N SSTs from L0 in a task consume N * (L0 SST size) bytes, and that any SST from L1 and above consumes 1 * (that level's SST size) bytes.

Since we store the resulting SST in memory until upload, we can also add the target level's SST size to the budget consumed.

Or, I guess, parallelism might be fixed (by the number of cores) and we adjust the max number of L0 SSTs we try to merge in one task according to target_memory_usage / parallelism.
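A rough sketch of that budget arithmetic (the struct and field names are hypothetical, not actual Hummock compaction task fields):

    // Hypothetical per-task memory estimate.
    struct TaskMemoryEstimate {
        l0_sst_count: u64,
        l0_sst_size: u64,     // typical size of one L0 SST, in bytes
        other_sst_size: u64,  // the single non-L0 input SST held at a time
        target_sst_size: u64, // output SST buffered in memory until upload
    }

    fn estimated_task_memory(t: &TaskMemoryEstimate) -> u64 {
        t.l0_sst_count * t.l0_sst_size + t.other_sst_size + t.target_sst_size
    }

    // Admit a new task only if it fits within the remaining node-level budget
    // (target_memory_usage minus the estimates of tasks already running).
    fn admit_task(t: &TaskMemoryEstimate, budget_remaining: u64) -> bool {
        estimated_task_memory(t) <= budget_remaining
    }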

@jon-chuang
Contributor

jon-chuang commented Jul 21, 2022

Indeed, the grouping idea (4) would only benefit L0->L0 or L0->L1 compaction involving more than one SST from L0. I guess to improve things in light of sub-level compaction, we should try to use a ConcatIterator within each non-overlapping sub-level, so that only the highest L0 sub-level will need an SSTableIterator.

@jon-chuang
Contributor

jon-chuang commented Jul 21, 2022

Actually, considering that the largest SST file can currently be 2GB, the streaming SSTable iterator approach does seem reasonable. Perhaps fetching blocks into a buffer of up to 32MB makes sense.

However, note that those would come from levels with non-overlapping SST files, so we would hold at most one large SST at a time. On the other hand, we also store the SSTs resulting from compaction in memory for the duration of creation and upload.

Since the target level is >= any of the inputs, the memory usage from keeping these files in memory instead of doing a streaming upload is already at least the size of the largest input SST file. In other words, we may not be able to significantly reduce memory usage unless we also allow streaming upload of blocks of an SST prior to it being sealed...

Details:

The S3 SDK requires an object implementing ByteStream, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to pass smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0)) as the upload parameter.

In our case, our object store interface could have an upload_stream(path, byte_stream) interface, where byte_stream is a generic byte stream implementing the Stream trait. The S3 object store would take data from this byte stream and feed it into the hyper byte stream (or simply use hyper::Body's wrap_stream interface). However, we should investigate hyper's stream buffer size.

Likewise, the object store should have a read_stream interface that returns a dyn Stream object. Then we can have BlockStream and SstableStream objects that stream the bytes and produce the next block by calling next().await, etc.
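A sketch of what such an interface might look like (the trait, ObjectError, and method signatures are assumptions based on the description above, not the existing ObjectStore trait):

    use bytes::Bytes;
    use futures::stream::BoxStream;

    // Placeholder error type standing in for the real object store error.
    pub struct ObjectError;

    pub type ObjectResult<T> = Result<T, ObjectError>;

    #[async_trait::async_trait]
    pub trait StreamingObjectStore {
        /// Existing-style whole-object upload.
        async fn upload(&self, path: &str, obj: Bytes) -> ObjectResult<()>;

        /// Streaming upload: the implementation pulls chunks from `body` and forwards
        /// them (e.g. into a hyper::Body channel, or as multipart parts).
        async fn upload_stream(
            &self,
            path: &str,
            body: BoxStream<'static, ObjectResult<Bytes>>,
        ) -> ObjectResult<()>;

        /// Streaming read: a BlockStream/SstableStream can pull one chunk at a time.
        async fn read_stream(
            &self,
            path: &str,
        ) -> ObjectResult<BoxStream<'static, ObjectResult<Bytes>>>;
    }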

@Little-Wallace
Contributor

In the original implementation we read only one block at a time and hold it in the block cache, which means we can control the compactor's memory precisely. But there is a problem: AWS charges per IO request. If one object is 32MB and each block is 64KB, it means we would pay 500 times more than on the main branch.

@Little-Wallace
Contributor

Actually, considering that the largest SST file can currently be 2GB, the streaming SSTable iterator approach does seem reasonable. Perhaps fetching blocks into a buffer of up to 32MB makes sense.

Actually it can only reach 256MB, and I will change it to 128MB. So I do not think reducing memory usage is the most important thing for us. We only need to keep memory from exceeding the upper limit, and that is enough.
S3 IO, on the other hand, is far more expensive.

@Little-Wallace
Contributor

🤔 In fact, I should prioritize identifying the various components of the memory usage.

@hzxa21
Collaborator

hzxa21 commented Jul 21, 2022

Actually, considering that the largest SST file can currently be 2GB, the streaming SSTable iterator approach does seem reasonable. Perhaps fetching blocks into a buffer of up to 32MB makes sense.

However, note that those would come from levels with non-overlapping SST files, so we would hold at most one large SST at a time. On the other hand, we also store the SSTs resulting from compaction in memory for the duration of creation and upload.

Since the target level is >= any of the inputs, the memory usage from keeping these files in memory instead of doing a streaming upload is already at least the size of the largest input SST file. In other words, we may not be able to significantly reduce memory usage unless we also allow streaming upload of blocks of an SST prior to it being sealed...

Details:

The S3 SDK requires an object implementing ByteStream, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to pass smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0)) as the upload parameter.

In our case, our object store interface could have an upload_stream(path, byte_stream) interface, where byte_stream is a generic byte stream implementing the Stream trait. The S3 object store would take data from this byte stream and feed it into the hyper byte stream (or simply use hyper::Body's wrap_stream interface). However, we should investigate hyper's stream buffer size.

Likewise, the object store should have a read_stream interface that returns a dyn Stream object. Then we can have BlockStream and SstableStream objects that stream the bytes and produce the next block by calling next().await, etc.

@wenym1 has investigated streaming upload to S3 before (see #1368). In short, it is not possible to achieve streaming upload by providing a ByteStream to the S3 SDK because S3 only supports HTTP/1. Yiming plans to investigate multipart upload this quarter and we will see how much it can help.

We only need to keep memory from exceeding the upper limit, and that is enough.

+1

@jon-chuang
Contributor

jon-chuang commented Jul 21, 2022

In the original implementation we read only one block at a time and hold it in the block cache, which means we can control the compactor's memory precisely. But there is a problem: AWS charges per IO request. If one object is 32MB and each block is 64KB, it means we would pay 500 times more than on the main branch.

Well, I was thinking that a single GET request would be streamed over TCP, which has flow control, allowing us to control the application buffer size and hence memory usage.

@jon-chuang
Contributor

Actually it can only reach 256MB, and I will change it to 128MB.

Oh, hold on, I thought that the L0 file size is

const DEFAULT_TARGET_FILE_SIZE_BASE: u64 = 32 * 1024 * 1024; // 32MB

while due to

input.target_file_size = self.config.target_file_size_base
    << (input.target_level.level_idx as usize - base_level);

L6 file size is 32MB * 2^6 = 2GB?

@jon-chuang
Contributor

jon-chuang commented Jul 21, 2022

We only need to keep memory from exceeding the upper limit, and that is enough.

If we can achieve streaming without additional S3 GET requests, I think we get a zero-cost memory usage reduction.

But it may not be worth it if we cannot do streaming or multipart upload to S3, as then we get at most a factor-of-2 reduction in memory usage, instead of possibly a factor of more than 10.


Given the potential for quite good memory efficiency, allowing us to run many more compaction tasks on a single node (up to the limit imposed by the compute requirement of each task), or to run on a node with less memory and thus save users cost, I believe it is worth continuing the investigation of #1368 via S3's multipart upload API.

Currently, the Rust SDK actually already seems to support this: awslabs/aws-sdk-rust#494.

Btw, to my understanding, there is no extra cost for multiple uploads to S3, whether via the multipart upload API or otherwise (https://aws.amazon.com/s3/pricing/).
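For reference, a hedged sketch of what a multipart upload could look like with aws-sdk-s3 (module paths and builder details vary across SDK versions; bucket, key, and pre-split parts here are illustrative, not the actual object store code):

    use aws_sdk_s3::primitives::ByteStream;
    use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
    use aws_sdk_s3::Client;

    // Upload pre-split `parts` (each at least the 5 MiB minimum, except possibly
    // the last) to `bucket/key` via S3's multipart upload API.
    async fn multipart_upload(
        client: &Client,
        bucket: &str,
        key: &str,
        parts: Vec<Vec<u8>>,
    ) -> Result<(), aws_sdk_s3::Error> {
        let upload = client
            .create_multipart_upload()
            .bucket(bucket)
            .key(key)
            .send()
            .await?;
        let upload_id = upload.upload_id().unwrap_or_default().to_string();

        let mut completed = Vec::new();
        for (i, part) in parts.into_iter().enumerate() {
            let part_number = (i + 1) as i32; // part numbers start at 1
            let out = client
                .upload_part()
                .bucket(bucket)
                .key(key)
                .upload_id(upload_id.clone())
                .part_number(part_number)
                .body(ByteStream::from(part))
                .send()
                .await?;
            completed.push(
                CompletedPart::builder()
                    .part_number(part_number)
                    .e_tag(out.e_tag().unwrap_or_default())
                    .build(),
            );
        }

        // Completing the upload stitches the parts into one object.
        client
            .complete_multipart_upload()
            .bucket(bucket)
            .key(key)
            .upload_id(upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(completed))
                    .build(),
            )
            .send()
            .await?;
        Ok(())
    }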

@skyzh
Contributor Author

skyzh commented Jul 21, 2022

The S3 SDK requires an object implementing ByteStream, not necessarily Bytes like we provide now. So if we change the object store interface to accept some sort of byte stream instead of bytes, perhaps we can achieve a streaming upload. Specifically, in the S3 case, the way to achieve this would be to pass smithy::ByteStream::from(SdkBody::from(hyper::Body::channel().0)) as the upload parameter.

This requires us to provide the body size in advance. So we still need to generate the full SST before uploading.

@jon-chuang
Contributor

This requires us to provide the body size in advance. So we still need to generate the full SST before uploading.

Yes, so I'm suggesting we use the multipart upload API.

@wenym1
Contributor

wenym1 commented Jul 21, 2022

Yes, so I'm suggesting we use the multipart upload API.

Sure, I think we can do multipart upload.

I had a rough idea before about providing a new upload API for the object store that accepts a stream-like input and uploads the whole SST portion by portion (ideally block by block). A similar idea will also help save memory for spill-to-disk, since a file handle can be written like a stream instead of writing the whole object at once.

I plan to support this feature soon, after I do some experiments with multipart upload on both S3 and MinIO. Besides, it seems there is a limit on the minimum part size for multipart upload, so we need to handle the upload in a tricky way.

@jon-chuang
Contributor

jon-chuang commented Jul 21, 2022

so we need to handle the upload in a tricky way.

I guess one requirement is that parts are uploaded sequentially, each after the previous one completes.

I think from our perspective, the object store interface implemented for S3 should pull bytes from the input stream. If the input stream is not exhausted, add to a buffer of fixed size (the minimum multipart part size, or the first multiple of the block size greater than that). Once full, upload the part. Otherwise, if the stream is exhausted, just upload the remainder. So it's probably not too tricky.

@Little-Wallace
Contributor

L6 file size is 32MB * 2^6 = 2GB?

No. In src/storage/src/hummock/compactor.rs we can limit it with:

        let target_file_size = std::cmp::min(
            self.compact_task.target_file_size as usize,
            max_target_file_size,
        );

@Little-Wallace
Contributor

So I do not support uploading data via a streaming API, because it is not necessary. Hummock is an LSM database engine, not a file system. We can limit the file size to no more than 100MB, and according to AWS's suggestion, multipart upload is only needed for files larger than 100MB.

@wenym1
Contributor

wenym1 commented Jul 21, 2022

I think from our perspective, the object store interface implemented for S3 should pull bytes from the input stream. If the input stream is not exhausted, add to a buffer of fixed size (the minimum multipart part size, or the first multiple of the block size greater than that). Once full, upload the part. Otherwise, if the stream is exhausted, just upload the remainder. So it's probably not too tricky.

It's not as simple as this solution.

When we upload the remainder, the remainder is also a part, but we cannot ensure that it is greater than the minimum part size, which may cause the last part upload to fail (not sure about this yet; we may need some experiments on S3 and MinIO). A solution is to always keep more than one part's worth of data in the multipart upload buffer, which requires a buffer of twice the minimum part size. When the buffer size exceeds twice the minimum part size, we upload the first minimum-part-size chunk, and the remaining buffered data is still greater than the minimum part size. In this way, we can make sure the last part is greater than the minimum part size. If the whole object is smaller than the minimum part size, we just do a simple upload.
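A small sketch of that double-buffer rule (MIN_PART_SIZE mirrors S3's documented 5 MiB minimum; the upload_part callback is a placeholder for the actual multipart upload call):

    const MIN_PART_SIZE: usize = 5 * 1024 * 1024;

    struct MultipartBuffer {
        buf: Vec<u8>,
    }

    impl MultipartBuffer {
        fn new() -> Self {
            Self { buf: Vec::new() }
        }

        /// Append a chunk pulled from the SST byte stream. While the buffer holds at
        /// least two minimum-sized parts, flush one; this guarantees the data left
        /// for the final part never drops below MIN_PART_SIZE.
        fn push(&mut self, chunk: &[u8], upload_part: &mut impl FnMut(&[u8])) {
            self.buf.extend_from_slice(chunk);
            while self.buf.len() >= 2 * MIN_PART_SIZE {
                let part: Vec<u8> = self.buf.drain(..MIN_PART_SIZE).collect();
                upload_part(&part);
            }
        }

        /// Upload whatever remains as the last part; if no part was ever uploaded and
        /// the whole object is below MIN_PART_SIZE, this would instead be a plain PUT.
        fn finish(self, upload_part: &mut impl FnMut(&[u8])) {
            if !self.buf.is_empty() {
                upload_part(&self.buf);
            }
        }
    }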

@jon-chuang
Contributor

jon-chuang commented Jul 22, 2022

So I do not support uploading data via a streaming API, because it is not necessary.

Indeed, if we limit the size to ~100 MB, there is a reduced incentive to do streaming download or multipart upload, at least with saving memory as a motivation.

However, I guess we don't want file sizes that are too small, since then we have to make more GET requests for both metadata and data, and maintain more metadata.

Further, there are other advantages to streaming:

  1. Performance - According to https://singularity-data.quip.com/RalDAVhdy5EI/S3-performance#eCFACA0rGWu, downloading a 128MB file takes 1.6s on average; uploading 128MB takes 1.28s, or 0.69s with multipart upload. Note: if the task and compaction share the same thread, streaming will not improve performance; we need concurrent download, compaction, and upload to achieve better per-task completion times.

If we can significantly reduce the memory cost for 128MB * 10 input files, we can save ~1GB per task. So even with the reduced incentive, the case may still be strong.


In terms of prioritizing streaming download vs. streaming upload: given that when the file size is large (~100MB), input file size == output file size, it may be worth spending more time optimizing download rather than upload. Since with a level multiplier of 10 the input SSTs would be 10x the output, the effectiveness of streaming the download is 10x that of streaming the upload. So perhaps we should deprioritize streaming upload, as it is also significantly more complicated (including storing the output in the block cache for local compaction).

@Little-Wallace
Contributor

@jon-chuang I do not understand why you think the speed of download could be improved by 10x by streaming

@Little-Wallace
Contributor

The total size of the input would be close to the output in most cases.

@jon-chuang
Contributor

jon-chuang commented Jul 22, 2022

I do not understand why you think the speed of download could be improved by 10x by streaming

Actually, I meant that the memory consumption saved by streaming the download would be 10x that of streaming the upload.

Details:
Conversely, I guess that if we run downloads concurrently in a download thread pool, we could get a 10x improvement in download speed. This is a next-step optimization; we are creating an RFC that includes this proposal as a future optimization.

In our case, the download thread pool maintains an open TCP (S3) connection for each SST, and downloads the next block (calling next_bytes(next_block_bytes) on the stream) for that SST once the previous block has been consumed by the compactor.

To optimize latency even further, we can have a buffer that fetches up to N blocks per SST, as we assume compaction speed >> download speed. The tradeoff is memory vs. a potential bubble while waiting for the next block to finish downloading.

As an optimization, we keep a high queue depth only for the SSTs in the concat iterator, but not in the independent SST iterators. We can have a flag or argument in the iterator that indicates the prefetch queue depth for that particular SST.
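A minimal sketch of such a bounded per-SST prefetch queue (fetch_block is a placeholder for the actual object store read):

    use bytes::Bytes;
    use tokio::sync::mpsc;

    // Placeholder for the actual block download.
    async fn fetch_block(_block_id: u64) -> Bytes {
        Bytes::new()
    }

    // Spawn a prefetcher with a bounded queue: the producer stalls once `depth`
    // blocks are buffered, so memory stays at roughly depth * block_size per SST.
    fn spawn_block_prefetcher(block_ids: Vec<u64>, depth: usize) -> mpsc::Receiver<Bytes> {
        let (tx, rx) = mpsc::channel(depth.max(1));
        tokio::spawn(async move {
            for block_id in block_ids {
                let block = fetch_block(block_id).await;
                // `send` waits when the queue is full; stop if the consumer
                // (the compactor's iterator) dropped the receiver.
                if tx.send(block).await.is_err() {
                    break;
                }
            }
        });
        rx
    }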


The total size of the input would be close to the output in most cases.

Yes, but the size of the output SST == the size of each input SST. Total SSTs held in memory = 10 x input + 1 x output.

@Little-Wallace
Contributor

The high memory usage of the compactor seems to be caused by some bug. I think we should fix the bug before any read optimizations.

@Little-Wallace
Contributor

Little-Wallace commented Jul 23, 2022

Yes, but the size of the output SST == the size of each input SST. Total SSTs held in memory = 10 x input + 1 x output.

No. We only merge overlapping L0 data up to 128MB, which means that in most cases the input data only uses several tens of MB.

@jon-chuang
Contributor

jon-chuang commented Jul 23, 2022

No. We only merge overlapping L0 data up to 128MB, which means that in most cases the input data only uses several tens of MB.

Oh yes, I was mistaken: if k > 0, then we always need 1 SST from Lk and 1 from Lk+1 at a time, not 10 from Lk. So yes, we wouldn't see the factor-of-10 improvement I talked about.

@jon-chuang
Contributor

jon-chuang commented Jul 23, 2022

The high memory usage of the compactor seems to be caused by some bug.

Perhaps the issue is Arc<Block> in SSTable: https://github.com/singularity-data/risingwave/blob/main/src/storage/src/hummock/sstable/mod.rs

Another possibility: load_table returns CachableEntry, which has an Arc reference to the LRU cache: https://github.com/singularity-data/risingwave/blob/main/src/common/src/cache.rs

Btw, why does the meta cache in SSTableStore cache the actual SSTable rather than just the meta? https://github.com/singularity-data/risingwave/blob/6d3183927831ea9077499e01470e63f857337301/src/storage/src/hummock/sstable_store.rs#L48
I think this is the real issue. We are caching ALL SSTables that are loaded, and afterwards we are not deleting the entries!

Also, load_table is a particularly confusing piece of code. I have no idea what it is doing! https://github.com/singularity-data/risingwave/blob/6d3183927831ea9077499e01470e63f857337301/src/storage/src/hummock/sstable_store.rs#L232

@jon-chuang
Contributor

jon-chuang commented Jul 23, 2022

You can see from an old implementation of load_table (https://github.com/singularity-data/risingwave/blob/fadff1446325761deb2f06c3e187777b93a8f07e/src/storage/src/hummock/sstable_store.rs#L290) that the current usage is an abuse of the original intent: meta_cache is only meant to store SSTables without data blocks, but here we are storing data blocks in it. I believe this is the cause of the high memory usage.


This change was introduced in this PR: https://github.com/singularity-data/risingwave/pull/3276/files#

The PR attempts to piggyback deduplication of data fetches on deduplication of meta fetches. To my mind, this does not seem like a best practice.

If the load_data parameter is not uniform, the requests cannot even be meaningfully deduplicated. I believe we should never insert an SSTable with any blocks, and should add a sanity check that ensures as much.

If we want to dedup data block requests, I think it should happen via a separate mechanism (e.g. in the object store, such as the S3 object store implementation).


Also, I think it is bad practice to have the SSTable struct sometimes hold data and sometimes not.

We should use separate structs or an enum for each case.

@jon-chuang
Contributor

jon-chuang commented Jul 23, 2022

To fix this, I believe we should post-fill a copy of the SSTable only after the block-less one has been inserted into the cache. We should also return an owned SSTable value if possible.

Finally, we may want to consider reducing the use of Arc, to reduce the chance of memory leaks (unrelated).

@wenym1
Contributor

wenym1 commented Jul 24, 2022

Perhaps the issue is Arc&lt;Block&gt; in SSTable

The size of the blocks in SSTable is included when we calculate the cost of the SSTable, so the overall cache memory usage is not likely to be affected by including blocks in SSTable: when the LruCache reaches its capacity, it evicts some entries to ensure that its memory usage does not exceed the limit.

Another possibility: load_table returns CachableEntry, which has an Arc reference to the LRU cache

Holding an Arc of the LruCache is necessary since we have to make sure that the LruCache is not freed while the CachableEntry is accessing the LruHandle pointer.

Btw, why does the meta cache in SSTableStore cache the actual SSTable rather than just the meta?

The reason @Little-Wallace chose to store the whole prefetched SST in SSTable is that he found a previous misuse of bytes::Bytes.

How we used to use bytes::Bytes: we prefetched the whole SST, split the byte array into separate owned block-data Bytes, and added each block to the block cache. However, when we slice a Bytes, even though it returns a new Bytes with a static lifetime, it is not a memory copy but a reference into the memory of the whole SST Bytes. Therefore, if we add such block-data Bytes into the block cache, the real SST memory will not be freed until all blocks of that SST are evicted, which increases the real memory usage of the block cache beyond what is accounted for. So @Little-Wallace chose to put the whole prefetched SST into SSTable as a temporary solution so that at least the real memory usage can be tracked correctly. I think he is working on some solutions to make the cache logic clearer.
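A small standalone illustration of the bytes::Bytes behavior described above:

    use bytes::Bytes;

    fn main() {
        // A 4 KiB "SST" buffer.
        let sst = Bytes::from(vec![0u8; 4096]);
        // Slicing is zero-copy: the 64-byte "block" shares the SST's allocation.
        let block = sst.slice(0..64);
        drop(sst);
        // The full 4 KiB allocation stays alive as long as any slice exists,
        // because Bytes is reference-counted -- exactly the accounting problem above.
        assert_eq!(block.len(), 64);
    }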

We are caching ALL SSTables that are loaded, and afterwards we are not deleting the entries!

The SSTable will be evicted when the cache reaches its capacity, and so will the real SST data it holds. So the entries will eventually be deleted when the SST is evicted.

@wenym1
Contributor

wenym1 commented Jul 24, 2022

Finally, we may want to consider reducing the use of Arc, to reduce the chance of memory leaks (unrelated).

Memory leaks only happen with Arc when a cyclic dependency appears. The current Arc dependency graph is quite clear and is acyclic, so I don't think it's a good idea to reduce the current use of Arc.

@wenym1
Contributor

wenym1 commented Jul 24, 2022

Our current implementation of LruCache does not strictly limit memory usage.

An externally referenced CacheEntry cannot be evicted. Therefore, when we insert a new entry and the cache is about to exceed its memory capacity, the memory usage can still exceed the capacity even after we have evicted all evictable entries. In this scenario, insertion into the LruCache is still allowed rather than blocked or failed. The total memory usage will exceed the capacity for a while, until some entries are no longer externally referenced and their data is evicted from the LruCache.

I am not sure whether this increases the real memory usage of the LruCache. We may look at the compactor's LruCache usage metrics to see whether the LruCache is using more memory than its capacity.

@Little-Wallace
Contributor

This bug is caused by the incorrect charge for Sstable when it includes data blocks. I will fix it in #4034.

@jon-chuang
Contributor

at least the real memory usage can be tracked correctly.

Ok, but we should not pollute the meta cache to achieve this…?

@jon-chuang
Contributor

jon-chuang commented Jul 25, 2022

Anyway, the meta_cache is not really that useful in the context of the compactor, since the compactor and compute nodes do not share the same sstable_store, and the compactor will generally fetch a given SST once and then never again, as a new SST replaces it. For this reason, perhaps we can significantly reduce the meta and block cache capacity for the compactor.

@Little-Wallace
Contributor

The compactor only uses the meta cache, because the compactor always needs the whole file rather than just some blocks.

@jon-chuang
Contributor

But do we even need a cache? We can just stream the blocks and have the stream do the prefetching. Cache seems like the wrong tool here.

@Little-Wallace
Contributor

Little-Wallace commented Aug 11, 2022

The compactor still uses a lot more memory than our statistics suggest.

@Little-Wallace
Contributor

I'm working on this issue in #4590.
I will run some tests in the next few days to see whether we can control the input memory.

@MrCroxx
Contributor

MrCroxx commented Aug 12, 2022

I did some experiments and think there may be a memory leak in the compactor. I'm currently investigating it.
