Skip to content

Commit 1378afc

Browse files
committed
Record empty responses when retrying a peer task
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
1 parent f19fb64 commit 1378afc

File tree

4 files changed

+22
-37
lines changed

4 files changed

+22
-37
lines changed

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
19+
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
1920
import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException;
2021
import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException;
2122
import org.hyperledger.besu.ethereum.eth.manager.exceptions.PeerBreachedProtocolException;
@@ -93,9 +94,19 @@ protected void executeTask() {
9394
if (error != null) {
9495
handleTaskError(error);
9596
} else {
96-
// If we get a partial success, reset the retry counter.
97-
if (!isEmptyResponse.test(peerResult)) {
97+
// If we get a partial success, reset the retry counter, otherwise demote the peer
98+
if (isEmptyResponse.test(peerResult)) {
99+
// record this empty response, so that the peer will be disconnected if there were
100+
// too many
101+
assignedPeer.ifPresent(
102+
peer -> {
103+
peer.recordUselessResponse(getClass().getSimpleName());
104+
throw new IncompleteResultsException(
105+
"Empty response from peer " + peer.getShortNodeId());
106+
});
107+
} else {
98108
retryCount = 0;
109+
assignedPeer.ifPresent(EthPeer::recordUsefulResponse);
99110
}
100111
executeTaskTimed();
101112
}
@@ -140,7 +151,9 @@ protected void handleTaskError(final Throwable error) {
140151
}
141152

142153
protected boolean isRetryableError(final Throwable error) {
143-
return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error));
154+
return error instanceof IncompleteResultsException
155+
|| error instanceof TimeoutException
156+
|| (!assignedPeer.isPresent() && isPeerFailure(error));
144157
}
145158

146159
protected boolean isPeerFailure(final Throwable error) {

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java

+1-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Optional;
2626
import java.util.Set;
2727
import java.util.concurrent.CompletableFuture;
28-
import java.util.concurrent.TimeoutException;
2928
import java.util.function.Predicate;
3029
import java.util.stream.Stream;
3130

@@ -106,11 +105,6 @@ protected void handleTaskError(final Throwable error) {
106105
super.handleTaskError(error);
107106
}
108107

109-
@Override
110-
protected boolean isRetryableError(final Throwable error) {
111-
return error instanceof TimeoutException || isPeerFailure(error);
112-
}
113-
114108
private Optional<EthPeer> selectNextPeer() {
115109
final Optional<EthPeer> maybeNextPeer = remainingPeersToTry().findFirst();
116110

@@ -136,7 +130,7 @@ private void refreshPeers() {
136130
// If we are at max connections, then refresh peers disconnecting one of the failed peers,
137131
// or the least useful
138132

139-
if (peers.peerCount() >= peers.getMaxPeers()) {
133+
if (peers.peerCount() >= peers.getPeerLowerBound()) {
140134
failedPeers.stream()
141135
.filter(peer -> !peer.isDisconnected())
142136
.findAny()

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java

+5-13
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.hyperledger.besu.ethereum.core.BlockHeader;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2020
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
21-
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
2221
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
2322
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
2423
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -44,7 +43,11 @@ protected RetryingGetBlocksFromPeersTask(
4443
final MetricsSystem metricsSystem,
4544
final int maxRetries,
4645
final List<BlockHeader> headers) {
47-
super(ethContext, metricsSystem, Objects::isNull, maxRetries);
46+
super(
47+
ethContext,
48+
metricsSystem,
49+
res -> Objects.isNull(res) || res.getResult().isEmpty(),
50+
maxRetries);
4851
this.protocolSchedule = protocolSchedule;
4952
this.headers = headers;
5053
}
@@ -77,22 +80,11 @@ protected CompletableFuture<PeerTaskResult<List<Block>>> executeTaskOnCurrentPee
7780
.addArgument(this::getRetryCount)
7881
.log();
7982

80-
if (peerResult.getResult().isEmpty()) {
81-
currentPeer.recordUselessResponse("GetBodiesFromPeerTask");
82-
throw new IncompleteResultsException(
83-
"No blocks returned by peer " + currentPeer.getShortNodeId());
84-
}
85-
8683
result.complete(peerResult);
8784
return peerResult;
8885
});
8986
}
9087

91-
@Override
92-
protected boolean isRetryableError(final Throwable error) {
93-
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
94-
}
95-
9688
@Override
9789
protected void handleTaskError(final Throwable error) {
9890
if (getRetryCount() < getMaxRetries()) {

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetHeadersEndingAtFromPeerByHashTask.java

-14
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.hyperledger.besu.ethereum.core.BlockHeader;
2222
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2323
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
24-
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
2524
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
2625
import org.hyperledger.besu.plugin.services.MetricsSystem;
2726

@@ -98,21 +97,8 @@ protected CompletableFuture<List<BlockHeader>> executeTaskOnCurrentPeer(
9897
referenceHash,
9998
currentPeer,
10099
peerResult.getResult());
101-
if (peerResult.getResult().isEmpty()) {
102-
currentPeer.recordUselessResponse("GetHeadersFromPeerByHashTask");
103-
throw new IncompleteResultsException(
104-
"No block headers for hash "
105-
+ referenceHash
106-
+ " returned by peer "
107-
+ currentPeer.getShortNodeId());
108-
}
109100
result.complete(peerResult.getResult());
110101
return peerResult.getResult();
111102
});
112103
}
113-
114-
@Override
115-
protected boolean isRetryableError(final Throwable error) {
116-
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
117-
}
118104
}

0 commit comments

Comments
 (0)