Skip to content

Commit

Permalink
Properly release underlying buffer before passing it to WebSocket han…
Browse files Browse the repository at this point in the history
…dler (#2715)

* Properly release underlying buffer before passing it to handler.

* Releases data chunks after passing them to Tyrus without any copying. Reports an error and closes connection if Tyrus is unable to handle the data. Finally, fixed a problem related to subscription requests.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Removed unused logger.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
  • Loading branch information
spericas authored Feb 2, 2021
1 parent 0f52c76 commit b4c1161
Showing 1 changed file with 33 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,6 @@
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Flow;
import java.util.logging.Logger;

import javax.websocket.CloseReason;

Expand All @@ -34,7 +33,6 @@
* Class TyrusReaderSubscriber.
*/
public class TyrusReaderSubscriber implements Flow.Subscriber<DataChunk> {
private static final Logger LOGGER = Logger.getLogger(TyrusSupport.class.getName());

private static final int MAX_RETRIES = 5;
private static final CloseReason CONNECTION_CLOSED = new CloseReason(NORMAL_CLOSURE, "Connection closed");
Expand Down Expand Up @@ -63,34 +61,55 @@ public void onSubscribe(Flow.Subscription subscription) {

@Override
public void onNext(DataChunk item) {
if (executorService == null) {
if (subscription != null) {
if (executorService == null) {
submitDataChunk(item);
} else {
executorService.submit(() -> submitDataChunk(item));
}
} else {
item.release();
}
}

/**
* Submits all data in a chunk and requests one more if successful.
*
* @param item a data chunk
*/
private void submitDataChunk(DataChunk item) {
try {
for (ByteBuffer byteBuffer : item.data()) {
submitBuffer(byteBuffer);
}
} else {
executorService.submit(() -> {
for (ByteBuffer byteBuffer : item.data()) {
submitBuffer(byteBuffer);
}
});
} finally {
item.release();
}
if (subscription != null) {
subscription.request(1L);
}
}

/**
* Submits data buffer to Tyrus. Retries a few times to make sure the entire buffer
* is consumed or logs an error.
* Submits single buffer to Tyrus. Retries a few times to make sure the entire buffer
* is consumed.
*
* @param data Data buffer.
*/
private void submitBuffer(ByteBuffer data) {
// Pass all data to Tyrus spi
int retries = MAX_RETRIES;
while (data.remaining() > 0 && retries-- > 0) {
connection.getReadHandler().handle(data);
}

// If we can't push all data to Tyrus, cancel and report problem
if (retries == 0) {
LOGGER.warning("Tyrus did not consume all data buffer after " + MAX_RETRIES + " retries");
subscription.cancel();
subscription = null;
connection.close(new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not "
+ "consume all data after " + MAX_RETRIES + " retries"));
}
subscription.request(1L);
}

@Override
Expand Down

0 comments on commit b4c1161

Please sign in to comment.