Skip to content

Commit

Permalink
shutdownOutput() for SSL connections prematurely closes the Channel (
Browse files Browse the repository at this point in the history
…#1502)

Motivation:

Before `RequestResponseCloseHandler` invokes `shutdownOutput()`, it forces
`SslHandler` to close `SSLEngine` and send `close_notify` to the remote peer.
Because `SSLEngine` is closed, `SslHandler` will generate
`SslCloseCompletionEvent` on the next read regardless of receiving the
`close_notify` alert from the remote peer or not. This premature event is
handled incorrectly by the `DefaultNettyConnection` and forces the channel
closure, breaking graceful closure handling on the server-side.

Modifications:

- Move handling of `SslCloseCompletionEvent` to the `CloseHandler` that is
aware of the current state;
- `RequestResponseCloseHandler` now ignores `SslCloseCompletionEvent` if it
knows that the outbound was shutdown. It will wait for the `FIN` from
remote peer to finish graceful closure;
- Enhance `RequestResponseCloseHandlerTest` to validate new behavior;

Result:

1. `shutdownOutput()` does not break graceful closure.
2. #1192 use-case
`[4: protocol=HTTP_1 initiateClosureFromClient=false useUds=false viaProxy=true]`
is not flaky anymore.
  • Loading branch information
idelpivnitskiy authored May 5, 2021
1 parent ad7e8e1 commit 2bc89b5
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.channel.EventLoop;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslCloseCompletionEvent;

import java.nio.channels.ClosedChannelException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -129,6 +130,16 @@ public static CloseHandler forNonPipelined(boolean isClient, ChannelConfig confi
*/
abstract void channelClosedOutbound(ChannelHandlerContext ctx);

/**
* Signal {@link Channel} observed {@link SslCloseCompletionEvent#SUCCESS}.
* <p>
* Received <a href="https://tools.ietf.org/html/rfc5246#section-7.2.1">close_notify</a> alert from the peer.
* This message notifies that the sender will not send any more messages on this connection.
*
* @param ctx {@link ChannelHandlerContext}
*/
abstract void channelCloseNotify(ChannelHandlerContext ctx);

/**
* Request {@link Channel} inbound close, to be emitted from the {@link EventLoop} for the channel.
* <p>
Expand Down Expand Up @@ -250,6 +261,10 @@ void channelClosedInbound(final ChannelHandlerContext ctx) {
void channelClosedOutbound(final ChannelHandlerContext ctx) {
}

@Override
void channelCloseNotify(final ChannelHandlerContext ctx) {
}

@Override
void closeChannelInbound(final Channel channel) {
channel.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,14 +643,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
connection.channelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(
DefaultNettyConnection.class, "userEventTriggered(ChannelOutputShutdownEvent)"));
} else if (evt == SslCloseCompletionEvent.SUCCESS) {
// Received "close_notify" alert from the peer: https://tools.ietf.org/html/rfc5246#section-7.2.1.
// This message notifies that the sender will not send any more messages on this connection.

// Notify close handler first to enhance error reporting and prevent LB from selecting this connection
connection.closeHandler.channelClosedInbound(ctx);
// We MUST respond with a "close_notify" alert and close down the connection immediately,
// discarding any pending writes.
connection.closeHandler.closeChannelOutbound(ctx.channel());
connection.closeHandler.channelCloseNotify(ctx);
} else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
// Notify close handler first to enhance error reporting and prevent LB from selecting this connection
connection.closeHandler.channelClosedInbound(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;
import javax.annotation.Nullable;
Expand All @@ -31,6 +33,8 @@
import static java.util.Objects.requireNonNull;

final class NonPipelinedCloseHandler extends CloseHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(NonPipelinedCloseHandler.class);

private static final byte READ = 0x1;
private static final byte WRITE = 0x2;
private static final byte IN_CLOSED = 0x4;
Expand Down Expand Up @@ -62,7 +66,7 @@ public void protocolPayloadBeginInbound(final ChannelHandlerContext ctx) {
@Override
public void protocolPayloadEndInbound(final ChannelHandlerContext ctx) {
state = unset(state, READ);
inboundEventCheckClose(ctx.channel());
inboundEventCheckClose(ctx.channel(), closeEvent);
}

@Override
Expand All @@ -75,22 +79,24 @@ public void protocolPayloadEndOutbound(final ChannelHandlerContext ctx, final Ch
ctx.pipeline().fireUserEventTriggered(OutboundDataEndEvent.INSTANCE);
promise.addListener(f -> {
state = unset(state, WRITE);
outboundEventCheckClose(ctx.channel());
outboundEventCheckClose(ctx.channel(), closeEvent);
});
}

@Override
public void protocolClosingInbound(final ChannelHandlerContext ctx) {
state = set(state, IN_CLOSED);
storeCloseRequestAndEmit(PROTOCOL_CLOSING_INBOUND);
inboundEventCheckClose(ctx.channel());
final CloseEvent evt = PROTOCOL_CLOSING_INBOUND;
storeCloseRequestAndEmit(evt);
inboundEventCheckClose(ctx.channel(), evt);
}

@Override
public void protocolClosingOutbound(final ChannelHandlerContext ctx) {
state = set(state, OUT_CLOSED);
storeCloseRequestAndEmit(PROTOCOL_CLOSING_OUTBOUND);
outboundEventCheckClose(ctx.channel());
final CloseEvent evt = PROTOCOL_CLOSING_OUTBOUND;
storeCloseRequestAndEmit(evt);
outboundEventCheckClose(ctx.channel(), evt);
}

@Override
Expand All @@ -100,44 +106,57 @@ void registerEventHandler(final Channel channel, final Consumer<CloseEvent> even

@Override
void channelClosedInbound(final ChannelHandlerContext ctx) {
state = unset(set(state, IN_CLOSED), READ);
storeCloseRequestAndEmit(CHANNEL_CLOSED_INBOUND);
inboundEventCheckClose(ctx.channel());
if (!has(state, IN_CLOSED)) {
state = unset(set(state, IN_CLOSED), READ);
final CloseEvent evt = CHANNEL_CLOSED_INBOUND;
storeCloseRequestAndEmit(evt);
inboundEventCheckClose(ctx.channel(), evt);
}
}

@Override
void channelClosedOutbound(final ChannelHandlerContext ctx) {
state = unset(set(state, OUT_CLOSED), WRITE);
storeCloseRequestAndEmit(CHANNEL_CLOSED_OUTBOUND);
outboundEventCheckClose(ctx.channel());
if (!has(state, OUT_CLOSED)) {
state = unset(set(state, OUT_CLOSED), WRITE);
final CloseEvent evt = CHANNEL_CLOSED_OUTBOUND;
storeCloseRequestAndEmit(evt);
outboundEventCheckClose(ctx.channel(), evt);
}
}

@Override
void channelCloseNotify(final ChannelHandlerContext ctx) {
channelClosedInbound(ctx);
closeChannelOutbound(ctx.channel());
}

@Override
void closeChannelInbound(final Channel channel) {
state = set(state, IN_CLOSED);
// todo storeCloseRequestAndEmit ?
inboundEventCheckClose(channel);
inboundEventCheckClose(channel, closeEvent);
}

@Override
void closeChannelOutbound(final Channel channel) {
state = set(state, OUT_CLOSED);
// todo storeCloseRequestAndEmit ?
outboundEventCheckClose(channel);
outboundEventCheckClose(channel, closeEvent);
}

@Override
void gracefulUserClosing(final Channel channel) {
state = set(state, GRACEFUL_CLOSE);
storeCloseRequestAndEmit(GRACEFUL_USER_CLOSING);
final CloseEvent evt = GRACEFUL_USER_CLOSING;
storeCloseRequestAndEmit(evt);
if (!isAnySet(state, READ_WRITE)) {
closeChannel(channel);
closeChannel(channel, evt);
}
}

private void inboundEventCheckClose(final Channel channel) {
private void inboundEventCheckClose(final Channel channel, @Nullable final CloseEvent evt) {
if (isAllSet(state, OUT_CLOSED) || (isAnySet(state, GRACEFUL_IN_CLOSED) && !isAllSet(state, WRITE))) {
closeChannel(channel);
closeChannel(channel, evt);
} else if (isAllSet(state, CLIENT_IN_WRITE)) {
// If a client inbound has closed while writing we abort the write because we can't be sure if the write
// will ever complete or receive any additional feedback form the server.
Expand All @@ -146,9 +165,9 @@ private void inboundEventCheckClose(final Channel channel) {
}
}

private void outboundEventCheckClose(final Channel channel) {
private void outboundEventCheckClose(final Channel channel, @Nullable final CloseEvent evt) {
if (isAllSet(state, IN_CLOSED) || (isAnySet(state, GRACEFUL_OUT_CLOSED) && !isAllSet(state, READ))) {
closeChannel(channel);
closeChannel(channel, evt);
}
}

Expand All @@ -159,9 +178,10 @@ private void storeCloseRequestAndEmit(final CloseEvent event) {
eventHandler.accept(event);
}

private void closeChannel(final Channel channel) {
private void closeChannel(final Channel channel, @Nullable final CloseEvent evt) {
if (!has(state, CLOSED)) {
state = set(state, ALL_CLOSED);
LOGGER.trace("{} Closing channel – evt: {}", channel, evt);
channel.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,25 +210,45 @@ public void protocolClosingOutbound(final ChannelHandlerContext ctx) {
@Override
void channelClosedInbound(final ChannelHandlerContext ctx) {
assert ctx.executor().inEventLoop();
state = set(state, IN_CLOSED);
// Use the actual event that initiated graceful closure:
final CloseEvent evt = has(state, CLOSING_SERVER_GRACEFULLY) ? closeEvent : CHANNEL_CLOSED_INBOUND;
assert evt != null;
storeCloseRequestAndEmit(evt);
maybeCloseChannelOnHalfClosed(ctx.channel(), evt);
state = unset(state, READ);
if (!has(state, IN_CLOSED)) {
state = set(state, IN_CLOSED);
// Use the actual event that initiated graceful closure:
final CloseEvent evt = has(state, CLOSING_SERVER_GRACEFULLY) ? closeEvent : CHANNEL_CLOSED_INBOUND;
assert evt != null;
storeCloseRequestAndEmit(evt);
maybeCloseChannelOnHalfClosed(ctx.channel(), evt);
state = unset(state, READ);
}
}

@Override
void channelClosedOutbound(final ChannelHandlerContext ctx) {
assert ctx.executor().inEventLoop();
state = set(state, OUT_CLOSED);
storeCloseRequestAndEmit(CHANNEL_CLOSED_OUTBOUND);
if (!has(state, CLOSING_SERVER_GRACEFULLY)) {
// Only try to close when we are not closing server gracefully
maybeCloseChannelOnHalfClosed(ctx.channel(), CHANNEL_CLOSED_OUTBOUND);
if (!has(state, OUT_CLOSED)) {
state = set(state, OUT_CLOSED);
storeCloseRequestAndEmit(CHANNEL_CLOSED_OUTBOUND);
if (!has(state, CLOSING_SERVER_GRACEFULLY)) {
// Only try to close when we are not closing server gracefully
maybeCloseChannelOnHalfClosed(ctx.channel(), CHANNEL_CLOSED_OUTBOUND);
}
state = unset(state, WRITE);
}
state = unset(state, WRITE);
}

@Override
void channelCloseNotify(final ChannelHandlerContext ctx) {
if (hasAny(state, OUT_CLOSED, CLOSING_SERVER_GRACEFULLY)) {
// We already closed outbound side of the channel, which triggers closure of SSLEngine and results in
// SslCloseCompletionEvent#SUCCESS event generated immediately. Connection is already in a closing state,
// we should ignore this event and wait for ChannelInputShutdownReadComplete from the remote peer.
return;
}

// Notify close handler first to enhance error reporting and prevent LB from selecting this connection.
channelClosedInbound(ctx);
// We MUST respond with a "close_notify" alert and close down the connection immediately, discarding any pending
// writes.
closeChannelOutbound(ctx.channel());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IC;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.ID;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IE;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IH;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.IS;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OB;
import static io.servicetalk.transport.netty.internal.RequestResponseCloseHandlerTest.Scenarios.Events.OC;
Expand Down Expand Up @@ -354,6 +355,7 @@ public void setup() {
outputShutdown.set(true);
LOGGER.debug("channel.shutdownOutput()");
h.channelClosedOutbound(ctx); // ChannelOutputShutdownEvent observed from transport
h.channelCloseNotify(ctx); // SslHandler generates SslCloseCompletionEvent immediately
return future;
});
when(channel.close()).then(__ -> {
Expand Down Expand Up @@ -443,9 +445,15 @@ public void simulate() {
break;
case IH:
order.verify(channel).shutdownInput();
order.verify(h).channelClosedInbound(ctx);
break;
case OH:
order.verify(channel).shutdownOutput();
order.verify(h).channelClosedOutbound(ctx);
// Verify shutdownOutput() triggers channelCloseNotify, but actually it is no-op
order.verify(h).channelCloseNotify(ctx);
order.verify(h, never()).channelClosedInbound(ctx);
order.verify(h, never()).closeChannelOutbound(channel);
break;
case FC:
order.verify(channel).close();
Expand Down Expand Up @@ -488,8 +496,14 @@ public void simulate() {
verify(h, never()).protocolClosingOutbound(ctx);
break;
case IS:
if (!events.contains(IH)) {
verify(h, never()).channelClosedInbound(ctx);
}
break;
case OS:
// These may be called implicitly, skip verify
if (!events.contains(OH)) {
verify(h, never()).channelClosedOutbound(ctx);
}
break;
case SR:
verify(scc, never()).setSoLinger(0);
Expand Down

0 comments on commit 2bc89b5

Please sign in to comment.