diff --git a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java index b4878648334..5a06f0f07dd 100644 --- a/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java +++ b/webserver/tyrus/src/main/java/io/helidon/webserver/tyrus/TyrusReaderSubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020 Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2021 Oracle and/or its affiliates. All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import java.nio.ByteBuffer; import java.util.concurrent.ExecutorService; import java.util.concurrent.Flow; -import java.util.logging.Logger; import javax.websocket.CloseReason; @@ -34,7 +33,6 @@ * Class TyrusReaderSubscriber. */ public class TyrusReaderSubscriber implements Flow.Subscriber { - private static final Logger LOGGER = Logger.getLogger(TyrusSupport.class.getName()); private static final int MAX_RETRIES = 5; private static final CloseReason CONNECTION_CLOSED = new CloseReason(NORMAL_CLOSURE, "Connection closed"); @@ -63,34 +61,55 @@ public void onSubscribe(Flow.Subscription subscription) { @Override public void onNext(DataChunk item) { - if (executorService == null) { + if (subscription != null) { + if (executorService == null) { + submitDataChunk(item); + } else { + executorService.submit(() -> submitDataChunk(item)); + } + } else { + item.release(); + } + } + + /** + * Submits all data in a chunk and requests one more if successful. + * + * @param item a data chunk + */ + private void submitDataChunk(DataChunk item) { + try { for (ByteBuffer byteBuffer : item.data()) { submitBuffer(byteBuffer); } - } else { - executorService.submit(() -> { - for (ByteBuffer byteBuffer : item.data()) { - submitBuffer(byteBuffer); - } - }); + } finally { + item.release(); + } + if (subscription != null) { + subscription.request(1L); } } /** - * Submits data buffer to Tyrus. Retries a few times to make sure the entire buffer - * is consumed or logs an error. + * Submits single buffer to Tyrus. Retries a few times to make sure the entire buffer + * is consumed. * * @param data Data buffer. */ private void submitBuffer(ByteBuffer data) { + // Pass all data to Tyrus spi int retries = MAX_RETRIES; while (data.remaining() > 0 && retries-- > 0) { connection.getReadHandler().handle(data); } + + // If we can't push all data to Tyrus, cancel and report problem if (retries == 0) { - LOGGER.warning("Tyrus did not consume all data buffer after " + MAX_RETRIES + " retries"); + subscription.cancel(); + subscription = null; + connection.close(new CloseReason(UNEXPECTED_CONDITION, "Tyrus did not " + + "consume all data after " + MAX_RETRIES + " retries")); } - subscription.request(1L); } @Override