-
Notifications
You must be signed in to change notification settings - Fork 898
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Record empty responses when retrying a peer task #5509
Changes from 4 commits
1378afc
a20c5ed
82051cb
c695c32
ea4a469
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import org.hyperledger.besu.ethereum.core.BlockHeader; | ||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; | ||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; | ||
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; | ||
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask; | ||
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; | ||
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; | ||
|
@@ -45,7 +46,7 @@ private RetryingGetStorageRangeFromPeerTask( | |
final Bytes32 endKeyHash, | ||
final BlockHeader blockHeader, | ||
final MetricsSystem metricsSystem) { | ||
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem); | ||
super(ethContext, 4, metricsSystem); | ||
this.ethContext = ethContext; | ||
this.accountHashes = accountHashes; | ||
this.startKeyHash = startKeyHash; | ||
|
@@ -72,11 +73,16 @@ protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask( | |
GetStorageRangeFromPeerTask.forStorageRange( | ||
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem); | ||
assignedPeer.ifPresent(task::assignPeer); | ||
return executeSubTask(task::run) | ||
.thenApply( | ||
peerResult -> { | ||
result.complete(peerResult.getResult()); | ||
return peerResult.getResult(); | ||
}); | ||
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult); | ||
} | ||
|
||
@Override | ||
protected boolean emptyResult(final StorageRangeMessage.SlotRangeData peerResult) { | ||
return peerResult.proofs().isEmpty() && peerResult.slots().isEmpty(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, an empty response might be a signal that we are asking for old ranges. |
||
} | ||
|
||
@Override | ||
protected boolean successfulResult(final StorageRangeMessage.SlotRangeData peerResult) { | ||
return !emptyResult(peerResult); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
|
||
import org.hyperledger.besu.ethereum.eth.manager.EthContext; | ||
import org.hyperledger.besu.ethereum.eth.manager.EthPeer; | ||
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; | ||
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; | ||
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; | ||
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException; | ||
|
@@ -27,15 +28,21 @@ | |
import java.util.Optional; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Predicate; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
/** | ||
* A task that will retry a fixed number of times before completing the associated CompletableFuture | ||
* exceptionally with a new {@link MaxRetriesReachedException}. If the future returned from {@link | ||
* #executePeerTask(Optional)} is complete with a non-empty list the retry counter is reset. | ||
* #executePeerTask(Optional)} is considered an empty result by {@link #emptyResult(Object)} the | ||
* peer is demoted, if the result is complete according to {@link #successfulResult(Object)} then | ||
* the final task result is set, otherwise the result is considered partial and the retry counter is | ||
* reset. | ||
* | ||
* <p><b>Note:</b> extending classes should never set the final task result, using {@code | ||
* result.complete} by themselves, but should return true from {@link #successfulResult(Object)} | ||
* when done. | ||
* | ||
* @param <T> The type as a typed list that the peer task can get partial or full results in. | ||
*/ | ||
|
@@ -44,26 +51,20 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> { | |
private static final Logger LOG = LoggerFactory.getLogger(AbstractRetryingPeerTask.class); | ||
private final EthContext ethContext; | ||
private final int maxRetries; | ||
private final Predicate<T> isEmptyResponse; | ||
private final MetricsSystem metricsSystem; | ||
private int retryCount = 0; | ||
private Optional<EthPeer> assignedPeer = Optional.empty(); | ||
|
||
/** | ||
* @param ethContext The context of the current Eth network we are attached to. | ||
* @param maxRetries Maximum number of retries to accept before completing exceptionally. | ||
* @param isEmptyResponse Test if the response received was empty. | ||
* @param metricsSystem The metrics system used to measure task. | ||
*/ | ||
protected AbstractRetryingPeerTask( | ||
final EthContext ethContext, | ||
final int maxRetries, | ||
final Predicate<T> isEmptyResponse, | ||
final MetricsSystem metricsSystem) { | ||
final EthContext ethContext, final int maxRetries, final MetricsSystem metricsSystem) { | ||
super(metricsSystem); | ||
this.ethContext = ethContext; | ||
this.maxRetries = maxRetries; | ||
this.isEmptyResponse = isEmptyResponse; | ||
this.metricsSystem = metricsSystem; | ||
} | ||
|
||
|
@@ -93,26 +94,36 @@ protected void executeTask() { | |
if (error != null) { | ||
handleTaskError(error); | ||
} else { | ||
// If we get a partial success, reset the retry counter. | ||
if (!isEmptyResponse.test(peerResult)) { | ||
retryCount = 0; | ||
if (successfulResult(peerResult)) { | ||
result.complete(peerResult); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder how it worked before without this success part. Is this something that is done automatically and that we don't really need? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before each extending task had to set the final result by itself, using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. good idea |
||
} else { | ||
if (emptyResult(peerResult)) { | ||
// record this empty response, so that the peer will be disconnected if there | ||
// were too many | ||
assignedPeer.ifPresent( | ||
peer -> peer.recordUselessResponse(getClass().getSimpleName())); | ||
} else { | ||
// If we get a partial success, reset the retry counter | ||
retryCount = 0; | ||
} | ||
// retry | ||
executeTaskTimed(); | ||
} | ||
executeTaskTimed(); | ||
} | ||
}); | ||
} | ||
|
||
protected abstract CompletableFuture<T> executePeerTask(Optional<EthPeer> assignedPeer); | ||
|
||
protected void handleTaskError(final Throwable error) { | ||
final Throwable cause = ExceptionUtils.rootCause(error); | ||
if (!isRetryableError(cause)) { | ||
final Throwable rootCause = ExceptionUtils.rootCause(error); | ||
if (!isRetryableError(rootCause)) { | ||
// Complete exceptionally | ||
result.completeExceptionally(cause); | ||
result.completeExceptionally(rootCause); | ||
return; | ||
} | ||
|
||
if (cause instanceof NoAvailablePeersException) { | ||
if (rootCause instanceof NoAvailablePeersException) { | ||
LOG.debug( | ||
"No useful peer found, wait max 5 seconds for new peer to connect: current peers {}", | ||
ethContext.getEthPeers().peerCount()); | ||
|
@@ -130,7 +141,7 @@ protected void handleTaskError(final Throwable error) { | |
LOG.debug( | ||
"Retrying after recoverable failure from peer task {}: {}", | ||
this.getClass().getSimpleName(), | ||
cause.getMessage()); | ||
rootCause.getMessage()); | ||
// Wait before retrying on failure | ||
executeSubTask( | ||
() -> | ||
|
@@ -139,14 +150,16 @@ protected void handleTaskError(final Throwable error) { | |
.scheduleFutureTask(this::executeTaskTimed, Duration.ofSeconds(1))); | ||
} | ||
|
||
protected boolean isRetryableError(final Throwable error) { | ||
return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error)); | ||
protected boolean isRetryableError(final Throwable rootCause) { | ||
return rootCause instanceof IncompleteResultsException | ||
|| rootCause instanceof TimeoutException | ||
|| (!assignedPeer.isPresent() && isPeerFailure(rootCause)); | ||
} | ||
|
||
protected boolean isPeerFailure(final Throwable error) { | ||
return error instanceof PeerBreachedProtocolException | ||
|| error instanceof PeerDisconnectedException | ||
|| error instanceof NoAvailablePeersException; | ||
protected boolean isPeerFailure(final Throwable rootCause) { | ||
return rootCause instanceof PeerBreachedProtocolException | ||
|| rootCause instanceof PeerDisconnectedException | ||
|| rootCause instanceof NoAvailablePeersException; | ||
} | ||
|
||
protected EthContext getEthContext() { | ||
|
@@ -164,4 +177,23 @@ public int getRetryCount() { | |
public int getMaxRetries() { | ||
return maxRetries; | ||
} | ||
|
||
/** | ||
* Identify if the result is empty. | ||
* | ||
* @param peerResult the result to check | ||
* @return true if the result is empty and the peer should be demoted and the request retried | ||
*/ | ||
protected abstract boolean emptyResult(final T peerResult); | ||
|
||
/** | ||
* Identify a successful and complete result. Partial results that are not considered successful | ||
* should return false, so that the request is retried. This check has precedence over the {@link | ||
* #emptyResult(Object)}, so if an empty result is also successful the task completes successfully | ||
* with an empty result. | ||
* | ||
* @param peerResult the result to check | ||
* @return true if the result is successful and can be set as the task result | ||
*/ | ||
protected abstract boolean successfulResult(final T peerResult); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,13 +20,12 @@ | |
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; | ||
import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; | ||
import org.hyperledger.besu.plugin.services.MetricsSystem; | ||
import org.hyperledger.besu.util.ExceptionUtils; | ||
|
||
import java.util.HashSet; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeoutException; | ||
import java.util.function.Predicate; | ||
import java.util.stream.Stream; | ||
|
||
import org.slf4j.Logger; | ||
|
@@ -41,11 +40,8 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry | |
private final Set<EthPeer> failedPeers = new HashSet<>(); | ||
|
||
protected AbstractRetryingSwitchingPeerTask( | ||
final EthContext ethContext, | ||
final MetricsSystem metricsSystem, | ||
final Predicate<T> isEmptyResponse, | ||
final int maxRetries) { | ||
super(ethContext, maxRetries, isEmptyResponse, metricsSystem); | ||
final EthContext ethContext, final MetricsSystem metricsSystem, final int maxRetries) { | ||
super(ethContext, maxRetries, metricsSystem); | ||
} | ||
|
||
@Override | ||
|
@@ -93,24 +89,19 @@ protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedP | |
.addArgument(peerToUse) | ||
.addArgument(this::getRetryCount) | ||
.log(); | ||
result.complete(peerResult); | ||
return peerResult; | ||
}); | ||
} | ||
|
||
@Override | ||
protected void handleTaskError(final Throwable error) { | ||
if (isPeerFailure(error)) { | ||
getAssignedPeer().ifPresent(peer -> failedPeers.add(peer)); | ||
final Throwable rootCause = ExceptionUtils.rootCause(error); | ||
if (isPeerFailure(rootCause)) { | ||
getAssignedPeer().ifPresent(failedPeers::add); | ||
} | ||
super.handleTaskError(error); | ||
} | ||
|
||
@Override | ||
protected boolean isRetryableError(final Throwable error) { | ||
return error instanceof TimeoutException || isPeerFailure(error); | ||
} | ||
|
||
private Optional<EthPeer> selectNextPeer() { | ||
final Optional<EthPeer> maybeNextPeer = remainingPeersToTry().findFirst(); | ||
|
||
|
@@ -136,7 +127,7 @@ private void refreshPeers() { | |
// If we are at max connections, then refresh peers disconnecting one of the failed peers, | ||
// or the least useful | ||
|
||
if (peers.peerCount() >= peers.getMaxPeers()) { | ||
if (peers.peerCount() >= peers.getPeerLowerBound()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be upperBound? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I want to force Besu to actively search for new peers, and my understanding is that this happens when the number of peers is below the lower bound There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe add a comment (or edit the existing comment) because the code is now doing something slightly different |
||
failedPeers.stream() | ||
.filter(peer -> !peer.isDisconnected()) | ||
.findAny() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An empty response for SnapProtocol/v1 is not necessarily a reason to demote a peer. If we are requesting a range that is outside of the 128-block snap range, an empty response is in-protocol. We should add additional criteria to only demote peers that give an empty range that is within 128 blocks of head.
Otherwise we might end up with a snap sync performance regression by dropping peers from which we ask for old ranges (while we have an old pivot block).