diff --git a/CHANGELOG.md b/CHANGELOG.md index d3e6de03709f4..a00bc6447a534 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) +- Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to consume the response ([#9993](https://github.com/opensearch-project/OpenSearch/pull/9993)) ### Security diff --git a/client/rest/src/main/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumer.java b/client/rest/src/main/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumer.java index 9bd17d1c24c7e..ae38c1a0308d1 100644 --- a/client/rest/src/main/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumer.java +++ b/client/rest/src/main/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumer.java @@ -86,25 +86,29 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx return; } + int len = src.limit(); + if (len < 0) { + len = 4096; + } else if (len > bufferLimitBytes) { + throw new ContentTooLongException( + "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" + ); + } + ByteArrayBuffer buffer = bufferRef.get(); if (buffer == null) { - buffer = new ByteArrayBuffer(bufferLimitBytes); + buffer = new ByteArrayBuffer(len); if (bufferRef.compareAndSet(null, buffer) == false) { buffer = bufferRef.get(); } } - int len = src.limit(); if (buffer.length() + len > bufferLimitBytes) { throw new ContentTooLongException( "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" ); } - if (len < 0) { - len = 4096; - } - if (src.hasArray()) { buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); } else { @@ -136,4 +140,12 @@ public void releaseResources() { buffer = null; } } + + /** + * Gets current byte buffer instance + * @return byte buffer instance + */ + ByteArrayBuffer getBuffer() { + return bufferRef.get(); + } } diff --git a/client/rest/src/test/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumerTests.java b/client/rest/src/test/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumerTests.java new file mode 100644 index 0000000000000..6a4b176edd011 --- /dev/null +++ b/client/rest/src/test/java/org/opensearch/client/nio/HeapBufferedAsyncEntityConsumerTests.java @@ -0,0 +1,71 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.nio; + +import org.apache.hc.core5.http.ContentTooLongException; +import org.opensearch.client.RestClientTestCase; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertThrows; + +public class HeapBufferedAsyncEntityConsumerTests extends RestClientTestCase { + private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */; + private HeapBufferedAsyncEntityConsumer consumer; + + @Before + public void setUp() { + consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT); + } + + @After + public void tearDown() { + consumer.releaseResources(); + } + + public void testConsumerAllocatesBufferLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(1000).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(1000)); + } + + public void testConsumerAllocatesEmptyBuffer() throws IOException { + consumer.consume(ByteBuffer.allocate(0).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(0)); + } + + public void testConsumerExpandsBufferLimits() throws IOException { + consumer.consume(randomByteBufferOfLength(1000).flip()); + consumer.consume(randomByteBufferOfLength(2000).flip()); + consumer.consume(randomByteBufferOfLength(3000).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(6000)); + } + + public void testConsumerAllocatesLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip()); + assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT)); + } + + public void testConsumerFailsToAllocateOverLimit() throws IOException { + assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT + 1).flip())); + } + + public void testConsumerFailsToExpandOverLimit() throws IOException { + consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip()); + assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip())); + } + + private static ByteBuffer randomByteBufferOfLength(int length) { + return ByteBuffer.allocate(length).put(randomBytesOfLength(length)); + } +}