Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix wrong connection close #3830

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

import io.helidon.common.GenericType;
Expand Down Expand Up @@ -206,15 +207,19 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
for (ChannelRecord channelRecord : channels) {
Channel channel = channelRecord.channel;
if (channel.isOpen() && channel.attr(IN_USE).get().compareAndSet(false, true)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode());
LOGGER.finest(() -> "Setting in use -> true");
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Reusing -> " + channel.hashCode() + ", settting in use -> true");
}
return channelRecord.channelFuture;
}
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode());
LOGGER.finest(() -> "Open -> " + channel.isOpen());
LOGGER.finest(() -> "In use -> " + channel.attr(IN_USE).get());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Not accepted -> " + channel.hashCode() + ", open -> "
+ channel.isOpen() + ", in use -> " + channel.attr(IN_USE).get());
}
}
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
}
LOGGER.finest(() -> "New connection to -> " + connectionIdent);
URI uri = connectionIdent.base;
ChannelFuture connect = bootstrap.connect(uri.getHost(), uri.getPort());
Channel channel = connect.channel();
Expand All @@ -227,9 +232,10 @@ private static ChannelFuture obtainChannelFuture(RequestConfiguration configurat
}

static void removeChannelFromCache(ConnectionIdent key, Channel channel) {
LOGGER.finest(() -> "Removing from channel cache.");
LOGGER.finest(() -> "Connection ident -> " + key);
LOGGER.finest(() -> "Channel -> " + channel.hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "Removing from channel cache. Connection ident -> " + key
+ ", channel -> " + channel.hashCode());
}
CHANNEL_CACHE.get(key).remove(new ChannelRecord(channel));
}

