Close Zstd Dictionary after execution to avoid any memory leak. #9403

Merged: 3 commits, Aug 17, 2023
CHANGELOG.md (1 addition, 0 deletions)
@@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add Segment download stats to remotestore stats API ([#8718](https://github.com/opensearch-project/OpenSearch/pull/8718))
- [Remote Store] Add remote segment transfer stats on NodesStats API ([#9168](https://github.com/opensearch-project/OpenSearch/pull/9168))
- Return 409 Conflict HTTP status instead of 503 on failure to concurrently execute snapshots ([#8986](https://github.com/opensearch-project/OpenSearch/pull/5855))
- Fix memory leak when using Zstd Dictionary ([#9403](https://github.com/opensearch-project/OpenSearch/pull/9403))
Member commented:

@mgodwan This needs to go in the [Unreleased 2.x] section. I'll submit a quick PR to fix this.

Member Author replied:

Thanks @andrross for fixing this.


### Deprecated

server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java
@@ -103,11 +103,13 @@

     // dictionary compression first
     doCompress(bytes, offset, dictLength, cctx, out);
-    cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel));
-
-    for (int start = offset + dictLength; start < end; start += blockLength) {
-        int l = Math.min(blockLength, end - start);
-        doCompress(bytes, start, l, cctx, out);
+    try (ZstdDictCompress dictCompress = new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
+        cctx.loadDict(dictCompress);
+
+        for (int start = offset + dictLength; start < end; start += blockLength) {
+            int l = Math.min(blockLength, end - start);
+            doCompress(bytes, start, l, cctx, out);
+        }
     }
 }
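
For reference, a minimal self-contained sketch of the pattern the hunk above introduces, assuming the zstd-jni API (com.github.luben.zstd); the class name, sample inputs, and compression level 3 are placeholders, not OpenSearch code:

import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDictCompress;

import java.nio.charset.StandardCharsets;

public class ZstdDictCloseSketch {
    public static void main(String[] args) throws Exception {
        byte[] dict = "sample dictionary bytes".getBytes(StandardCharsets.UTF_8);
        byte[] data = "sample payload to compress".getBytes(StandardCharsets.UTF_8);

        // try-with-resources closes both the context and the dictionary, so the native
        // allocations zstd-jni made for them are released deterministically.
        try (ZstdCompressCtx cctx = new ZstdCompressCtx();
             ZstdDictCompress dictCompress = new ZstdDictCompress(dict, 3)) {
            cctx.loadDict(dictCompress);
            byte[] compressed = cctx.compress(data);
            System.out.println("compressed " + data.length + " bytes to " + compressed.length + " bytes");
        }
    }
}
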
@@ -170,32 +172,33 @@

     // decompress dictionary first
     doDecompress(in, dctx, bytes, dictLength);
 
-    dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength));
-
-    int offsetInBlock = dictLength;
-    int offsetInBytesRef = offset;
-
-    // Skip unneeded blocks
-    while (offsetInBlock + blockLength < offset) {
-        final int compressedLength = in.readVInt();
-        in.skipBytes(compressedLength);
-        offsetInBlock += blockLength;
-        offsetInBytesRef -= blockLength;
+    try (ZstdDictDecompress dictDecompress = new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
@nknize (Collaborator) commented on Aug 17, 2023:

Maybe we should use the other ctor? And do this with direct memory instead of using heap?

try (ZstdDictDecompress dictDecompress = new ZstdDictDecompress(ByteBuffer.wrap(bytes.bytes, 0, dictLength))) {

Member Author replied:

The leak is in native memory and didn't show up in the JVM heap. The allocations on the native side are not freed unless the dictionary is closed, which is what caused the leak.
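
A side note on the direct-memory question above, using plain java.nio (the array and lengths below are placeholders, not the OpenSearch code): ByteBuffer.wrap leaves the bytes on the JVM heap, so actually moving the dictionary off-heap would also require copying it into a buffer allocated with allocateDirect.

import java.nio.ByteBuffer;

public class DirectVsHeapBufferSketch {
    public static void main(String[] args) {
        byte[] bytes = new byte[1024];   // stand-in for the decompressed dictionary bytes
        int dictLength = 512;            // stand-in for dictLength

        // Wrapping only creates a view over the existing heap array.
        ByteBuffer wrapped = ByteBuffer.wrap(bytes, 0, dictLength);
        System.out.println("wrapped.isDirect() = " + wrapped.isDirect());   // false

        // A direct buffer lives outside the heap, but the dictionary bytes must be copied into it.
        ByteBuffer direct = ByteBuffer.allocateDirect(dictLength);
        direct.put(bytes, 0, dictLength);
        direct.flip();
        System.out.println("direct.isDirect() = " + direct.isDirect());     // true
    }
}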

+        dctx.loadDict(dictDecompress);
+
+        int offsetInBlock = dictLength;
+        int offsetInBytesRef = offset;
+
+        // Skip unneeded blocks
+        while (offsetInBlock + blockLength < offset) {
+            final int compressedLength = in.readVInt();
+            in.skipBytes(compressedLength);
+            offsetInBlock += blockLength;
+            offsetInBytesRef -= blockLength;
+        }

Codecov (codecov/patch) check warning: added lines #L183-L187 in server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java were not covered by tests.

+        // Read blocks that intersect with the interval we need
+        while (offsetInBlock < offset + length) {
+            bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
+            int l = Math.min(blockLength, originalLength - offsetInBlock);
+            doDecompress(in, dctx, bytes, l);
+            offsetInBlock += blockLength;
+        }
+
+        bytes.offset = offsetInBytesRef;
+        bytes.length = length;
+
+        assert bytes.isValid() : "decompression output is corrupted";
+    }

-    // Read blocks that intersect with the interval we need
-    while (offsetInBlock < offset + length) {
-        bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
-        int l = Math.min(blockLength, originalLength - offsetInBlock);
-        doDecompress(in, dctx, bytes, l);
-        offsetInBlock += blockLength;
-    }
-
-    bytes.offset = offsetInBytesRef;
-    bytes.length = length;
-
-    assert bytes.isValid() : "decompression output is corrupted";
}
}
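
For completeness, a sketch of the decompress-side counterpart to the change above, again assuming the zstd-jni API; the helper name and arguments are illustrative only. As the author notes in the thread above, close() is what releases the dictionary's native allocation, which never shows up on the JVM heap:

import com.github.luben.zstd.ZstdDecompressCtx;
import com.github.luben.zstd.ZstdDictDecompress;

public class ZstdDictDecompressCloseSketch {
    // Hypothetical helper: decompress a block with a raw dictionary, then free the native state.
    static byte[] decompressWithDict(byte[] dict, byte[] compressed, int originalSize) throws Exception {
        try (ZstdDecompressCtx dctx = new ZstdDecompressCtx();
             ZstdDictDecompress dictDecompress = new ZstdDictDecompress(dict)) {
            // Leaving dictDecompress unclosed is what leaked native memory before this PR.
            dctx.loadDict(dictDecompress);
            return dctx.decompress(compressed, originalSize);
        }
    }
}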
