Skip to content

Commit

Permalink
Implementation of ResponseWriter using pooled Netty buffers (#2805)
Browse files Browse the repository at this point in the history
* Initial prototype based on Netty buffers.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Revised implementation of ByteBufDataChunk and ResponseWriter. All unit tests are passing.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Removed test.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Checkstyle problems.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Ensure proper release of buffers.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Renamed DataChunkImpl as we now have multiple implementations. Cleaned up release logic to avoid warnings in logs. Better implementation of isFlushChunk.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Copyright in module-info.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Lazy creation of DataChunkOutputStream publisher. Guard access to downstream using a semaphore and volatiles. Ensure onComplete is called at most once.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Implement old method in new ByteBufDataChunk class for testing purposes.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Use a static create() method instead of a public constructor.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
  • Loading branch information
spericas authored and paulparkinson committed Mar 29, 2021
1 parent 0217664 commit 4932d0c
Show file tree
Hide file tree
Showing 7 changed files with 481 additions and 131 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,9 +21,9 @@
import java.util.concurrent.CompletableFuture;

/**
* Default implementation of {@link DataChunk}.
* Implementation of {@link DataChunk} based on {@code java.nio.ByteBuffer}.
*/
final class DataChunkImpl implements DataChunk {
final class ByteBufferDataChunk implements DataChunk {

private final ByteBuffer[] byteBuffers;
private final boolean flush;
Expand All @@ -38,7 +38,7 @@ final class DataChunkImpl implements DataChunk {
* @param readOnly indicates underlying buffers are not reused
* @param byteBuffers the data for this chunk. Should not be reused until {@code releaseCallback} is used
*/
DataChunkImpl(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
ByteBufferDataChunk(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
this.flush = flush;
this.readOnly = readOnly;
this.releaseCallback = null;
Expand All @@ -52,7 +52,7 @@ final class DataChunkImpl implements DataChunk {
* @param releaseCallback a callback which is called when this chunk is completely processed and instance is free for reuse
* @param byteBuffers the data for this chunk. Should not be reused until {@code releaseCallback} is used
*/
DataChunkImpl(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
ByteBufferDataChunk(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
this.flush = flush;
this.readOnly = readOnly;
this.releaseCallback = Objects.requireNonNull(releaseCallback, "release callback is null");
Expand Down Expand Up @@ -81,10 +81,12 @@ public boolean isReadOnly() {

@Override
public void release() {
if (releaseCallback != null) {
releaseCallback.run();
if (!isReleased) {
if (releaseCallback != null) {
releaseCallback.run();
}
isReleased = true;
}
isReleased = true;
}

@Override
Expand Down
51 changes: 35 additions & 16 deletions common/http/src/main/java/io/helidon/common/http/DataChunk.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,7 @@ static DataChunk create(byte[] bytes) {
* @return a data chunk
*/
static DataChunk create(ByteBuffer... byteBuffers) {
return new DataChunkImpl(false, false, byteBuffers);
return new ByteBufferDataChunk(false, false, byteBuffers);
}

/**
Expand All @@ -83,7 +83,7 @@ static DataChunk create(ByteBuffer... byteBuffers) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, false, byteBuffers);
return new ByteBufferDataChunk(flush, false, byteBuffers);
}

/**
Expand All @@ -95,7 +95,7 @@ static DataChunk create(boolean flush, ByteBuffer... byteBuffers) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, readOnly, byteBuffers);
return new ByteBufferDataChunk(flush, readOnly, byteBuffers);
}

/**
Expand All @@ -107,7 +107,7 @@ static DataChunk create(boolean flush, boolean readOnly, ByteBuffer... byteBuffe
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, Runnable releaseCallback, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, false, releaseCallback, byteBuffers);
return new ByteBufferDataChunk(flush, false, releaseCallback, byteBuffers);
}

/**
Expand All @@ -120,7 +120,7 @@ static DataChunk create(boolean flush, Runnable releaseCallback, ByteBuffer... b
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, readOnly, releaseCallback, byteBuffers);
return new ByteBufferDataChunk(flush, readOnly, releaseCallback, byteBuffers);
}

/**
Expand All @@ -141,6 +141,32 @@ static DataChunk create(boolean flush, boolean readOnly, Runnable releaseCallbac
*/
ByteBuffer[] data();

/**
* Returns a representation of this chunk as an array of T's.
*
* @param clazz class of return type
* @param <T> the buffer type
* @return an array of T's
*/
@SuppressWarnings("unchecked")
default <T> T[] data(Class<T> clazz) {
if (ByteBuffer.class.isAssignableFrom(clazz)) {
return (T[]) data();
}
throw new UnsupportedOperationException("Unsupported operation for class " + clazz);
}

/**
* Checks if this instance is backed by buffers of a certain kind.
*
* @param clazz a buffer class instance
* @param <T> the buffer type
* @return outcome of test
*/
default <T> boolean isBackedBy(Class<T> clazz) {
return ByteBuffer.class.isAssignableFrom(clazz);
}

/**
* Returns the sum of elements between the current position and the limit of each of the underlying ByteBuffer.
*
Expand Down Expand Up @@ -287,20 +313,13 @@ default boolean isReadOnly() {

/**
* An empty data chunk with a flush flag can be used to force a connection
* flush. This method determines if this chunk is used for that purpose.
* flush without actually writing any bytes. This method determines if
* this chunk is used for that purpose.
*
* @return Outcome of test.
*/
default boolean isFlushChunk() {
if (!flush()) {
return false;
}
for (ByteBuffer byteBuffer : data()) {
if (byteBuffer.limit() != 0) {
return false;
}
}
return true;
return flush() && remaining() == 0;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions webserver/jersey/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<groupId>io.helidon.jersey</groupId>
<artifactId>helidon-jersey-server</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver-test-support</artifactId>
Expand Down Expand Up @@ -94,6 +98,7 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Loading

0 comments on commit 4932d0c

Please sign in to comment.