Skip to content

Commit

Permalink
Mark any exception before the first message is written as retryable (#…
Browse files Browse the repository at this point in the history
…1641)

Motivation:

If the `Channel` is already closed before we attempt the first write, we
wrap it with `AbortedFirstWrite` in `WriteStreamSubscriber` and re-wrap
with `RetryableClosureException` in `DefaultNettyConnection`. This
approach has a couple of disadvantages:
1. `AbortedFirstWrite` is not a `RetryableException` and if the
`closeReason` is not known in `DefaultNettyConnection`, the original
exception will be propagated as-is without `RetryableException` marker;
2. `WriteStreamSubscriber` prematurely sets `written` flag before we
attempt the first write, that may also fail even if the `Channel` is not
closed;

Modifications:
- `WriteStreamSubscriber`: mark internal state as `written` only after
the first msg successfully goes through the outbound channel pipeline;
- Rename `AbortedFirstWrite` -> `AbortedFirstWriteException` and make it
retryable `IOException` because it indicates only IO failures;
- Wrap any error that happened before the first msg is written with
`AbortedFirstWriteException`;
- Adjust `DefaultNettyConnection#enrichError` to correctly unwrap
`AbortedFirstWriteException`;
- Adjust tests for new behavior;

Result:

Any failure before the first message is written is marked as retryable.
  • Loading branch information
