Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't re-use *Exceptions when stored in static fields if enableSuppre… #958

Merged
merged 6 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/**
* Thrown when no host is available but at least one is required.
*/
public final class NoAvailableHostException extends RuntimeException implements RetryableException {
public class NoAvailableHostException extends RuntimeException implements RetryableException {
private static final long serialVersionUID = 5340791072245425967L;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public final class RepeatStrategies {
public static final class TerminateRepeatException extends Exception {
private static final long serialVersionUID = -1725458427890873886L;

// It is fine to reuse this instance and let it escape to the user as enableSuppression is set to false.
static final TerminateRepeatException INSTANCE = new TerminateRepeatException();

// Package-private as the user should never instance it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ private static final class DefaultH2ClientParentConnection extends AbstractH2Par
bs = new Http2StreamChannelBootstrap(connection.channel());
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.servicetalk.transport.netty.internal.FlushStrategyHolder;
import io.servicetalk.transport.netty.internal.NettyChannelListenableAsyncCloseable;
import io.servicetalk.transport.netty.internal.NettyConnectionContext;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -49,7 +50,6 @@

import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.Delayed;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -63,7 +63,6 @@
import static io.servicetalk.concurrent.api.Processors.newCompletableProcessor;
import static io.servicetalk.concurrent.api.Processors.newSingleProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.netty.H2ToStH1Utils.DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_MILLIS;
import static io.servicetalk.transport.netty.internal.NettyIoExecutors.fromNettyEventLoop;
Expand All @@ -73,11 +72,7 @@

class H2ParentConnectionContext extends NettyChannelListenableAsyncCloseable implements NettyConnectionContext,
HttpConnectionContext {
private static final ClosedChannelException CLOSED_CHANNEL_INACTIVE = unknownStackTrace(
new ClosedChannelException(), H2ClientParentConnectionContext.class, "channelInactive(..)");
private static final ClosedChannelException CLOSED_HANDLER_REMOVED =
unknownStackTrace(new ClosedChannelException(), H2ClientParentConnectionContext.class,
"handlerRemoved(..)");

private static final AtomicIntegerFieldUpdater<H2ParentConnectionContext> activeChildChannelsUpdater =
AtomicIntegerFieldUpdater.newUpdater(H2ParentConnectionContext.class, "activeChildChannels");
private static final Logger LOGGER = LoggerFactory.getLogger(H2ParentConnectionContext.class);
Expand Down Expand Up @@ -264,6 +259,8 @@ abstract static class AbstractH2ParentConnection extends ChannelInboundHandlerAd
this.waitForSslHandshake = waitForSslHandshake;
}

abstract boolean hasSubscriber();

abstract void tryCompleteSubscriber();

abstract void tryFailSubscriber(Throwable cause);
Expand Down Expand Up @@ -291,13 +288,19 @@ public final void channelActive(ChannelHandlerContext ctx) {

@Override
public final void channelInactive(ChannelHandlerContext ctx) {
tryFailSubscriber(CLOSED_CHANNEL_INACTIVE);
if (hasSubscriber()) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(
H2ParentConnectionContext.class, "channelInactive(...)"));
}
doConnectionCleanup();
}

@Override
public final void handlerRemoved(ChannelHandlerContext ctx) {
tryFailSubscriber(CLOSED_HANDLER_REMOVED);
if (hasSubscriber()) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(
H2ParentConnectionContext.class, "handlerRemoved(...)"));
}
doConnectionCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ private static final class DefaultH2ServerParentConnection extends AbstractH2Par
this.subscriber = requireNonNull(subscriber);
}

@Override
boolean hasSubscriber() {
return subscriber != null;
}

@Override
void tryCompleteSubscriber() {
if (subscriber != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.servicetalk.concurrent.api.Single;
import io.servicetalk.concurrent.api.internal.SpScPublisherProcessor;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.ThrowableUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -57,7 +58,6 @@
import static io.servicetalk.concurrent.api.Single.failed;
import static io.servicetalk.concurrent.api.Single.succeeded;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static java.util.Collections.binarySearch;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparing;
Expand Down Expand Up @@ -87,12 +87,6 @@ public final class RoundRobinLoadBalancer<ResolvedAddress, C extends LoadBalance
implements LoadBalancer<C> {

private static final Logger LOGGER = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);
private static final IllegalStateException LB_CLOSED_SELECT_CNX_EXCEPTION =
unknownStackTrace(new IllegalStateException("LoadBalancer has closed"), RoundRobinLoadBalancer.class,
"selectConnection0(...)");
private static final NoAvailableHostException NO_ACTIVE_HOSTS_SELECT_CNX_EXCEPTION =
unknownStackTrace(new NoAvailableHostException("No hosts are available to connect."),
RoundRobinLoadBalancer.class, "selectConnection0(...)");

private static final AtomicReferenceFieldUpdater<RoundRobinLoadBalancer, List> activeHostsUpdater =
newUpdater(RoundRobinLoadBalancer.class, List.class, "activeHosts");
Expand Down Expand Up @@ -250,13 +244,14 @@ public Publisher<Object> eventStream() {

private Single<C> selectConnection0(Predicate<C> selector) {
if (closed) {
return failed(LB_CLOSED_SELECT_CNX_EXCEPTION);
return failed(new IllegalStateException("LoadBalancer has closed"));
}

final List<Host<ResolvedAddress, C>> activeHosts = this.activeHosts;
if (activeHosts.isEmpty()) {
// This is the case when SD has emitted some items but none of the hosts are active.
return failed(NO_ACTIVE_HOSTS_SELECT_CNX_EXCEPTION);
return failed(StacklessNoAvailableHostException.newInstance(
"No hosts are available to connect.", RoundRobinLoadBalancer.class, "selectConnection0(...)"));
}

final int cursor = (indexUpdater.getAndIncrement(this) & Integer.MAX_VALUE) % activeHosts.size();
Expand Down Expand Up @@ -312,7 +307,7 @@ private Single<C> selectConnection0(Predicate<C> selector) {
existing = connections;
}

return failed(LB_CLOSED_SELECT_CNX_EXCEPTION);
return failed(new IllegalStateException("LoadBalancer has closed"));
}
return succeeded(newCnx);
}
Expand Down Expand Up @@ -433,4 +428,19 @@ private static final class MutableAddressHost<Addr, C extends ListenableAsyncClo
@Nullable
Addr mutableAddress;
}

private static final class StacklessNoAvailableHostException extends NoAvailableHostException {
private StacklessNoAvailableHostException(final String message) {
super(message);
}

@Override
public Throwable fillInStackTrace() {
return this;
}

public static StacklessNoAvailableHostException newInstance(String message, Class<?> clazz, String method) {
return ThrowableUtils.unknownStackTrace(new StacklessNoAvailableHostException(message), clazz, method);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@

import java.net.SocketAddress;
import java.net.SocketOption;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand All @@ -62,7 +61,6 @@
import static io.servicetalk.concurrent.api.Publisher.failed;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static io.servicetalk.transport.netty.internal.ChannelSet.CHANNEL_CLOSEABLE_KEY;
import static io.servicetalk.transport.netty.internal.CloseHandler.UNSUPPORTED_PROTOCOL_CLOSE_HANDLER;
import static io.servicetalk.transport.netty.internal.Flush.composeFlushes;
Expand All @@ -86,12 +84,6 @@ public final class DefaultNettyConnection<Read, Write> extends NettyChannelListe
private static final ChannelOutboundListener PLACE_HOLDER_OUTBOUND_LISTENER = new NoopChannelOutboundListener();
private static final ChannelOutboundListener SINGLE_ITEM_OUTBOUND_LISTENER = new NoopChannelOutboundListener();

private static final ClosedChannelException CLOSED_CHANNEL_INACTIVE = unknownStackTrace(
new ClosedChannelException(), NettyToStChannelInboundHandler.class, "channelInactive(..)");
private static final ClosedChannelException CLOSED_FAIL_ACTIVE =
unknownStackTrace(new ClosedChannelException(), DefaultNettyConnection.class, "failIfWriteActive(..)");
private static final ClosedChannelException CLOSED_HANDLER_REMOVED =
unknownStackTrace(new ClosedChannelException(), NettyToStChannelInboundHandler.class, "handlerRemoved(..)");
private static final AtomicReferenceFieldUpdater<DefaultNettyConnection, ChannelOutboundListener>
writableListenerUpdater = newUpdater(DefaultNettyConnection.class, ChannelOutboundListener.class,
"channelOutboundListener");
Expand Down Expand Up @@ -450,7 +442,8 @@ private boolean failIfWriteActive(ChannelOutboundListener newChannelOutboundList
// never notify the write writeSubscriber of the inactive event. So if the channel is inactive we notify
// the writeSubscriber.
if (!channel().isActive()) {
newChannelOutboundListener.channelClosed(CLOSED_FAIL_ACTIVE);
newChannelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(
DefaultNettyConnection.class, "failIfWriteActive(...)"));
return false;
}
return true;
Expand Down Expand Up @@ -561,7 +554,10 @@ public void handlerAdded(ChannelHandlerContext ctx) {

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
tryFailSubscriber(CLOSED_HANDLER_REMOVED);
if (subscriber != null) {
tryFailSubscriber(StacklessClosedChannelException.newInstance(
DefaultNettyConnection.class, "handlerRemoved(...)"));
}
}

@Override
Expand All @@ -587,7 +583,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
connection.channelOutboundListener.channelOutboundClosed();
} else if (evt == ChannelOutputShutdownEvent.INSTANCE) {
connection.closeHandler.channelClosedOutbound(ctx);
connection.channelOutboundListener.channelClosed(CLOSED_CHANNEL_INACTIVE);
connection.channelOutboundListener.channelClosed(StacklessClosedChannelException.newInstance(
DefaultNettyConnection.class, "userEventTriggered(...)"));
} else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
// Notify close handler first to enhance error reporting
connection.closeHandler.channelClosedInbound(ctx);
Expand Down Expand Up @@ -621,8 +618,10 @@ public void channelActive(ChannelHandlerContext ctx) {

@Override
public void channelInactive(ChannelHandlerContext ctx) {
tryFailSubscriber(CLOSED_CHANNEL_INACTIVE);
connection.channelOutboundListener.channelClosed(CLOSED_CHANNEL_INACTIVE);
Throwable closedChannelException = StacklessClosedChannelException.newInstance(
DefaultNettyConnection.class, "channelInactive(...)");
tryFailSubscriber(closedChannelException);
connection.channelOutboundListener.channelClosed(closedChannelException);
connection.nettyChannelPublisher.channelInboundClosed();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;

import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;
Expand All @@ -34,12 +33,9 @@
import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection;
import static io.servicetalk.concurrent.internal.SubscriberUtils.isRequestNValid;
import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionForInvalidRequestN;
import static io.servicetalk.concurrent.internal.ThrowableUtils.unknownStackTrace;
import static java.util.Objects.requireNonNull;

final class NettyChannelPublisher<T> extends SubscribablePublisher<T> {
private static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION =
unknownStackTrace(new ClosedChannelException(), NettyChannelPublisher.class, "channelInboundClosed");

// All state is only touched from eventloop.
private long requestCount;
Expand Down Expand Up @@ -120,8 +116,10 @@ void exceptionCaught(Throwable throwable) {

void channelInboundClosed() {
assertInEventloop();
fatalError = CLOSED_CHANNEL_EXCEPTION;
exceptionCaught(CLOSED_CHANNEL_EXCEPTION);
Throwable error = StacklessClosedChannelException.newInstance(
NettyChannelPublisher.class, "channelInboundClosed");
fatalError = error;
exceptionCaught(error);
}

// All private methods MUST be invoked from the eventloop.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.transport.netty.internal;

import io.servicetalk.concurrent.internal.ThrowableUtils;

import java.nio.channels.ClosedChannelException;

/**
* {@link ClosedChannelException} that will not not fill in the stacktrace but use a cheaper way of producing
* limited stacktrace details for the user.
*/
public final class StacklessClosedChannelException extends ClosedChannelException {

private StacklessClosedChannelException() { }

@Override
public Throwable fillInStackTrace() {
// Don't fill in the stacktrace to reduce performance overhead
return this;
}

/**
* Creates a new {@link StacklessClosedChannelException} instance.
*
* @param clazz The class in which this {@link StacklessClosedChannelException} will be used.
* @param method The method from which it will be thrown.
* @return a new instance.
*/
public static StacklessClosedChannelException newInstance(Class<?> clazz, String method) {
return ThrowableUtils.unknownStackTrace(new StacklessClosedChannelException(), clazz, method);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ public void testNoErrorEnrichmentWithoutCloseHandlerOnError() {
writeListener.verifyFailure(exCaptor);

// Exception should NOT be of type CloseEventObservedException
assertThat(exCaptor.getValue(), instanceOf(ClosedChannelException.class));
assertThat(exCaptor.getValue(), instanceOf(StacklessClosedChannelException.class));
assertThat(exCaptor.getValue().getCause(), nullValue());
assertThat(exCaptor.getValue().getStackTrace()[0].getClassName(),
equalTo(DefaultNettyConnection.class.getName()));
Expand Down