Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of ResponseWriter using pooled Netty buffers #2805

Merged
merged 14 commits into from
Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates.
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,9 +21,9 @@
import java.util.concurrent.CompletableFuture;

/**
* Default implementation of {@link DataChunk}.
* Implementation of {@link DataChunk} based on {@code java.nio.ByteBuffer}.
*/
final class DataChunkImpl implements DataChunk {
final class ByteBufferDataChunk implements DataChunk {

private final ByteBuffer[] byteBuffers;
private final boolean flush;
Expand All @@ -38,7 +38,7 @@ final class DataChunkImpl implements DataChunk {
* @param readOnly indicates underlying buffers are not reused
* @param byteBuffers the data for this chunk. Should not be reused until {@code releaseCallback} is used
*/
DataChunkImpl(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
ByteBufferDataChunk(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
this.flush = flush;
this.readOnly = readOnly;
this.releaseCallback = null;
Expand All @@ -52,7 +52,7 @@ final class DataChunkImpl implements DataChunk {
* @param releaseCallback a callback which is called when this chunk is completely processed and instance is free for reuse
* @param byteBuffers the data for this chunk. Should not be reused until {@code releaseCallback} is used
*/
DataChunkImpl(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
ByteBufferDataChunk(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
this.flush = flush;
this.readOnly = readOnly;
this.releaseCallback = Objects.requireNonNull(releaseCallback, "release callback is null");
Expand Down Expand Up @@ -81,10 +81,12 @@ public boolean isReadOnly() {

@Override
public void release() {
if (releaseCallback != null) {
releaseCallback.run();
if (!isReleased) {
if (releaseCallback != null) {
releaseCallback.run();
}
isReleased = true;
}
isReleased = true;
}

@Override
Expand Down
51 changes: 35 additions & 16 deletions common/http/src/main/java/io/helidon/common/http/DataChunk.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2020 Oracle and/or its affiliates.
* Copyright (c) 2018, 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,7 +72,7 @@ static DataChunk create(byte[] bytes) {
* @return a data chunk
*/
static DataChunk create(ByteBuffer... byteBuffers) {
return new DataChunkImpl(false, false, byteBuffers);
return new ByteBufferDataChunk(false, false, byteBuffers);
}

/**
Expand All @@ -83,7 +83,7 @@ static DataChunk create(ByteBuffer... byteBuffers) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, false, byteBuffers);
return new ByteBufferDataChunk(flush, false, byteBuffers);
}

/**
Expand All @@ -95,7 +95,7 @@ static DataChunk create(boolean flush, ByteBuffer... byteBuffers) {
* @return a reusable data chunk with no release callback
*/
static DataChunk create(boolean flush, boolean readOnly, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, readOnly, byteBuffers);
return new ByteBufferDataChunk(flush, readOnly, byteBuffers);
}

/**
Expand All @@ -107,7 +107,7 @@ static DataChunk create(boolean flush, boolean readOnly, ByteBuffer... byteBuffe
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, Runnable releaseCallback, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, false, releaseCallback, byteBuffers);
return new ByteBufferDataChunk(flush, false, releaseCallback, byteBuffers);
}

/**
Expand All @@ -120,7 +120,7 @@ static DataChunk create(boolean flush, Runnable releaseCallback, ByteBuffer... b
* @return a reusable data chunk with a release callback
*/
static DataChunk create(boolean flush, boolean readOnly, Runnable releaseCallback, ByteBuffer... byteBuffers) {
return new DataChunkImpl(flush, readOnly, releaseCallback, byteBuffers);
return new ByteBufferDataChunk(flush, readOnly, releaseCallback, byteBuffers);
}

/**
Expand All @@ -141,6 +141,32 @@ static DataChunk create(boolean flush, boolean readOnly, Runnable releaseCallbac
*/
ByteBuffer[] data();

/**
* Returns a representation of this chunk as an array of T's.
*
* @param clazz class of return type
* @param <T> the buffer type
* @return an array of T's
*/
@SuppressWarnings("unchecked")
default <T> T[] data(Class<T> clazz) {
if (ByteBuffer.class.isAssignableFrom(clazz)) {
return (T[]) data();
}
throw new UnsupportedOperationException("Unsupported operation for class " + clazz);
}

/**
* Checks if this instance is backed by buffers of a certain kind.
*
* @param clazz a buffer class instance
* @param <T> the buffer type
* @return outcome of test
*/
default <T> boolean isBackedBy(Class<T> clazz) {
return ByteBuffer.class.isAssignableFrom(clazz);
}

/**
* Returns the sum of elements between the current position and the limit of each of the underlying ByteBuffer.
*
Expand Down Expand Up @@ -287,20 +313,13 @@ default boolean isReadOnly() {

/**
* An empty data chunk with a flush flag can be used to force a connection
* flush. This method determines if this chunk is used for that purpose.
* flush without actually writing any bytes. This method determines if
* this chunk is used for that purpose.
*
* @return Outcome of test.
*/
default boolean isFlushChunk() {
if (!flush()) {
return false;
}
for (ByteBuffer byteBuffer : data()) {
if (byteBuffer.limit() != 0) {
return false;
}
}
return true;
return flush() && remaining() == 0;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions webserver/jersey/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<groupId>io.helidon.jersey</groupId>
<artifactId>helidon-jersey-server</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>
<dependency>
<groupId>io.helidon.webserver</groupId>
<artifactId>helidon-webserver-test-support</artifactId>
Expand Down Expand Up @@ -94,6 +98,7 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Loading