idelpivnitskiy authored and bondolo committed Jul 2, 2021
1 parent 1c19c0a commit a0c4a43
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ static boolean responseMayHaveContent(final int statusCode,

static ScanWithMapper<Object, Object> insertTrailersMapper() {
return new ScanWithMapper<Object, Object>() {
@Nullable

private boolean sawHeaders;

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
import io.servicetalk.transport.api.ConnectionInfo.Protocol;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.EmbeddedDuplexChannel;
import io.servicetalk.transport.netty.internal.FlushStrategy;
Expand Down Expand Up @@ -323,13 +324,19 @@ void readCancelClosesConnectionThenWriteDoesNotSubscribe() {
@Test
void writeErrorFailsPendingReadsDoesNotSubscribeToPendingWrites() {
toSource(requester.write(writePublisher1)).subscribe(readSubscriber);
assertTrue(writePublisher1.isSubscribed());
Subscription readSubscription = readSubscriber.awaitSubscription();
readSubscription.request(1);
toSource(requester.write(writePublisher2)).subscribe(readSubscriber2);

writePublisher1.onError(DELIBERATE_EXCEPTION);
assertThat(readSubscriber.awaitOnError(), is(DELIBERATE_EXCEPTION));
assertThat(readSubscriber2.awaitOnError(), is(instanceOf(ClosedChannelException.class)));
final Throwable firstError = readSubscriber.awaitOnError();
assertThat(firstError, instanceOf(RetryableException.class));
assertThat(firstError.getCause(), is(DELIBERATE_EXCEPTION));
final Throwable secondError = readSubscriber2.awaitOnError();
assertThat(secondError, instanceOf(RetryableException.class));
assertThat(secondError, instanceOf(ClosedChannelException.class));
assertThat(secondError.getCause(), instanceOf(ClosedChannelException.class));
assertTrue(writePublisher2.isSubscribed());
assertFalse(channel.isOpen());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.transport.api.ConnectionInfo;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.ChannelInitializer;
import io.servicetalk.transport.netty.internal.NettyConnection;

Expand All @@ -29,6 +30,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentCaptor;

import java.util.concurrent.ExecutionException;
import java.util.function.Function;
Expand All @@ -37,7 +39,11 @@
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.IDLE_TIMEOUT;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentCaptor.forClass;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -136,7 +142,10 @@ public void testConnectionClosed() throws Exception {
Publisher.failed(DELIBERATE_EXCEPTION)).toFuture().get());
verify(clientDataObserver).onNewWrite();
verify(clientWriteObserver).requestedToWrite(anyLong());
verify(clientWriteObserver).writeFailed(DELIBERATE_EXCEPTION);
ArgumentCaptor<Throwable> exceptionCaptor = forClass(Throwable.class);
verify(clientWriteObserver).writeFailed(exceptionCaptor.capture());
assertThat(exceptionCaptor.getValue(), instanceOf(RetryableException.class));
assertThat(exceptionCaptor.getValue().getCause(), is(DELIBERATE_EXCEPTION));
break;
case CLIENT_IDLE_TIMEOUT:
case SERVER_IDLE_TIMEOUT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ public enum CloseEvent {
this.description = description;
}

Throwable wrapError(@Nullable Throwable cause, Channel channel) {
CloseEventObservedException wrapError(@Nullable Throwable cause, Channel channel) {
return new CloseEventObservedException(cause, this, channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@
import io.servicetalk.transport.api.DefaultExecutionContext;
import io.servicetalk.transport.api.ExecutionContext;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.netty.internal.CloseHandler.AbortWritesEvent;
import io.servicetalk.transport.netty.internal.CloseHandler.CloseEvent;
import io.servicetalk.transport.netty.internal.CloseHandler.CloseEventObservedException;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopDataObserver;
import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWrite;
import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWriteException;

import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
Expand All @@ -61,6 +62,7 @@

import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -337,30 +339,34 @@ public void onComplete() {

private Throwable enrichError(final Throwable t) {
Throwable throwable;
if (t instanceof AbortedFirstWrite) {
final Throwable cause = t.getCause();
if (closeReason != null) {
throwable = new RetryableClosureException(wrapIfReasonIsKnown(cause));
CloseEvent closeReason;
if (t instanceof AbortedFirstWriteException) {
if ((closeReason = this.closeReason) != null) {
throwable = new RetryableClosedChannelException(wrapWithCloseReason(closeReason, t.getCause()));
} else if (t.getCause() instanceof RetryableException) {
// Unwrap additional layer of RetryableException if the cause is already retryable
throwable = t.getCause();
} else if (t.getCause() instanceof ClosedChannelException) {
throwable = new RetryableClosedChannelException((ClosedChannelException) t.getCause());
} else {
throwable = cause;
throwable = t;
}
} else if (t instanceof RetryableClosureException) {
} else if (t instanceof RetryableClosedChannelException) {
throwable = t;
} else {
throwable = wrapIfReasonIsKnown(t);
if ((closeReason = this.closeReason) != null) {
throwable = wrapWithCloseReason(closeReason, t);
} else {
throwable = enrichProtocolError.apply(t);
}
}
throwable = enrichProtocolError.apply(throwable);
transportError.onSuccess(throwable);
return throwable;
}

private Throwable wrapIfReasonIsKnown(final Throwable t) {
final CloseEvent closeReason = this.closeReason;
if (closeReason == null) {
return t;
}
private ClosedChannelException wrapWithCloseReason(final CloseEvent closeReason, final Throwable t) {
if (t instanceof CloseEventObservedException && ((CloseEventObservedException) t).event() == closeReason) {
return t;
return (ClosedChannelException) t;
}
return closeReason.wrapError(t, channel());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,16 @@
/**
* Indicates that an error happened due to connection closure, but is retryable.
*/
class RetryableClosureException extends ClosedChannelException implements RetryableException {
final class RetryableClosedChannelException extends ClosedChannelException implements RetryableException {
private static final long serialVersionUID = 2006969744518089407L;

RetryableClosureException(final Throwable cause) {
RetryableClosedChannelException(final ClosedChannelException cause) {
initCause(cause);
}

@Override
public Throwable fillInStackTrace() {
// We don't need stack trace because it's just a retryable wrapper for original ClosedChannelException
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.FlowControlUtils;
import io.servicetalk.transport.api.ConnectionObserver.WriteObserver;
import io.servicetalk.transport.api.RetryableException;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection.ChannelOutboundListener;

import io.netty.channel.Channel;
Expand All @@ -35,6 +36,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -339,12 +341,12 @@ boolean isWritable() {

void writeNext(Object msg) {
assert eventLoop.inEventLoop();
if (!written) {
written = true;
}
activeWrites++;
listenersOnWriteBoundaries.addLast(WRITE_BOUNDARY);
channel.write(msg, this);
if (!written) {
written = true;
}
}

void sourceTerminated(@Nullable Throwable cause) {
Expand Down Expand Up @@ -390,7 +392,7 @@ void close(Throwable cause) {
setFlag(CHANNEL_CLOSED);
// subscriber has not terminated, no writes are pending and channel has closed so terminate the
// subscriber with a failure.
tryFailure(!written ? new AbortedFirstWrite(cause) : cause);
tryFailure(cause);
}
}

Expand Down Expand Up @@ -481,17 +483,19 @@ private void terminateSubscriber(@Nullable Throwable cause) {
closeHandler.closeChannelOutbound(channel);
}
} else {
final Throwable enrichedCause = enrichProtocolError.apply(cause);
Throwable enrichedCause = enrichProtocolError.apply(cause);
assignConnectionError(channel, enrichedCause);
enrichedCause = !written ? new AbortedFirstWriteException(enrichedCause) : enrichedCause;
try {
observer.writeFailed(enrichedCause);
assignConnectionError(channel, enrichedCause);
subscriber.onError(enrichedCause);
} catch (Throwable t) {
t.addSuppressed(enrichedCause);
tryFailureOrLog(t);
}
if (!hasFlag(CHANNEL_CLOSED)) {
// Close channel on error.
ChannelCloseUtils.close(channel, enrichedCause);
// Close channel on error, connection error is already assigned to the channel's attribute
channel.close();
}
}
// Notify listeners after the subscriber is terminated. Otherwise, WriteStreamSubscriber#channelClosed may
Expand Down Expand Up @@ -542,9 +546,16 @@ private void setFlag(final byte flag) {
}
}

static final class AbortedFirstWrite extends Exception {
AbortedFirstWrite(final Throwable cause) {
super(null, cause, false, false);
static final class AbortedFirstWriteException extends IOException implements RetryableException {
private static final long serialVersionUID = -5626706348233302247L;

AbortedFirstWriteException(final Throwable cause) {
super(cause);
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;
import io.servicetalk.transport.api.ConnectionInfo.Protocol;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWriteException;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundBuffer;
Expand Down Expand Up @@ -165,7 +166,9 @@ void testWriteActiveWithPublisher() {
@Test
void testPublisherErrorFailsWrite() {
toSource(conn.write(Publisher.failed(DELIBERATE_EXCEPTION))).subscribe(writeListener);
assertThat(writeListener.awaitOnError(), is(DELIBERATE_EXCEPTION));
final Throwable error = writeListener.awaitOnError();
assertThat(error, instanceOf(AbortedFirstWriteException.class));
assertThat(error.getCause(), is(DELIBERATE_EXCEPTION));
}

@Test
Expand Down Expand Up @@ -304,7 +307,7 @@ void testErrorEnrichmentWithCloseHandlerOnWriteError() throws Exception {
toSource(conn.read()).subscribe(subscriber);
Throwable cause = writeListener.awaitOnError(); // ClosedChannelException was translated
// Exception should be of type CloseEventObservedException
assertThat(cause, instanceOf(RetryableClosureException.class));
assertThat(cause, instanceOf(RetryableClosedChannelException.class));
assertThat(cause.getCause(), instanceOf(ClosedChannelException.class));
assertThat(cause.getCause().getMessage(), equalTo(
"CHANNEL_CLOSED_OUTBOUND(The transport backing this connection has been shutdown (write)) " +
Expand All @@ -327,8 +330,10 @@ void testNoErrorEnrichmentWithoutCloseHandlerOnError() {
channel.close().syncUninterruptibly();
toSource(conn.write(publisher)).subscribe(writeListener);

Throwable cause = writeListener.awaitOnError();
// Exception should NOT be of type CloseEventObservedException
Throwable error = writeListener.awaitOnError();
assertThat(error, instanceOf(RetryableClosedChannelException.class));
Throwable cause = error.getCause();
// Error cause should NOT be of type CloseEventObservedException
assertThat(cause, instanceOf(StacklessClosedChannelException.class));
assertThat(cause.getCause(), nullValue());
assertThat(cause.getStackTrace()[0].getClassName(), equalTo(DefaultNettyConnection.class.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void afterRequestBeforeSendingResponse() {
PublisherSource.Processor<String, String> writeSource = newPublisherProcessor();
StepVerifiers.create(conn.write(fromSource(writeSource)))
.then(this::closeNotifyAndVerifyClosing)
.expectError(RetryableClosureException.class)
.expectError(RetryableClosedChannelException.class)
.verify();
}

Expand All @@ -88,7 +88,7 @@ void whileReadingRequestBeforeSendingResponse() {
closeNotifyAndVerifyClosing();
})
.expectNext(BEGIN)
.expectError(RetryableClosureException.class)
.expectError(RetryableClosedChannelException.class)
.verify();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopWriteObserver;
import io.servicetalk.transport.netty.internal.WriteStreamSubscriber.AbortedFirstWriteException;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import java.nio.channels.ClosedChannelException;

Expand All @@ -29,7 +31,9 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentCaptor.forClass;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -82,7 +86,10 @@ void testMultipleItem() {
@Test
void testOnErrorNoWrite() throws InterruptedException {
subscriber.onError(DELIBERATE_EXCEPTION);
verify(this.completableSubscriber).onError(DELIBERATE_EXCEPTION);
ArgumentCaptor<Throwable> exceptionCaptor = forClass(Throwable.class);
verify(this.completableSubscriber).onError(exceptionCaptor.capture());
assertThat(exceptionCaptor.getValue(), instanceOf(AbortedFirstWriteException.class));
assertThat(exceptionCaptor.getValue().getCause(), is(DELIBERATE_EXCEPTION));
assertChannelClose();
}

Expand Down

0 comments on commit a0c4a43

Please sign in to comment.