From d542b5b2099e4e768abbda4ded5a2eaa29880010 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Fri, 21 Jan 2022 11:40:14 +0100 Subject: [PATCH 1/8] Fix wrong connection close Signed-off-by: Tomas Langer --- .../helidon/webserver/BareResponseImpl.java | 50 +++---- .../helidon/webserver/ForwardingHandler.java | 45 ++++--- .../io/helidon/webserver/RequestContext.java | 15 ++- .../helidon/webserver/HttpPipelineTest.java | 15 ++- .../io/helidon/webserver/KeepAliveTest.java | 125 ++++++++++++++++++ .../java/io/helidon/webserver/PlainTest.java | 18 +-- .../webserver/ReqEntityAnalyzedTest.java | 7 +- .../webserver/utils/SocketHttpClient.java | 3 +- 8 files changed, 208 insertions(+), 70 deletions(-) create mode 100644 webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index 5afe9af548c..d4915978181 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,7 +56,6 @@ * The BareResponseImpl. */ class BareResponseImpl implements BareResponse { - private static final Logger LOGGER = Logger.getLogger(BareResponseImpl.class.getName()); // See HttpConversionUtil.ExtensionHeaderNames @@ -195,23 +194,24 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Request content not fully read with keep-alive: true", channel)); - if (!requestContext.hasRequests() || requestContext.requestCancelled()) { - requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> { - if (listener.equals(ChannelFutureListener.CLOSE)) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); - } else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) { - response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); - } - return listener; - }); - //We are not sure which Connection header value should be set. - //If unhandled entity is only one content large, we can keep the keep-alive - channel.read(); - } else { + + // no requests, nothing emitted or request cancelled (this is fine, we ignore entity and close connection) + boolean entityRequested = true; + if ((!requestContext.hasRequests() && !requestContext.hasEmitted()) + || requestContext.requestCancelled()) { + if (!isWebSocketUpgrade) { + entityRequested = false; + } + } + if (entityRequested) { + HttpRequest request = requestContext.request(); + LOGGER.warning("Entity was requested and not fully consumed before a response is sent. " + + "This is not supported. Connection will be closed. Please fix your route for " + + request.method() + " " + request.uri()); + } + if (!isWebSocketUpgrade) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE); - throw new IllegalStateException("Cannot request entity and send response without " - + "waiting for it to be handled"); } } else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); @@ -377,21 +377,7 @@ public void onSubscribe(Flow.Subscription subscription) { return; } this.subscription = Objects.requireNonNull(subscription, "subscription is null"); - - // TyrusSupport controls order of writes manually - if (isWebSocketUpgrade) { - subscription.request(1); - } else { - // Callback deferring first request for data after: - // - Request stream has been completed - requestEntityAnalyzed.whenComplete((channelFutureListener, throwable) -> { - subscription.request(1); - }); - if (keepAlive) { - //Auxiliary read, does nothing in case of pending read - channel.read(); - } - } + subscription.request(1); } @Override diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java index 84a7eb37ba7..b99dc03875e 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java @@ -98,7 +98,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { private CompletableFuture requestEntityAnalyzed; private CompletableFuture prevRequestFuture; private boolean lastContent; - private boolean hadContentAlready; ForwardingHandler(Routing routing, NettyWebServer webServer, @@ -120,7 +119,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler { private void reset() { lastContent = false; - hadContentAlready = false; isWebSocketUpgrade = false; actualPayloadSize = 0L; ignorePayload = false; @@ -209,6 +207,7 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) { lastContent = false; HttpContent httpContent = (HttpContent) msg; + boolean hasRequests = requestContext.hasRequests(); ByteBuf content = httpContent.content(); if (content.isReadable()) { @@ -263,19 +262,6 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) { // this is here to handle the case when the content is not readable but we didn't // exceptionally complete the publisher and close the connection throw new IllegalStateException("It is not expected to not have readable content."); - } else if (!requestContext.hasRequests() - && HttpUtil.isKeepAlive(requestContext.request()) - && !requestEntityAnalyzed.isDone()) { - if (hadContentAlready) { - LOGGER.finest(() -> "More than one unhandled content present. Closing the connection."); - requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE); - } else { - //We are checking the unhandled entity, but we cannot be sure if connection should be closed or not. - //Next content has to be checked if it is last chunk. If not close connection. - hadContentAlready = true; - LOGGER.finest(() -> "Requesting the next chunk to determine if the connection should be closed."); - ctx.channel().read(); - } } } @@ -290,7 +276,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @SuppressWarnings("checkstyle:methodlength") private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context requestScope, Object msg) { - hadContentAlready = false; if (LOGGER.isLoggable(Level.FINE)) { LOGGER.fine(log("Received HttpRequest: %s. Remote address: %s. Scope id: %s", ctx, @@ -353,7 +338,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques } if (publisher.hasRequests()) { - LOGGER.finest(() -> log("Requesting next chunks from Netty", ctx)); + LOGGER.finest(() -> log("Requesting next (%d, %d) chunks from Netty", ctx, n, demand)); ctx.channel().read(); } else { LOGGER.finest(() -> log("No hook action required", ctx)); @@ -366,7 +351,12 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques // If a problem with the request URI, return 400 response BareRequestImpl bareRequest; try { - bareRequest = new BareRequestImpl((HttpRequest) msg, publisher, webServer, ctx, sslEngine, requestId); + bareRequest = new BareRequestImpl((HttpRequest) msg, + requestContextRef.publisher(), + webServer, + ctx, + sslEngine, + requestId); } catch (IllegalArgumentException e) { send400BadRequest(ctx, request, e); return true; @@ -376,9 +366,19 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques LOGGER.finest(log("Request id: %s", ctx, bareRequest.requestId())); } + String contentLength = request.headers().get(HttpHeaderNames.CONTENT_LENGTH); + + if ("0".equals(contentLength) + || (contentLength == null + && !"upgrade".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONNECTION)) + && !"chunked".equalsIgnoreCase(request.headers().get(HttpHeaderNames.TRANSFER_ENCODING)) + && !"multipart/byteranges".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONTENT_TYPE)))) { + // no entity + requestContextRef.complete(); + } + // If context length is greater than maximum allowed, return 413 response if (maxPayloadSize >= 0) { - String contentLength = request.headers().get(Http.Header.CONTENT_LENGTH); if (contentLength != null) { try { long value = Long.parseLong(contentLength); @@ -536,7 +536,7 @@ private void send100Continue(ChannelHandlerContext ctx, ""); FullHttpResponse response = toNettyResponse(transportResponse); - ctx.write(response); + ctx.writeAndFlush(response); } /** @@ -555,6 +555,8 @@ private void send400BadRequest(ChannelHandlerContext ctx, HttpRequest request, T t); FullHttpResponse response = toNettyResponse(handlerResponse); + // 400 -> close connection + response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.writeAndFlush(response) .addListener(future -> ctx.close()); @@ -575,6 +577,8 @@ private void send413PayloadTooLarge(ChannelHandlerContext ctx, HttpRequest reque ""); FullHttpResponse response = toNettyResponse(transportResponse); + // too big entity -> close connection + response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); ctx.writeAndFlush(response) .addListener(future -> ctx.close()); @@ -596,7 +600,6 @@ private FullHttpResponse toNettyResponse(TransportResponse handlerResponse) { HttpHeaders nettyHeaders = response.headers(); headers.forEach(nettyHeaders::add); - nettyHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); return response; } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java index 77b90679895..48bbaafccbd 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,13 @@ package io.helidon.webserver; +import java.util.concurrent.Flow; import java.util.logging.Logger; import io.helidon.common.context.Context; import io.helidon.common.context.Contexts; +import io.helidon.common.http.DataChunk; +import io.helidon.common.reactive.Multi; import io.netty.buffer.ByteBuf; import io.netty.handler.codec.http.HttpRequest; @@ -35,6 +38,7 @@ class RequestContext { private final HttpRequest request; private final Context scope; private volatile boolean responseCompleted; + private volatile boolean emitted; RequestContext(HttpRequestScopedPublisher publisher, HttpRequest request, Context scope) { this.publisher = publisher; @@ -42,6 +46,11 @@ class RequestContext { this.scope = scope; } + Flow.Publisher publisher() { + return Multi.create(publisher) + .peek(something -> emitted = true); + } + HttpRequest request() { return request; } @@ -78,6 +87,10 @@ boolean hasRequests() { return publisher.hasRequests(); } + boolean hasEmitted() { + return emitted; + } + /** * Is request content cancelled. * diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java index 15ed63c484d..14eecbb2216 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/HttpPipelineTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2021 Oracle and/or its affiliates. + * Copyright (c) 2020, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,9 +29,9 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; /** * Test support for HTTP 1.1 pipelining. @@ -67,7 +67,12 @@ private static void startServer(int port) throws Exception { .put("/", (req, res) -> { counter.set(0); log("put server"); - res.send(); + req.content() + .as(String.class) + .forSingle(it -> { + log("put: " + it); + res.send(); + }); }) .get("/", (req, res) -> { log("get server"); @@ -99,8 +104,8 @@ private static void startServer(int port) throws Exception { public void testPipelining() throws Exception { try (SocketHttpClient s = new SocketHttpClient(webServer)) { s.request(Http.Method.PUT, "/"); // reset server - s.request(Http.Method.GET, "/"); // request_0 - s.request(Http.Method.GET, "/"); // request_1 + s.request(Http.Method.GET, null); // request_0 + s.request(Http.Method.GET, null); // request_1 log("put client"); String reset = s.receive(); assertThat(reset, notNullValue()); diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java new file mode 100644 index 00000000000..f642e87be71 --- /dev/null +++ b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java @@ -0,0 +1,125 @@ +/* + * Copyright (c) 2022 Oracle and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.helidon.webserver; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import io.helidon.common.LogConfig; +import io.helidon.common.http.DataChunk; +import io.helidon.common.reactive.Multi; +import io.helidon.webclient.WebClient; +import io.helidon.webclient.WebClientResponse; + +import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.HttpHeaderValues; +import io.netty.util.AsciiString; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.RepeatedTest; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasEntry; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class KeepAliveTest { + private static WebServer server; + private static WebClient webClient; + + @BeforeAll + static void setUp() { + LogConfig.configureRuntime(); + server = WebServer.builder() + .routing(Routing.builder() + .register("/close", rules -> rules.any((req, res) -> { + req.content().forEach(dataChunk -> { + // consume only first from two chunks + dataChunk.release(); + throw new RuntimeException("BOOM!"); + }).exceptionally(res::send); + })) + .register("/plain", rules -> rules.any((req, res) -> { + req.content() + .forEach(DataChunk::release) + .onComplete(res::send) + .ignoreElement(); + })) + .build()) + .build(); + server.start().await(); + String serverUrl = "http://localhost:" + server.port(); + webClient = WebClient.builder() + .baseUri(serverUrl) + .keepAlive(true) + .build(); + } + + @AfterAll + static void afterAll() { + server.shutdown(); + } + + @RepeatedTest(100) + void closeWithKeepAliveUnconsumedRequest() { + testCall(webClient, true, "/close", 500, HttpHeaderValues.CLOSE); + } + + @RepeatedTest(100) + void sendWithoutKeepAlive() { + testCall(webClient, false, "/plain", 200, null); + } + + @RepeatedTest(100) + void sendWithKeepAlive() { + testCall(webClient, true, "/plain", 200, HttpHeaderValues.KEEP_ALIVE); + } + + private static void testCall(WebClient webClient, + boolean keepAlive, + String path, + int expectedStatus, + AsciiString expectedConnectionHeader) { + WebClientResponse res = null; + try { + res = webClient + .put() + .keepAlive(keepAlive) + .path(path) + .submit(Multi.just("first", "second") + .map(String::getBytes) + .map(ByteBuffer::wrap) + .map(bb -> DataChunk.create(true, true, bb)) + ) + .await(5, TimeUnit.MINUTES); + + assertEquals(expectedStatus, res.status().code()); + if (expectedConnectionHeader != null) { + assertThat(res.headers().toMap(), hasEntry(HttpHeaderNames.CONNECTION.toString(), List.of(expectedConnectionHeader.toString()))); + } else { + assertThat(res.headers().toMap(), not(hasKey(HttpHeaderNames.CONNECTION.toString()))); + } + res.content().forEach(DataChunk::release); + } finally { + Optional.ofNullable(res).ifPresent(WebClientResponse::close); + } + } +} \ No newline at end of file diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java index c792db38c78..0b3a06a467f 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -307,7 +307,7 @@ public void deferredGetWithLargePayloadCausesConnectionClose() throws Exception } @Test - public void getWithIllegalSmallEnoughPayloadDoesntCauseConnectionClose() throws Exception { + public void getWithIllegalSmallEnoughPayloadDoesCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -315,12 +315,12 @@ public void getWithIllegalSmallEnoughPayloadDoesntCauseConnectionClose() throws // assert assertThat(entityFromResponse(s.receive(), true), is("9\nIt works!\n0\n\n")); - SocketHttpClient.assertConnectionIsOpen(s); + SocketHttpClient.assertConnectionIsClosed(s); } } @Test - public void unconsumedSmallPostDataDoesNotCauseConnectionClose() throws Exception { + public void unconsumedSmallPostDataDoesCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -329,7 +329,7 @@ public void unconsumedSmallPostDataDoesNotCauseConnectionClose() throws Exceptio System.out.println(received); // assert assertThat(entityFromResponse(received, true), is("15\nPayload not consumed!\n0\n\n")); - SocketHttpClient.assertConnectionIsOpen(s); + SocketHttpClient.assertConnectionIsClosed(s); } } @@ -360,7 +360,7 @@ public void unconsumedDeferredLargePostDataCausesConnectionClose() throws Except } @Test - public void errorHandlerWithGetPayloadDoesNotCauseConnectionClose() throws Exception { + public void errorHandlerWithGetPayloadDoesCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -368,12 +368,12 @@ public void errorHandlerWithGetPayloadDoesNotCauseConnectionClose() throws Excep // assert assertThat(s.receive(), startsWith("HTTP/1.1 500 Internal Server Error\n")); - SocketHttpClient.assertConnectionIsOpen(s); + SocketHttpClient.assertConnectionIsClosed(s); } } @Test - public void errorHandlerWithPostDataDoesNotCauseConnectionClose() throws Exception { + public void errorHandlerWithPostDataDoesCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -381,7 +381,7 @@ public void errorHandlerWithPostDataDoesNotCauseConnectionClose() throws Excepti // assert assertThat(s.receive(), startsWith("HTTP/1.1 500 Internal Server Error\n")); - SocketHttpClient.assertConnectionIsOpen(s); + SocketHttpClient.assertConnectionIsClosed(s); } } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java index f4c91194b50..5c040439b66 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 Oracle and/or its affiliates. + * Copyright (c) 2021, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import io.helidon.common.http.DataChunk; +import io.helidon.common.http.Http; import io.helidon.common.reactive.Multi; import io.helidon.webclient.WebClient; import io.helidon.webclient.WebClientResponse; @@ -34,6 +35,8 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.RepeatedTest; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -88,6 +91,8 @@ private void testCall(WebClient webClient, Multi payload) { .onError(Throwable::printStackTrace) .await(TIME_OUT); + assertThat(webClientResponse.status(), is(Http.Status.OK_200)); + webClientResponse.content().as(String.class) .forSingle(s -> assertEquals("Server:0Server:1Server:2", s, "Wrong response!")) .await(TIME_OUT); diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java b/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java index 4e9ccb3f408..d95de54f05b 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/utils/SocketHttpClient.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017, 2021 Oracle and/or its affiliates. + * Copyright (c) 2017, 2022 Oracle and/or its affiliates. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -64,6 +64,7 @@ public class SocketHttpClient implements AutoCloseable { */ public SocketHttpClient(WebServer webServer) throws IOException { socket = new Socket("localhost", webServer.port()); + socket.setSoTimeout(10000); } /** From 51195eb64a3faf59e63b670c8972979db5d3a28e Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Fri, 21 Jan 2022 14:54:31 +0100 Subject: [PATCH 2/8] Spotbugs fix Signed-off-by: Tomas Langer --- .../src/main/java/io/helidon/webserver/ForwardingHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java index b99dc03875e..a31f56c6114 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java @@ -207,7 +207,6 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) { lastContent = false; HttpContent httpContent = (HttpContent) msg; - boolean hasRequests = requestContext.hasRequests(); ByteBuf content = httpContent.content(); if (content.isReadable()) { From 98e9d18393b9b0b3bf2e45d284d6c5573747ddae Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Fri, 21 Jan 2022 19:52:10 +0100 Subject: [PATCH 3/8] Review udpate Signed-off-by: Tomas Langer --- .../helidon/webserver/BareResponseImpl.java | 27 +++++++++---------- .../io/helidon/webserver/RequestContext.java | 9 +++++++ 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index d4915978181..5bdc050bd21 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -195,21 +195,20 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Request content not fully read with keep-alive: true", channel)); - // no requests, nothing emitted or request cancelled (this is fine, we ignore entity and close connection) - boolean entityRequested = true; - if ((!requestContext.hasRequests() && !requestContext.hasEmitted()) - || requestContext.requestCancelled()) { - if (!isWebSocketUpgrade) { - entityRequested = false; - } - } - if (entityRequested) { - HttpRequest request = requestContext.request(); - LOGGER.warning("Entity was requested and not fully consumed before a response is sent. " - + "This is not supported. Connection will be closed. Please fix your route for " - + request.method() + " " + request.uri()); - } if (!isWebSocketUpgrade) { + if (requestContext.isDataRequested()) { + // there are pending requests, we have emitted some data and request was not explicitly canceled + // this is a bug in code, where entity is requested and not fully processed + // throwing an exception here is a breaking change (also this may be an intermittent problem + // as it may depend on thread race) + HttpRequest request = requestContext.request(); + LOGGER.warning("Entity was requested and not fully consumed before a response is sent. " + + "This is not supported. Connection will be closed. Please fix your route for " + + request.method() + " " + request.uri()); + } + + // in either case, we need to close the connection as it has data pending + // and we do not want to read an unknown amount of request data response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE); } diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java index 48bbaafccbd..4911f9b2e50 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java @@ -87,6 +87,15 @@ boolean hasRequests() { return publisher.hasRequests(); } + /** + * Has there been a request for content. + * + * @return {@code true} if data was requested and request was not cancelled + */ + boolean isDataRequested() { + return (hasRequests() || hasEmitted()) && !requestCancelled(); + } + boolean hasEmitted() { return emitted; } From b8338365df4c3ddf4d12b1031d9fcb599ebb0535 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Sat, 22 Jan 2022 18:14:06 +0100 Subject: [PATCH 4/8] Reduced logging overhead. Small improvements of tests. Signed-off-by: Tomas Langer --- .../WebClientRequestBuilderImpl.java | 29 ++++++++++++------- .../helidon/webserver/ForwardingHandler.java | 3 +- .../webserver/ReqEntityAnalyzedTest.java | 15 ++++------ 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java index ccb9b5c69f9..1915fdb3938 100644 --- a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java +++ b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; +import java.util.logging.Level; import java.util.logging.Logger; import io.helidon.common.GenericType; @@ -206,15 +207,19 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat for (ChannelRecord channelRecord : channels) { Channel channel = channelRecord.channel; if (channel.isOpen() && channel.attr(IN_USE).get().compareAndSet(false, true)) { - LOGGER.finest(() -> "Reusing -> " + channel.hashCode()); - LOGGER.finest(() -> "Setting in use -> true"); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Reusing -> " + channel.hashCode() + ", settting in use -> true"); + } return channelRecord.channelFuture; } - LOGGER.finest(() -> "Not accepted -> " + channel.hashCode()); - LOGGER.finest(() -> "Open -> " + channel.isOpen()); - LOGGER.finest(() -> "In use -> " + channel.attr(IN_USE).get()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Not accepted -> " + channel.hashCode() + ", open -> " + + channel.isOpen() + ", in use -> " + channel.attr(IN_USE).get()); + } + } + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "New connection to -> " + connectionIdent); } - LOGGER.finest(() -> "New connection to -> " + connectionIdent); URI uri = connectionIdent.base; ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort()); Channel channel = connect.channel(); @@ -227,9 +232,9 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat } static void removeChannelFromCache(ConnectionIdent key, Channel channel) { - LOGGER.finest(() -> "Removing from channel cache."); - LOGGER.finest(() -> "Connection ident -> " + key); - LOGGER.finest(() -> "Channel -> " + channel.hashCode()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key + ", channel -> " + channel.hashCode()); + } CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel)); } @@ -581,8 +586,10 @@ private Single invoke(Flow.Publisher requestEntity : bootstrap.connect(finalUri.getHost(), finalUri.getPort()); channelFuture.addListener((ChannelFutureListener) future -> { - LOGGER.finest(() -> "(client reqID: " + requestId + ") " - + "Channel hashcode -> " + channelFuture.channel().hashCode()); + if (LOGGER.isLoggable(Level.FINEST)) { + LOGGER.finest(() -> "(client reqID: " + requestId + ") " + + "Channel hashcode -> " + channelFuture.channel().hashCode()); + } channelFuture.channel().attr(REQUEST).set(clientRequest); channelFuture.channel().attr(RESPONSE_RECEIVED).set(false); channelFuture.channel().attr(RECEIVED).set(responseReceived); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java index a31f56c6114..c7e7171794b 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java @@ -350,7 +350,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques // If a problem with the request URI, return 400 response BareRequestImpl bareRequest; try { - bareRequest = new BareRequestImpl((HttpRequest) msg, + bareRequest = new BareRequestImpl(request, requestContextRef.publisher(), webServer, ctx, @@ -535,6 +535,7 @@ private void send100Continue(ChannelHandlerContext ctx, ""); FullHttpResponse response = toNettyResponse(transportResponse); + // we should flush this immediately, as we need the client to send entity ctx.writeAndFlush(response); } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java index 5c040439b66..25ec49625f4 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java @@ -19,8 +19,6 @@ import java.nio.ByteBuffer; import java.time.Duration; -import java.util.Optional; -import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -37,9 +35,7 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class ReqEntityAnalyzedTest { private static final Duration TIME_OUT = Duration.ofSeconds(5); @@ -93,14 +89,15 @@ private void testCall(WebClient webClient, Multi payload) { assertThat(webClientResponse.status(), is(Http.Status.OK_200)); - webClientResponse.content().as(String.class) - .forSingle(s -> assertEquals("Server:0Server:1Server:2", s, "Wrong response!")) + String response = webClientResponse.content() + .as(String.class) .await(TIME_OUT); - } catch (CompletionException e) { - fail(e); + assertThat(response, is("Server:0Server:1Server:2")); } finally { - Optional.ofNullable(webClientResponse).ifPresent(WebClientResponse::close); + if (webClientResponse != null) { + webClientResponse.close(); + } } } From c0c7ab59a51604582188518a5f458a008b1d5b6c Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Mon, 24 Jan 2022 23:42:28 +0100 Subject: [PATCH 5/8] Checkstyle fix. Signed-off-by: Tomas Langer --- .../java/io/helidon/webclient/WebClientRequestBuilderImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java index 1915fdb3938..7670f4c071c 100644 --- a/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java +++ b/webclient/webclient/src/main/java/io/helidon/webclient/WebClientRequestBuilderImpl.java @@ -233,7 +233,8 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat static void removeChannelFromCache(ConnectionIdent key, Channel channel) { if (LOGGER.isLoggable(Level.FINEST)) { - LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key + ", channel -> " + channel.hashCode()); + LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key + + ", channel -> " + channel.hashCode()); } CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel)); } From 0d0b8b6e622ecde678ad2330c38e2f07514b8e0d Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Mon, 31 Jan 2022 11:58:29 +0100 Subject: [PATCH 6/8] Test with JDK. Signed-off-by: Tomas Langer --- .../helidon/webserver/ForwardingHandler.java | 4 ++ .../io/helidon/webserver/KeepAliveTest.java | 54 +++++++++++++++++-- 2 files changed, 53 insertions(+), 5 deletions(-) diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java index c7e7171794b..4aea3f65b43 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/ForwardingHandler.java @@ -438,6 +438,10 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques LOGGER.fine(log("Response complete: %s", ctx, System.identityHashCode(msg))); } }); + /* + TODO we should only send continue in case the entity is request (e.g. we found a route and user started reading it) + This would solve connection close for 404 for requests with entity + */ if (HttpUtil.is100ContinueExpected(request)) { send100Continue(ctx, request); } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java index f642e87be71..b7b57a05d24 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java @@ -17,10 +17,14 @@ package io.helidon.webserver; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; import java.nio.ByteBuffer; +import java.time.Duration; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; import io.helidon.common.LogConfig; import io.helidon.common.http.DataChunk; @@ -35,15 +39,17 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.RepeatedTest; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.not; -import static org.junit.jupiter.api.Assertions.assertEquals; public class KeepAliveTest { private static WebServer server; private static WebClient webClient; + private static HttpClient httpClient; + private static URI uri; @BeforeAll static void setUp() { @@ -71,6 +77,12 @@ static void setUp() { .baseUri(serverUrl) .keepAlive(true) .build(); + + httpClient = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(10)) + .build(); + + uri = URI.create(serverUrl); } @AfterAll @@ -78,11 +90,15 @@ static void afterAll() { server.shutdown(); } - @RepeatedTest(100) + @RepeatedTest(1000) void closeWithKeepAliveUnconsumedRequest() { testCall(webClient, true, "/close", 500, HttpHeaderValues.CLOSE); } + void closeWithKeepAliveUnconsumedRequestHttpClient() { + testCallJdk("/close", 500, HttpHeaderValues.CLOSE); + } + @RepeatedTest(100) void sendWithoutKeepAlive() { testCall(webClient, false, "/plain", 200, null); @@ -93,6 +109,34 @@ void sendWithKeepAlive() { testCall(webClient, true, "/plain", 200, HttpHeaderValues.KEEP_ALIVE); } + private static void testCallJdk(String path, + int expectedStatus, + AsciiString expectedConnectionHeader) { + try { + HttpResponse res = httpClient.send(HttpRequest.newBuilder() + .uri(uri.resolve(path)) + .PUT(HttpRequest.BodyPublishers.fromPublisher(Multi.just("first", + "second") + .map(String::getBytes) + .map(ByteBuffer::wrap))) + .build(), + HttpResponse.BodyHandlers.ofByteArray()); + + assertThat(res.statusCode(), is(expectedStatus)); + if (expectedConnectionHeader == null) { + assertThat(res.headers().firstValue(HttpHeaderNames.CONNECTION.toString()), is(Optional.empty())); + } else { + assertThat(res.headers().firstValue(HttpHeaderNames.CONNECTION.toString()), + is(Optional.of(expectedConnectionHeader.toString()))); + } + byte[] bytes = res.body(); + } catch (AssertionError e) { + throw e; + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + private static void testCall(WebClient webClient, boolean keepAlive, String path, @@ -109,9 +153,9 @@ private static void testCall(WebClient webClient, .map(ByteBuffer::wrap) .map(bb -> DataChunk.create(true, true, bb)) ) - .await(5, TimeUnit.MINUTES); + .await(Duration.ofMinutes(5)); - assertEquals(expectedStatus, res.status().code()); + assertThat(res.status().code(), is(expectedStatus)); if (expectedConnectionHeader != null) { assertThat(res.headers().toMap(), hasEntry(HttpHeaderNames.CONNECTION.toString(), List.of(expectedConnectionHeader.toString()))); } else { From 8ac6362fab56b82c652fdd0b00604d225af53937 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Tue, 1 Feb 2022 14:37:41 +0100 Subject: [PATCH 7/8] Consume request entity on valid responses. Signed-off-by: Tomas Langer --- .../helidon/webserver/BareResponseImpl.java | 34 +++++++-- .../io/helidon/webserver/RequestContext.java | 5 +- .../io/helidon/webserver/KeepAliveTest.java | 73 ++++++------------- .../java/io/helidon/webserver/PlainTest.java | 32 ++++---- 4 files changed, 68 insertions(+), 76 deletions(-) diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java index 5bdc050bd21..53218ee21a2 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/BareResponseImpl.java @@ -75,6 +75,7 @@ class BareResponseImpl implements BareResponse { private final HttpHeaders requestHeaders; private final ChannelFuture channelClosedFuture; private final GenericFutureListener> channelClosedListener; + private final CompletableFuture originalEntityAnalyzed; // Accessed by Subscriber method threads private Flow.Subscription subscription; @@ -102,6 +103,7 @@ class BareResponseImpl implements BareResponse { CompletableFuture requestEntityAnalyzed, long requestId) { this.requestContext = requestContext; + this.originalEntityAnalyzed = requestEntityAnalyzed; this.requestEntityAnalyzed = requestEntityAnalyzed; this.responseFuture = new CompletableFuture<>(); this.headersFuture = new CompletableFuture<>(); @@ -191,7 +193,11 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Request content not fully read with keep-alive: true", channel)); @@ -205,12 +211,26 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE); + }) + .onError(t -> { + response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE); + }) + .ignoreElement(); + } } } else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) { response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); @@ -219,8 +239,8 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map log("Writing headers %s", status)); requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> { + LOGGER.fine(() -> log("Writing headers %s", status)); requestContext.runInScope(() -> orderedWrite(this::initWriteResponse)); return listener; }); diff --git a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java index 4911f9b2e50..246807d7aac 100644 --- a/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java +++ b/webserver/webserver/src/main/java/io/helidon/webserver/RequestContext.java @@ -16,7 +16,6 @@ package io.helidon.webserver; -import java.util.concurrent.Flow; import java.util.logging.Logger; import io.helidon.common.context.Context; @@ -46,7 +45,7 @@ class RequestContext { this.scope = scope; } - Flow.Publisher publisher() { + Multi publisher() { return Multi.create(publisher) .peek(something -> emitted = true); } @@ -93,7 +92,7 @@ boolean hasRequests() { * @return {@code true} if data was requested and request was not cancelled */ boolean isDataRequested() { - return (hasRequests() || hasEmitted()) && !requestCancelled(); + return (hasRequests() || hasEmitted()) || requestCancelled(); } boolean hasEmitted() { diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java index b7b57a05d24..9e12a46981a 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/KeepAliveTest.java @@ -17,14 +17,13 @@ package io.helidon.webserver; -import java.net.URI; -import java.net.http.HttpClient; -import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import io.helidon.common.LogConfig; import io.helidon.common.http.DataChunk; @@ -48,8 +47,6 @@ public class KeepAliveTest { private static WebServer server; private static WebClient webClient; - private static HttpClient httpClient; - private static URI uri; @BeforeAll static void setUp() { @@ -78,11 +75,6 @@ static void setUp() { .keepAlive(true) .build(); - httpClient = HttpClient.newBuilder() - .connectTimeout(Duration.ofSeconds(10)) - .build(); - - uri = URI.create(serverUrl); } @AfterAll @@ -90,65 +82,36 @@ static void afterAll() { server.shutdown(); } - @RepeatedTest(1000) + @RepeatedTest(100) void closeWithKeepAliveUnconsumedRequest() { - testCall(webClient, true, "/close", 500, HttpHeaderValues.CLOSE); - } - - void closeWithKeepAliveUnconsumedRequestHttpClient() { - testCallJdk("/close", 500, HttpHeaderValues.CLOSE); + testCall(webClient, true, "/close", 500, HttpHeaderValues.CLOSE, true); } @RepeatedTest(100) void sendWithoutKeepAlive() { - testCall(webClient, false, "/plain", 200, null); + testCall(webClient, false, "/plain", 200, null, false); } @RepeatedTest(100) void sendWithKeepAlive() { - testCall(webClient, true, "/plain", 200, HttpHeaderValues.KEEP_ALIVE); - } - - private static void testCallJdk(String path, - int expectedStatus, - AsciiString expectedConnectionHeader) { - try { - HttpResponse res = httpClient.send(HttpRequest.newBuilder() - .uri(uri.resolve(path)) - .PUT(HttpRequest.BodyPublishers.fromPublisher(Multi.just("first", - "second") - .map(String::getBytes) - .map(ByteBuffer::wrap))) - .build(), - HttpResponse.BodyHandlers.ofByteArray()); - - assertThat(res.statusCode(), is(expectedStatus)); - if (expectedConnectionHeader == null) { - assertThat(res.headers().firstValue(HttpHeaderNames.CONNECTION.toString()), is(Optional.empty())); - } else { - assertThat(res.headers().firstValue(HttpHeaderNames.CONNECTION.toString()), - is(Optional.of(expectedConnectionHeader.toString()))); - } - byte[] bytes = res.body(); - } catch (AssertionError e) { - throw e; - } catch (Throwable e) { - throw new RuntimeException(e); - } + testCall(webClient, true, "/plain", 200, HttpHeaderValues.KEEP_ALIVE, false); } private static void testCall(WebClient webClient, boolean keepAlive, String path, int expectedStatus, - AsciiString expectedConnectionHeader) { + AsciiString expectedConnectionHeader, + boolean ignoreConnectionClose) { WebClientResponse res = null; try { res = webClient .put() .keepAlive(keepAlive) .path(path) - .submit(Multi.just("first", "second") + .submit(Multi.interval(10, TimeUnit.MILLISECONDS, Executors.newSingleThreadScheduledExecutor()) + .limit(2) + .map(l -> "msg_"+ l) .map(String::getBytes) .map(ByteBuffer::wrap) .map(bb -> DataChunk.create(true, true, bb)) @@ -157,11 +120,21 @@ private static void testCall(WebClient webClient, assertThat(res.status().code(), is(expectedStatus)); if (expectedConnectionHeader != null) { - assertThat(res.headers().toMap(), hasEntry(HttpHeaderNames.CONNECTION.toString(), List.of(expectedConnectionHeader.toString()))); + assertThat(res.headers().toMap(), + hasEntry(HttpHeaderNames.CONNECTION.toString(), List.of(expectedConnectionHeader.toString()))); } else { assertThat(res.headers().toMap(), not(hasKey(HttpHeaderNames.CONNECTION.toString()))); } res.content().forEach(DataChunk::release); + } catch (CompletionException e) { + if (ignoreConnectionClose && e.getMessage().contains("Connection reset")) { + // this is an expected (intermittent) result - due to a natural race (between us writing the request + // data and server responding), we may either get a response + // or the socket may be closed for writing (the reset comes from an attempt to write entity to a closed + // socket) + return; + } + throw e; } finally { Optional.ofNullable(res).ifPresent(WebClientResponse::close); } diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java index 0b3a06a467f..0507b67b005 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/PlainTest.java @@ -256,7 +256,7 @@ public void postGetPostGetTheSameConnection() throws Exception { } @Test - public void getWithLargePayloadCausesConnectionClose() throws Exception { + public void getWithLargePayloadDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -264,7 +264,7 @@ public void getWithLargePayloadCausesConnectionClose() throws Exception { // assert assertThat(entityFromResponse(s.receive(), true), is("9\nIt works!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @@ -295,19 +295,19 @@ public void traceWithAnyPayloadCausesConnectionCloseAndBadRequestWhenHandled() t } @Test - public void deferredGetWithLargePayloadCausesConnectionClose() throws Exception { + public void deferredGetWithLargePayloadDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get s.request(Http.Method.GET, "/deferred", SocketHttpClient.longData(100_000).toString()); // assert assertThat(entityFromResponse(s.receive(), true), is("d\nI'm deferred!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void getWithIllegalSmallEnoughPayloadDoesCauseConnectionClose() throws Exception { + public void getWithIllegalSmallEnoughPayloadDoesntCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -315,12 +315,12 @@ public void getWithIllegalSmallEnoughPayloadDoesCauseConnectionClose() throws Ex // assert assertThat(entityFromResponse(s.receive(), true), is("9\nIt works!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void unconsumedSmallPostDataDoesCauseConnectionClose() throws Exception { + public void unconsumedSmallPostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -329,12 +329,12 @@ public void unconsumedSmallPostDataDoesCauseConnectionClose() throws Exception { System.out.println(received); // assert assertThat(entityFromResponse(received, true), is("15\nPayload not consumed!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void unconsumedLargePostDataCausesConnectionClose() throws Exception { + public void unconsumedLargePostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -342,12 +342,12 @@ public void unconsumedLargePostDataCausesConnectionClose() throws Exception { // assert assertThat(entityFromResponse(s.receive(), true), is("15\nPayload not consumed!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void unconsumedDeferredLargePostDataCausesConnectionClose() throws Exception { + public void unconsumedDeferredLargePostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -355,12 +355,12 @@ public void unconsumedDeferredLargePostDataCausesConnectionClose() throws Except // assert assertThat(entityFromResponse(s.receive(), true), is("d\nI'm deferred!\n0\n\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void errorHandlerWithGetPayloadDoesCauseConnectionClose() throws Exception { + public void errorHandlerWithGetPayloadDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -368,12 +368,12 @@ public void errorHandlerWithGetPayloadDoesCauseConnectionClose() throws Exceptio // assert assertThat(s.receive(), startsWith("HTTP/1.1 500 Internal Server Error\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } @Test - public void errorHandlerWithPostDataDoesCauseConnectionClose() throws Exception { + public void errorHandlerWithPostDataDoesNotCauseConnectionClose() throws Exception { // open try (SocketHttpClient s = new SocketHttpClient(webServer)) { // get @@ -381,7 +381,7 @@ public void errorHandlerWithPostDataDoesCauseConnectionClose() throws Exception // assert assertThat(s.receive(), startsWith("HTTP/1.1 500 Internal Server Error\n")); - SocketHttpClient.assertConnectionIsClosed(s); + SocketHttpClient.assertConnectionIsOpen(s); } } From 74612e3eb2875289bc2546dc475b854b360ccdd1 Mon Sep 17 00:00:00 2001 From: Tomas Langer Date: Tue, 1 Feb 2022 20:07:21 +0100 Subject: [PATCH 8/8] Fix test intermittently failing on known state Signed-off-by: Tomas Langer --- .../java/io/helidon/webserver/ReqEntityAnalyzedTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java index 25ec49625f4..886cac1d18d 100644 --- a/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java +++ b/webserver/webserver/src/test/java/io/helidon/webserver/ReqEntityAnalyzedTest.java @@ -94,6 +94,12 @@ private void testCall(WebClient webClient, Multi payload) { .await(TIME_OUT); assertThat(response, is("Server:0Server:1Server:2")); + } catch (Exception e) { + // this is expected - we do not support parallel read of entity + if (e.getMessage().contains("reset by the host")) { + return; + } + throw e; } finally { if (webClientResponse != null) { webClientResponse.close();