Expand Down Expand Up @@ -581,8 +587,10 @@ private Single<WebClientResponse> invoke(Flow.Publisher<DataChunk> requestEntity
: bootstrap.connect(finalUri.getHost(), finalUri.getPort());

channelFuture.addListener((ChannelFutureListener) future -> {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest(() -> "(client reqID: " + requestId + ") "
+ "Channel hashcode -> " + channelFuture.channel().hashCode());
}
channelFuture.channel().attr(REQUEST).set(clientRequest);
channelFuture.channel().attr(RESPONSE_RECEIVED).set(false);
channelFuture.channel().attr(RECEIVED).set(responseReceived);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -56,7 +56,6 @@
* The BareResponseImpl.
*/
class BareResponseImpl implements BareResponse {

private static final Logger LOGGER = Logger.getLogger(BareResponseImpl.class.getName());

// See HttpConversionUtil.ExtensionHeaderNames
Expand All @@ -76,6 +75,7 @@ class BareResponseImpl implements BareResponse {
private final HttpHeaders requestHeaders;
private final ChannelFuture channelClosedFuture;
private final GenericFutureListener<? extends Future<? super Void>> channelClosedListener;
private final CompletableFuture<ChannelFutureListener> originalEntityAnalyzed;

// Accessed by Subscriber method threads
private Flow.Subscription subscription;
Expand Down Expand Up @@ -103,6 +103,7 @@ class BareResponseImpl implements BareResponse {
CompletableFuture<ChannelFutureListener> requestEntityAnalyzed,
long requestId) {
this.requestContext = requestContext;
this.originalEntityAnalyzed = requestEntityAnalyzed;
this.requestEntityAnalyzed = requestEntityAnalyzed;
this.responseFuture = new CompletableFuture<>();
this.headersFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -192,26 +193,44 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S
// Add keep alive header as per:
// http://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
// If already set (e.g. WebSocket upgrade), do not override
if (keepAlive) {
// if response Connection header is set explicitly to close, we can ignore the following
if (!keepAlive || HttpHeaderValues.CLOSE.contentEqualsIgnoreCase(response.headers().get(HttpHeaderNames.CONNECTION))) {
response.headers().remove(HttpHeaderNames.CONNECTION);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
if (!requestContext.requestCompleted()) {
LOGGER.finer(() -> log("Request content not fully read with keep-alive: true", channel));
if (!requestContext.hasRequests() || requestContext.requestCancelled()) {
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
if (listener.equals(ChannelFutureListener.CLOSE)) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
return listener;
});
//We are not sure which Connection header value should be set.
//If unhandled entity is only one content large, we can keep the keep-alive
channel.read();
} else {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
throw new IllegalStateException("Cannot request entity and send response without "
+ "waiting for it to be handled");

if (!isWebSocketUpgrade) {
if (requestContext.isDataRequested()) {
// there are pending requests, we have emitted some data and request was not explicitly canceled
// this is a bug in code, where entity is requested and not fully processed
// throwing an exception here is a breaking change (also this may be an intermittent problem
// as it may depend on thread race)
HttpRequest request = requestContext.request();
LOGGER.warning("Entity was requested and not fully consumed before a response is sent. "
+ "This is not supported. Connection will be closed. Please fix your route for "
+ request.method() + " " + request.uri());

// let's close this connection, as it is in an unexpected state
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
// we want to consume the entity and keep alive
// entity must be consumed here, so we do not close connection in forwarding handler
// because of unconsumed payload (the following code will only succeed if there is no subscriber)
requestContext.publisher()
.forEach(DataChunk::release)
.onComplete(() -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE_ON_FAILURE);
})
.onError(t -> {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
originalEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
})
.ignoreElement();
}
}
} else if (!headers.containsKey(HttpHeaderNames.CONNECTION.toString())) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
Expand All @@ -220,8 +239,8 @@ public void writeStatusAndHeaders(Http.ResponseStatus status, Map<String, List<S

// Content length optimization attempt
if (!lengthOptimization) {
LOGGER.fine(() -> log("Writing headers %s", status));
requestEntityAnalyzed = requestEntityAnalyzed.thenApply(listener -> {
LOGGER.fine(() -> log("Writing headers %s", status));
Copy link
Member

@spericas spericas Feb 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the assignment in the line above, shouldn't we rename
requestEntityAnalyzed? Its name is too close to the new one added, yet its completion reflects more than just the request entity being processed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand - it is the same future. We just need to keep the original instance and complete on it, as the "last" instance in the field will not reflect thenApply and other methods registered on previous instances.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly the point, the last instance on this field is related to the writing of the response and no longer the analysis of the request entity (as its name suggests). My comment is just about naming, not semantics. Certainly not a stopper.

requestContext.runInScope(() -> orderedWrite(this::initWriteResponse));
return listener;
});
Expand Down Expand Up @@ -377,21 +396,7 @@ public void onSubscribe(Flow.Subscription subscription) {
return;
}
this.subscription = Objects.requireNonNull(subscription, "subscription is null");

// TyrusSupport controls order of writes manually
if (isWebSocketUpgrade) {
subscription.request(1);
} else {
// Callback deferring first request for data after:
// - Request stream has been completed
requestEntityAnalyzed.whenComplete((channelFutureListener, throwable) -> {
subscription.request(1);
});
if (keepAlive) {
//Auxiliary read, does nothing in case of pending read
channel.read();
}
}
subscription.request(1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {
private CompletableFuture<ChannelFutureListener> requestEntityAnalyzed;
private CompletableFuture<?> prevRequestFuture;
private boolean lastContent;
private boolean hadContentAlready;

ForwardingHandler(Routing routing,
NettyWebServer webServer,
Expand All @@ -120,7 +119,6 @@ public class ForwardingHandler extends SimpleChannelInboundHandler<Object> {

private void reset() {
lastContent = false;
hadContentAlready = false;
isWebSocketUpgrade = false;
actualPayloadSize = 0L;
ignorePayload = false;
Expand Down Expand Up @@ -263,19 +261,6 @@ private void channelReadHttpContent(ChannelHandlerContext ctx, Object msg) {
// this is here to handle the case when the content is not readable but we didn't
// exceptionally complete the publisher and close the connection
throw new IllegalStateException("It is not expected to not have readable content.");
} else if (!requestContext.hasRequests()
&& HttpUtil.isKeepAlive(requestContext.request())
&& !requestEntityAnalyzed.isDone()) {
if (hadContentAlready) {
LOGGER.finest(() -> "More than one unhandled content present. Closing the connection.");
requestEntityAnalyzed.complete(ChannelFutureListener.CLOSE);
} else {
//We are checking the unhandled entity, but we cannot be sure if connection should be closed or not.
//Next content has to be checked if it is last chunk. If not close connection.
hadContentAlready = true;
LOGGER.finest(() -> "Requesting the next chunk to determine if the connection should be closed.");
ctx.channel().read();
}
}
}

Expand All @@ -290,7 +275,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {

@SuppressWarnings("checkstyle:methodlength")
private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context requestScope, Object msg) {
hadContentAlready = false;
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine(log("Received HttpRequest: %s. Remote address: %s. Scope id: %s",
ctx,
Expand Down Expand Up @@ -353,7 +337,7 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
}

if (publisher.hasRequests()) {
LOGGER.finest(() -> log("Requesting next chunks from Netty", ctx));
LOGGER.finest(() -> log("Requesting next (%d, %d) chunks from Netty", ctx, n, demand));
ctx.channel().read();
} else {
LOGGER.finest(() -> log("No hook action required", ctx));
Expand All @@ -366,7 +350,12 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
// If a problem with the request URI, return 400 response
BareRequestImpl bareRequest;
try {
bareRequest = new BareRequestImpl((HttpRequest) msg, publisher, webServer, ctx, sslEngine, requestId);
bareRequest = new BareRequestImpl(request,
requestContextRef.publisher(),
webServer,
ctx,
sslEngine,
requestId);
} catch (IllegalArgumentException e) {
send400BadRequest(ctx, request, e);
return true;
Expand All @@ -376,9 +365,19 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
LOGGER.finest(log("Request id: %s", ctx, bareRequest.requestId()));
}

String contentLength = request.headers().get(HttpHeaderNames.CONTENT_LENGTH);

if ("0".equals(contentLength)
|| (contentLength == null
&& !"upgrade".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONNECTION))
&& !"chunked".equalsIgnoreCase(request.headers().get(HttpHeaderNames.TRANSFER_ENCODING))
&& !"multipart/byteranges".equalsIgnoreCase(request.headers().get(HttpHeaderNames.CONTENT_TYPE)))) {
// no entity
requestContextRef.complete();
}

// If context length is greater than maximum allowed, return 413 response
if (maxPayloadSize >= 0) {
String contentLength = request.headers().get(Http.Header.CONTENT_LENGTH);
if (contentLength != null) {
try {
long value = Long.parseLong(contentLength);
Expand Down Expand Up @@ -439,6 +438,10 @@ private boolean channelReadHttpRequest(ChannelHandlerContext ctx, Context reques
LOGGER.fine(log("Response complete: %s", ctx, System.identityHashCode(msg)));
}
});
/*
TODO we should only send continue in case the entity is request (e.g. we found a route and user started reading it)
This would solve connection close for 404 for requests with entity
*/
if (HttpUtil.is100ContinueExpected(request)) {
send100Continue(ctx, request);
}
Expand Down Expand Up @@ -536,7 +539,8 @@ private void send100Continue(ChannelHandlerContext ctx,
"");

FullHttpResponse response = toNettyResponse(transportResponse);
ctx.write(response);
// we should flush this immediately, as we need the client to send entity
ctx.writeAndFlush(response);
}

/**
Expand All @@ -555,6 +559,8 @@ private void send400BadRequest(ChannelHandlerContext ctx, HttpRequest request, T
t);

FullHttpResponse response = toNettyResponse(handlerResponse);
// 400 -> close connection
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

ctx.writeAndFlush(response)
.addListener(future -> ctx.close());
Expand All @@ -575,6 +581,8 @@ private void send413PayloadTooLarge(ChannelHandlerContext ctx, HttpRequest reque
"");

FullHttpResponse response = toNettyResponse(transportResponse);
// too big entity -> close connection
response.headers().add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

ctx.writeAndFlush(response)
.addListener(future -> ctx.close());
Expand All @@ -596,7 +604,6 @@ private FullHttpResponse toNettyResponse(TransportResponse handlerResponse) {

HttpHeaders nettyHeaders = response.headers();
headers.forEach(nettyHeaders::add);
nettyHeaders.add(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
return response;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2021 Oracle and/or its affiliates.
* Copyright (c) 2017, 2022 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,8 @@

import io.helidon.common.context.Context;
import io.helidon.common.context.Contexts;
import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Multi;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.HttpRequest;
Expand All @@ -35,13 +37,19 @@ class RequestContext {
private final HttpRequest request;
private final Context scope;
private volatile boolean responseCompleted;
private volatile boolean emitted;

RequestContext(HttpRequestScopedPublisher publisher, HttpRequest request, Context scope) {
this.publisher = publisher;
this.request = request;
this.scope = scope;
}

Multi<DataChunk> publisher() {
return Multi.create(publisher)
.peek(something -> emitted = true);
}

HttpRequest request() {
return request;
}
Expand Down Expand Up @@ -78,6 +86,19 @@ boolean hasRequests() {
return publisher.hasRequests();
}

/**
* Has there been a request for content.
*
* @return {@code true} if data was requested and request was not cancelled
*/
boolean isDataRequested() {
return (hasRequests() || hasEmitted()) || requestCancelled();
}

boolean hasEmitted() {
return emitted;
}

/**
* Is request content cancelled.
*
Expand Down
Loading