Record empty responses when retrying a peer task #5509

Closed
@@ -17,6 +17,7 @@
 import org.hyperledger.besu.ethereum.core.BlockHeader;
 import org.hyperledger.besu.ethereum.eth.manager.EthContext;
 import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
+import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
 import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
 import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
 import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
@@ -42,8 +43,7 @@ private RetryingGetAccountRangeFromPeerTask(
       final Bytes32 endKeyHash,
       final BlockHeader blockHeader,
       final MetricsSystem metricsSystem) {
-    super(
-        ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem);
+    super(ethContext, 4, metricsSystem);
     this.ethContext = ethContext;
     this.startKeyHash = startKeyHash;
     this.endKeyHash = endKeyHash;
@@ -68,11 +68,16 @@ protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTask(
         GetAccountRangeFromPeerTask.forAccountRange(
             ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
     assignedPeer.ifPresent(task::assignPeer);
-    return executeSubTask(task::run)
-        .thenApply(
-            peerResult -> {
-              result.complete(peerResult.getResult());
-              return peerResult.getResult();
-            });
+    return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
   }

+  @Override
+  protected boolean emptyResult(final AccountRangeMessage.AccountRangeData data) {
+    return data.accounts().isEmpty() && data.proofs().isEmpty();
[Contributor comment]
An empty response for SnapProtocol/v1 is not necessarily a reason to demote a peer. If we are requesting a range that is outside of the 128-block snap range, an empty response is in-protocol. We should add additional criteria to only demote peers that return an empty range within 128 blocks of head.

Otherwise we might end up with a snap sync performance regression by dropping peers from which we ask for old ranges (while we have an old pivot block).

+  }
+
+  @Override
+  protected boolean successfulResult(final AccountRangeMessage.AccountRangeData peerResult) {
+    return !emptyResult(peerResult);
+  }
 }
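The reviewer's 128-block concern above can be made concrete with a small stand-alone sketch. This is not Besu code: the class, method, and parameter names below are hypothetical, and the 128-block window size is the assumption stated in the review comment.

```java
public class EmptyRangeDemotionSketch {

  // Assumption from the review comment: snap peers are expected to serve
  // state for roughly the last 128 blocks behind their head.
  static final long SNAP_RANGE_SIZE = 128;

  // Hypothetical helper: treat an empty range as a useless response only
  // when the requested block is recent enough that a well-behaved snap
  // peer should have been able to answer.
  static boolean shouldDemoteForEmptyRange(long requestBlockNumber, long headBlockNumber) {
    return headBlockNumber - requestBlockNumber < SNAP_RANGE_SIZE;
  }

  public static void main(String[] args) {
    // Pivot block close to head: an empty range is suspicious.
    System.out.println(shouldDemoteForEmptyRange(1_000_100, 1_000_150)); // prints true
    // Old pivot block far behind head: empty responses are in-protocol.
    System.out.println(shouldDemoteForEmptyRange(999_000, 1_000_150)); // prints false
  }
}
```

A real version of this check would need the head estimate and the pivot block number that the sync state already tracks; the point is only that the demotion decision takes the distance from head into account.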
@@ -17,6 +17,7 @@
 import org.hyperledger.besu.ethereum.core.BlockHeader;
 import org.hyperledger.besu.ethereum.eth.manager.EthContext;
 import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
+import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
 import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
 import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
 import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -41,7 +42,7 @@ private RetryingGetBytecodeFromPeerTask(
       final List<Bytes32> codeHashes,
       final BlockHeader blockHeader,
       final MetricsSystem metricsSystem) {
-    super(ethContext, 4, Map::isEmpty, metricsSystem);
+    super(ethContext, 4, metricsSystem);
     this.ethContext = ethContext;
     this.codeHashes = codeHashes;
     this.blockHeader = blockHeader;
@@ -62,11 +63,16 @@ protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
     final GetBytecodeFromPeerTask task =
         GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
     assignedPeer.ifPresent(task::assignPeer);
-    return executeSubTask(task::run)
-        .thenApply(
-            peerResult -> {
-              result.complete(peerResult.getResult());
-              return peerResult.getResult();
-            });
+    return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
   }
+
+  @Override
+  protected boolean emptyResult(final Map<Bytes32, Bytes> peerResult) {
+    return peerResult.isEmpty();
+  }
+
+  @Override
+  protected boolean successfulResult(final Map<Bytes32, Bytes> peerResult) {
+    return !emptyResult(peerResult);
+  }
 }
@@ -17,6 +17,7 @@
 import org.hyperledger.besu.ethereum.core.BlockHeader;
 import org.hyperledger.besu.ethereum.eth.manager.EthContext;
 import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
+import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
 import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
 import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
 import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
@@ -45,7 +46,7 @@ private RetryingGetStorageRangeFromPeerTask(
       final Bytes32 endKeyHash,
       final BlockHeader blockHeader,
       final MetricsSystem metricsSystem) {
-    super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
+    super(ethContext, 4, metricsSystem);
     this.ethContext = ethContext;
     this.accountHashes = accountHashes;
     this.startKeyHash = startKeyHash;
@@ -72,11 +73,16 @@ protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
         GetStorageRangeFromPeerTask.forStorageRange(
             ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
     assignedPeer.ifPresent(task::assignPeer);
-    return executeSubTask(task::run)
-        .thenApply(
-            peerResult -> {
-              result.complete(peerResult.getResult());
-              return peerResult.getResult();
-            });
+    return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
   }

+  @Override
+  protected boolean emptyResult(final StorageRangeMessage.SlotRangeData peerResult) {
+    return peerResult.proofs().isEmpty() && peerResult.slots().isEmpty();
[Contributor comment]
Same here: an empty response might be a signal that we are asking for old ranges.

+  }
+
+  @Override
+  protected boolean successfulResult(final StorageRangeMessage.SlotRangeData peerResult) {
+    return !emptyResult(peerResult);
+  }
 }
@@ -17,6 +17,7 @@
 import org.hyperledger.besu.ethereum.core.BlockHeader;
 import org.hyperledger.besu.ethereum.eth.manager.EthContext;
 import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
+import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
 import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
 import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
 import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -40,7 +41,7 @@ private RetryingGetTrieNodeFromPeerTask(
       final Map<Bytes, List<Bytes>> paths,
       final BlockHeader blockHeader,
       final MetricsSystem metricsSystem) {
-    super(ethContext, 4, Map::isEmpty, metricsSystem);
+    super(ethContext, 4, metricsSystem);
     this.ethContext = ethContext;
     this.paths = paths;
     this.blockHeader = blockHeader;
@@ -61,11 +62,16 @@ protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
     final GetTrieNodeFromPeerTask task =
         GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
     assignedPeer.ifPresent(task::assignPeer);
-    return executeSubTask(task::run)
-        .thenApply(
-            peerResult -> {
-              result.complete(peerResult.getResult());
-              return peerResult.getResult();
-            });
+    return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
   }
+
+  @Override
+  protected boolean emptyResult(final Map<Bytes, Bytes> peerResult) {
+    return peerResult.isEmpty();
+  }
+
+  @Override
+  protected boolean successfulResult(final Map<Bytes, Bytes> peerResult) {
+    return !emptyResult(peerResult);
+  }
 }
@@ -16,6 +16,7 @@

 import org.hyperledger.besu.ethereum.eth.manager.EthContext;
 import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
+import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
 import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
 import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
 import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
@@ -27,15 +28,21 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 /**
  * A task that will retry a fixed number of times before completing the associated CompletableFuture
  * exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link
- * #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset.
+ * #executePeerTask(Optional)} yields a result considered empty by {@link #emptyResult(Object)}, the
+ * peer is demoted; if the result is complete according to {@link #successfulResult(Object)}, the
+ * final task result is set; otherwise the result is considered partial and the retry counter is
+ * reset.
+ *
+ * <p><b>Note:</b> extending classes should never set the final task result using {@code
+ * result.complete} themselves, but should instead return true from {@link #successfulResult(Object)}
+ * when done.
  *
  * @param <T> The type as a typed list that the peer task can get partial or full results in.
  */
@@ -44,26 +51,20 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
   private static final Logger LOG = LoggerFactory.getLogger(AbstractRetryingPeerTask.class);
   private final EthContext ethContext;
   private final int maxRetries;
-  private final Predicate<T> isEmptyResponse;
   private final MetricsSystem metricsSystem;
   private int retryCount = 0;
   private Optional<EthPeer> assignedPeer = Optional.empty();

   /**
    * @param ethContext The context of the current Eth network we are attached to.
    * @param maxRetries Maximum number of retries to accept before completing exceptionally.
-   * @param isEmptyResponse Test if the response received was empty.
    * @param metricsSystem The metrics system used to measure task.
    */
   protected AbstractRetryingPeerTask(
-      final EthContext ethContext,
-      final int maxRetries,
-      final Predicate<T> isEmptyResponse,
-      final MetricsSystem metricsSystem) {
+      final EthContext ethContext, final int maxRetries, final MetricsSystem metricsSystem) {
     super(metricsSystem);
     this.ethContext = ethContext;
     this.maxRetries = maxRetries;
-    this.isEmptyResponse = isEmptyResponse;
     this.metricsSystem = metricsSystem;
   }

@@ -93,26 +94,36 @@ protected void executeTask() {
           if (error != null) {
             handleTaskError(error);
           } else {
-            // If we get a partial success, reset the retry counter.
-            if (!isEmptyResponse.test(peerResult)) {
-              retryCount = 0;
+            if (successfulResult(peerResult)) {
+              result.complete(peerResult);
[Contributor comment]
I wonder how it worked before without this success part. Is this something that is done automatically and that we don't really need?

[Author reply]
Before, each extending task had to set the final result by itself, using result.complete. Another thing that was a bit confusing to me is that the returned value of a task is not the final result until you set result.complete, so I tried to centralize this part here and make writing extending tasks easier.

[Contributor]
Good idea.

+            } else {
+              if (emptyResult(peerResult)) {
+                // record this empty response, so that the peer will be disconnected if there
+                // were too many
+                assignedPeer.ifPresent(
+                    peer -> peer.recordUselessResponse(getClass().getSimpleName()));
+              } else {
+                // If we get a partial success, reset the retry counter
+                retryCount = 0;
+              }
+              // retry
+              executeTaskTimed();
             }
-            executeTaskTimed();
           }
         });
   }

protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer);

   protected void handleTaskError(final Throwable error) {
-    final Throwable cause = ExceptionUtils.rootCause(error);
-    if (!isRetryableError(cause)) {
+    final Throwable rootCause = ExceptionUtils.rootCause(error);
+    if (!isRetryableError(rootCause)) {
       // Complete exceptionally
-      result.completeExceptionally(cause);
+      result.completeExceptionally(rootCause);
       return;
     }

-    if (cause instanceof NoAvailablePeersException) {
+    if (rootCause instanceof NoAvailablePeersException) {
LOG.debug(
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}",
ethContext.getEthPeers().peerCount());
@@ -130,7 +141,7 @@ protected void handleTaskError(final Throwable error) {
     LOG.debug(
         "Retrying after recoverable failure from peer task {}: {}",
         this.getClass().getSimpleName(),
-        cause.getMessage());
+        rootCause.getMessage());
// Wait before retrying on failure
executeSubTask(
() ->
Expand All @@ -139,14 +150,16 @@ protected void handleTaskError(final Throwable error) {
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1)));
}

-  protected boolean isRetryableError(final Throwable error) {
-    return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error));
+  protected boolean isRetryableError(final Throwable rootCause) {
+    return rootCause instanceof IncompleteResultsException
+        || rootCause instanceof TimeoutException
+        || (!assignedPeer.isPresent() && isPeerFailure(rootCause));
}

-  protected boolean isPeerFailure(final Throwable error) {
-    return error instanceof PeerBreachedProtocolException
-        || error instanceof PeerDisconnectedException
-        || error instanceof NoAvailablePeersException;
+  protected boolean isPeerFailure(final Throwable rootCause) {
+    return rootCause instanceof PeerBreachedProtocolException
+        || rootCause instanceof PeerDisconnectedException
+        || rootCause instanceof NoAvailablePeersException;
}

protected EthContext getEthContext() {
@@ -164,4 +177,23 @@ public int getRetryCount() {
public int getMaxRetries() {
return maxRetries;
}

+  /**
+   * Identify if the result is empty.
+   *
+   * @param peerResult the result to check
+   * @return true if the result is empty, the peer should be demoted, and the request retried
+   */
+  protected abstract boolean emptyResult(final T peerResult);
+
+  /**
+   * Identify a successful and complete result. Partial results that are not considered successful
+   * should return false, so that the request is retried. This check takes precedence over {@link
+   * #emptyResult(Object)}, so if an empty result is also successful the task completes successfully
+   * with an empty result.
+   *
+   * @param peerResult the result to check
+   * @return true if the result is successful and can be set as the task result
+   */
+  protected abstract boolean successfulResult(final T peerResult);
}
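The new contract can be illustrated outside of Besu with a stripped-down, synchronous version of the retry loop. Everything below is a sketch: MiniRetryingTask and MiniBytecodeTask are invented stand-ins for AbstractRetryingPeerTask and its subclasses, with futures, peer demotion, and metrics omitted.

```java
import java.util.Iterator;
import java.util.List;

// Stand-in for AbstractRetryingPeerTask: the base class owns the retry loop
// and sets the final result; subclasses only classify each peer response.
abstract class MiniRetryingTask<T> {
  private final int maxRetries;
  private int retryCount = 0;

  MiniRetryingTask(final int maxRetries) {
    this.maxRetries = maxRetries;
  }

  protected abstract T executeOnce(); // one request/response round-trip

  protected abstract boolean emptyResult(T result); // peer would be demoted

  protected abstract boolean successfulResult(T result); // final result reached

  T run() {
    while (retryCount < maxRetries) {
      retryCount++;
      final T result = executeOnce();
      if (successfulResult(result)) {
        return result; // only the base class completes the task
      }
      if (!emptyResult(result)) {
        retryCount = 0; // partial success resets the retry counter
      }
      // empty or partial: loop again and retry
    }
    throw new IllegalStateException("max retries reached");
  }
}

// Toy subclass: responses are strings, and an empty string is a useless response.
class MiniBytecodeTask extends MiniRetryingTask<String> {
  private final Iterator<String> cannedResponses;

  MiniBytecodeTask(final List<String> cannedResponses) {
    super(4);
    this.cannedResponses = cannedResponses.iterator();
  }

  @Override
  protected String executeOnce() {
    return cannedResponses.next();
  }

  @Override
  protected boolean emptyResult(final String result) {
    return result.isEmpty();
  }

  @Override
  protected boolean successfulResult(final String result) {
    return !emptyResult(result);
  }
}

public class RetryContractDemo {
  public static void main(String[] args) {
    // Two empty responses followed by a real one: succeeds on the third try.
    System.out.println(new MiniBytecodeTask(List.of("", "", "0xcode")).run()); // prints 0xcode
  }
}
```

The design choice the PR makes is the template-method pattern: moving result classification from a constructor-supplied Predicate into overridable methods lets the base class both complete the task and demote peers in one place.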
@@ -20,13 +20,12 @@
 import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
 import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason;
 import org.hyperledger.besu.plugin.services.MetricsSystem;
+import org.hyperledger.besu.util.ExceptionUtils;

 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
 import java.util.stream.Stream;

 import org.slf4j.Logger;
@@ -41,11 +40,8 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetryingPeerTask<T> {
   private final Set<EthPeer> failedPeers = new HashSet<>();

   protected AbstractRetryingSwitchingPeerTask(
-      final EthContext ethContext,
-      final MetricsSystem metricsSystem,
-      final Predicate<T> isEmptyResponse,
-      final int maxRetries) {
-    super(ethContext, maxRetries, isEmptyResponse, metricsSystem);
+      final EthContext ethContext, final MetricsSystem metricsSystem, final int maxRetries) {
+    super(ethContext, maxRetries, metricsSystem);
   }

@Override
@@ -93,24 +89,19 @@ protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedPeer) {
                 .addArgument(peerToUse)
                 .addArgument(this::getRetryCount)
                 .log();
-              result.complete(peerResult);
               return peerResult;
             });
   }

   @Override
   protected void handleTaskError(final Throwable error) {
-    if (isPeerFailure(error)) {
-      getAssignedPeer().ifPresent(peer -> failedPeers.add(peer));
+    final Throwable rootCause = ExceptionUtils.rootCause(error);
+    if (isPeerFailure(rootCause)) {
+      getAssignedPeer().ifPresent(failedPeers::add);
     }
     super.handleTaskError(error);
   }

-  @Override
-  protected boolean isRetryableError(final Throwable error) {
-    return error instanceof TimeoutException || isPeerFailure(error);
-  }

private Optional<EthPeer> selectNextPeer() {
final Optional<EthPeer> maybeNextPeer = remainingPeersToTry().findFirst();

@@ -136,7 +127,7 @@ private void refreshPeers() {
// If we are at max connections, then refresh peers disconnecting one of the failed peers,
// or the least useful

-    if (peers.peerCount() >= peers.getMaxPeers()) {
+    if (peers.peerCount() >= peers.getPeerLowerBound()) {
[Contributor comment]
Should this be upperBound?

[Author reply]
I want to force Besu to actively search for new peers, and my understanding is that this happens when the number of peers is below the lower bound.

[Contributor]
Maybe add a comment (or edit the existing comment), because the code is now doing something slightly different.

failedPeers.stream()
.filter(peer -> !peer.isDisconnected())
.findAny()
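The intent discussed above (disconnect a failed peer so that the peer count falls below the lower bound and peer discovery is actively triggered) can be sketched with toy types. MiniPeer and pickPeerToDrop are illustrative names only, not Besu's EthPeer or the real refreshPeers() implementation.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Illustrative stand-in for EthPeer: just an id and a disconnected flag.
record MiniPeer(String id, boolean disconnected) {}

public class RefreshPeersSketch {

  // When the peer count is at or above the lower bound, pick one failed,
  // still-connected peer to disconnect, so that the count drops below the
  // bound and the client starts searching for new peers.
  static Optional<MiniPeer> pickPeerToDrop(
      final List<MiniPeer> peers, final Set<String> failedIds, final int peerLowerBound) {
    if (peers.size() < peerLowerBound) {
      return Optional.empty(); // already below the bound, discovery is running
    }
    return peers.stream()
        .filter(peer -> failedIds.contains(peer.id()) && !peer.disconnected())
        .findFirst();
  }

  public static void main(String[] args) {
    final List<MiniPeer> peers =
        List.of(new MiniPeer("a", false), new MiniPeer("b", true), new MiniPeer("c", false));
    // "b" failed but is already disconnected, so "c" is chosen.
    System.out.println(pickPeerToDrop(peers, Set.of("b", "c"), 3).orElseThrow().id()); // prints c
  }
}
```

The sketch uses findFirst for determinism where the real code uses findAny; the reviewer's follow-up stands: a comment near the lower-bound check would make the intent explicit in the actual code.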