Skip to content

Commit

Permalink
Don't over-allocate in HeapBufferedAsyncEntityConsumer in order to co…
Browse files Browse the repository at this point in the history
…nsume the response

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Sep 12, 2023
1 parent 18ac060 commit c696634
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,25 @@ protected void data(final ByteBuffer src, final boolean endOfStream) throws IOEx
return;
}

int len = src.limit();
if (len < 0) {
len = 4096;
}

ByteArrayBuffer buffer = bufferRef.get();
if (buffer == null) {
buffer = new ByteArrayBuffer(bufferLimitBytes);
buffer = new ByteArrayBuffer(len);
if (bufferRef.compareAndSet(null, buffer) == false) {
buffer = bufferRef.get();
}
}

int len = src.limit();
if (buffer.length() + len > bufferLimitBytes) {
throw new ContentTooLongException(
"entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]"
);
}

if (len < 0) {
len = 4096;
}

if (src.hasArray()) {
buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining());
} else {
Expand Down Expand Up @@ -136,4 +136,12 @@ public void releaseResources() {
buffer = null;
}
}

/**
* Gets current byte buffer instance
* @return byte buffer instance
*/
ByteArrayBuffer getBuffer() {
return bufferRef.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client.nio;

import com.carrotsearch.randomizedtesting.RandomizedTest;

import org.apache.hc.core5.http.ContentTooLongException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;

public class HeapBufferedAsyncEntityConsumerTest extends RandomizedTest {
private static final int BUFFER_LIMIT = 100 * 1024 * 1024 /* 100Mb */;
private HeapBufferedAsyncEntityConsumer consumer;

@Before
public void setUp() {
consumer = new HeapBufferedAsyncEntityConsumer(BUFFER_LIMIT);
}

@After
public void tearDown() {
consumer.releaseResources();
}

@Test
public void consumerAllocatesBufferLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(1000));
}

@Test
public void consumerAllocatesEmptyBuffer() throws IOException {
consumer.consume(ByteBuffer.allocate(0).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(0));
}

@Test
public void consumerExpandsBufferLimits() throws IOException {
consumer.consume(randomByteBufferOfLength(1000).flip());
consumer.consume(randomByteBufferOfLength(2000).flip());
consumer.consume(randomByteBufferOfLength(3000).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(6000));
}

@Test
public void consumerAllocateLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThat(consumer.getBuffer().capacity(), equalTo(BUFFER_LIMIT));
}

@Test
public void consumerFailsToOverAllocateLimit() throws IOException {
consumer.consume(randomByteBufferOfLength(BUFFER_LIMIT).flip());
assertThrows(ContentTooLongException.class, () -> consumer.consume(randomByteBufferOfLength(1).flip()));
}

private ByteBuffer randomByteBufferOfLength(int length) {
return ByteBuffer.allocate(length).put(randomBytesOfLength(length));
}
}

0 comments on commit c696634

Please sign in to comment.