Skip to content

Commit

Permalink
Fix wrong connection close (#3830) (#3866)
Browse files Browse the repository at this point in the history
* Fix wrong connection close
* Reduced logging overhead. Small improvements of tests.
* Consume request entity on valid responses.

Signed-off-by: Tomas Langer <tomas.langer@oracle.com>

(cherry picked from commit 2a1dd6b)
  • Loading branch information
tomas-langer authored Feb 7, 2022
1 parent 19a64c0 commit ea7c59d
Show file tree
Hide file tree
Showing 10 changed files with 424 additions and 100 deletions.
68 changes: 36 additions & 32 deletions microprofile/tests/tck/tck-jwt-auth/tck-base-suite.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,38 +37,42 @@
<exclude name="excludes"/>
</run>
</groups>
<packages>
<package name="org.eclipse.microprofile.jwt.tck.*">
<!-- Optional, we do not support those-->
<exclude name="org.eclipse.microprofile.jwt.tck.container.servlet"/>
<exclude name="org.eclipse.microprofile.jwt.tck.container.ejb"/>
<exclude name="org.eclipse.microprofile.jwt.tck.container.jacc"/>
</package>
</packages>
<classes>
<class name="org.eclipse.microprofile.jwt.tck.util.TokenUtilsTest"/>
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.UnsecuredPingTest"/>
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.RequiredClaimsTest"/>
<!-- TODO expects injection into custom claims that are NOT JsonValue
https://github.com/eclipse/microprofile-jwt-auth/issues/117
-->
<!--<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.ClaimValueInjectionTest"/>-->
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.JsonValueInjectionTest"/>
<!-- TODO expects injection into custom claims that are NOT JsonValue
https://github.com/eclipse/microprofile-jwt-auth/issues/117
-->
<!--<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.ProviderInjectionTest"/>-->
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.RolesAllowedTest"/>
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.InvalidTokenTest"/>
<!-- TODO https://github.com/eclipse/microprofile-jwt-auth/issues/116 -->
<!--<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.PrimitiveInjectionTest"/>-->
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.PrincipalInjectionTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsPEMTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsPEMLocationTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsPEMLocationURLTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKLocationTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKLocationURLTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKSTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKSLocationTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsBase64JWKTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsFileLocationURLTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.IssNoValidationNoIssTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.IssNoValidationBadIssTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.IssValidationTest"/>
<class name="org.eclipse.microprofile.jwt.tck.config.IssValidationFailTest"/>
<!-- TODO expects server to be running before security is configured
https://github.com/eclipse/microprofile-jwt-auth/issues/118
-->
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsPEMLocationURLTest">
<methods>
<exclude name="validateLocationUrlContents"/>
<exclude name="testKeyAsLocationUrl"/>
</methods>
</class>
<class name="org.eclipse.microprofile.jwt.tck.config.PublicKeyAsJWKLocationURLTest">
<methods>
<exclude name="validateLocationUrlContents"/>
<exclude name="testKeyAsLocationUrl"/>
</methods>
</class>

<!-- Optional, we do not support those-->
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.jwe.RolesAllowedSignEncryptTest">
<methods>
<exclude name="testNeedsGroup1Mapping"/>
</methods>
</class>
<class name="org.eclipse.microprofile.jwt.tck.container.jaxrs.RolesAllowedTest">
<methods>
<exclude name="testNeedsGroup1Mapping"/>
</methods>
</class>
</classes>
</test>
</suite>
</suite>
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.GenericType;
Expand Down Expand Up @@ -204,15 +205,19 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
for (ChannelRecord channelRecord : channels) {
Channel channel = channelRecord.channel;
if (channel.isOpen() && channel.attr(IN_USE).get().compareAndSet(false, true)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode());
LOGGER.finest(() -> "Setting in use -> true");
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode() + ", settting in use -> true");
}
return channelRecord.channelFuture;
}
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode());
LOGGER.finest(() -> "Open -> " + channel.isOpen());
LOGGER.finest(() -> "In use -> " + channel.attr(IN_USE).get());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode() + ", open -> "
+ channel.isOpen() + ", in use -> " + channel.attr(IN_USE).get());
}
}
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
}
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
URI uri = connectionIdent.base;
ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort());
Channel channel = connect.channel();
Expand All @@ -225,9 +230,10 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
}

static void removeChannelFromCache(ConnectionIdent key, Channel channel) {
LOGGER.finest(() -> "Removing from channel cache.");
LOGGER.finest(() -> "Connection ident -> " + key);
LOGGER.finest(() -> "Channel -> " + channel.hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key
+ ", channel -> " + channel.hashCode());
}
CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel));
}

Expand Down Expand Up @@ -578,8 +584,10 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
: bootstrap.connect(finalUri.getHost(), finalUri.getPort());

channelFuture.addListener((ChannelFutureListener) future -> {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
}
channelFuture.channel().attr(REQUEST).set(clientRequest);
channelFuture.channel().attr(RESPONSE_RECEIVED).set(false);
channelFuture.channel().attr(RECEIVED).set(responseReceived);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,6 @@
* The BareResponseImpl.
*/
class BareResponseImpl implements BareResponse {

private static final Logger LOGGER = Logger.getLogger(BareResponseImpl.class.getName());

// See HttpConversionUtil.ExtensionHeaderNames
Expand All @@ -76,6 +75,7 @@ class BareResponseImpl implements BareResponse {
private final HttpHeaders requestHeaders;
private final ChannelFuture channelClosedFuture;
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;
private final CompletableFuture<ChannelFutureListener> originalEntityAnalyzed;

// Accessed by Subscriber method threads
private Flow.Subscription subscription;
Expand Down Expand Up @@ -103,6 +103,7 @@ class BareResponseImpl implements BareResponse {
CompletableFuture<ChannelFutureListener> requestEntityAnalyzed,
long requestId) {
this.requestContext = requestContext;
this.originalEntityAnalyzed = requestEntityAnalyzed;
this.requestEntityAnalyzed = requestEntityAnalyzed;
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -192,26 +193,44 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
// Add keep alive header as per:
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
// If already set (e.g. WebSocket upgrade), do not override
if (keepAlive) {
// if response Connection header is set explicitly to close, we can ignore the following
if (!keepAlive || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(response.headers().get(HttpHeaderNames.CONNECTION))) {
response.headers().remove(HttpHeaderNames.CONNECTION);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
if (!requestContext.requestCompleted()) {
LOGGER.finer(() -> log("Request content not fully read with keep-alive: true", channel));
if (!requestContext.hasRequests() || requestContext.requestCancelled()) {
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
if (listener.equals(ChannelFutureListener.CLOSE)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
return listener;
});
//We are not sure which Connection header value should be set.
//If unhandled entity is only one content large, we can keep the keep-alive
channel.read();
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
throw new IllegalStateException("Cannot request entity and send response without "
+ "waiting for it to be handled");

if (!isWebSocketUpgrade) {
if (requestContext.isDataRequested()) {
// there are pending requests, we have emitted some data and request was not explicitly canceled
// this is a bug in code, where entity is requested and not fully processed
// throwing an exception here is a breaking change (also this may be an intermittent problem
// as it may depend on thread race)
HttpRequest request = requestContext.request();
LOGGER.warning("Entity was requested and not fully consumed before a response is sent. "
+ "This is not supported. Connection will be closed. Please fix your route for "
+ request.method() + " " + request.uri());

// let's close this connection, as it is in an unexpected state
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
// we want to consume the entity and keep alive
// entity must be consumed here, so we do not close connection in forwarding handler
// because of unconsumed payload (the following code will only succeed if there is no subscriber)
requestContext.publisher()
.forEach(DataChunk::release)
.onComplete(() -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE);
})
.onError(t -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
})
.ignoreElement();
}
}
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
Expand All @@ -220,8 +239,8 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S

// Content length optimization attempt
if (!lengthOptimization) {
LOGGER.fine(() -> log("Writing headers %s", status));
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
LOGGER.fine(() -> log("Writing headers %s", status));
requestContext.runInScope(() -> orderedWrite(this::initWriteResponse));
return listener;
});
Expand Down
Loading

0 comments on commit ea7c59d

Please sign in to comment.