Skip to content

Commit da95d50

Browse files
committed
Review and consolidate the way result is set in retrying tasks
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
1 parent 1378afc commit da95d50

16 files changed

+194
-126
lines changed

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetAccountRangeFromPeerTask.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.hyperledger.besu.ethereum.core.BlockHeader;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
20+
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
2021
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
2122
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
2223
import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage;
@@ -42,8 +43,7 @@ private RetryingGetAccountRangeFromPeerTask(
4243
final Bytes32 endKeyHash,
4344
final BlockHeader blockHeader,
4445
final MetricsSystem metricsSystem) {
45-
super(
46-
ethContext, 4, data -> data.accounts().isEmpty() && data.proofs().isEmpty(), metricsSystem);
46+
super(ethContext, 4, metricsSystem);
4747
this.ethContext = ethContext;
4848
this.startKeyHash = startKeyHash;
4949
this.endKeyHash = endKeyHash;
@@ -68,11 +68,16 @@ protected CompletableFuture<AccountRangeMessage.AccountRangeData> executePeerTas
6868
GetAccountRangeFromPeerTask.forAccountRange(
6969
ethContext, startKeyHash, endKeyHash, blockHeader, metricsSystem);
7070
assignedPeer.ifPresent(task::assignPeer);
71-
return executeSubTask(task::run)
72-
.thenApply(
73-
peerResult -> {
74-
result.complete(peerResult.getResult());
75-
return peerResult.getResult();
76-
});
71+
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
72+
}
73+
74+
@Override
75+
protected boolean emptyResult(final AccountRangeMessage.AccountRangeData data) {
76+
return data.accounts().isEmpty() && data.proofs().isEmpty();
77+
}
78+
79+
@Override
80+
protected boolean completeResult(final AccountRangeMessage.AccountRangeData peerResult) {
81+
return !emptyResult(peerResult);
7782
}
7883
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetBytecodeFromPeerTask.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.hyperledger.besu.ethereum.core.BlockHeader;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
20+
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
2021
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
2122
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
2223
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -41,7 +42,7 @@ private RetryingGetBytecodeFromPeerTask(
4142
final List<Bytes32> codeHashes,
4243
final BlockHeader blockHeader,
4344
final MetricsSystem metricsSystem) {
44-
super(ethContext, 4, Map::isEmpty, metricsSystem);
45+
super(ethContext, 4, metricsSystem);
4546
this.ethContext = ethContext;
4647
this.codeHashes = codeHashes;
4748
this.blockHeader = blockHeader;
@@ -62,11 +63,16 @@ protected CompletableFuture<Map<Bytes32, Bytes>> executePeerTask(
6263
final GetBytecodeFromPeerTask task =
6364
GetBytecodeFromPeerTask.forBytecode(ethContext, codeHashes, blockHeader, metricsSystem);
6465
assignedPeer.ifPresent(task::assignPeer);
65-
return executeSubTask(task::run)
66-
.thenApply(
67-
peerResult -> {
68-
result.complete(peerResult.getResult());
69-
return peerResult.getResult();
70-
});
66+
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
67+
}
68+
69+
@Override
70+
protected boolean emptyResult(final Map<Bytes32, Bytes> peerResult) {
71+
return peerResult.isEmpty();
72+
}
73+
74+
@Override
75+
protected boolean completeResult(final Map<Bytes32, Bytes> peerResult) {
76+
return !emptyResult(peerResult);
7177
}
7278
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetStorageRangeFromPeerTask.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.hyperledger.besu.ethereum.core.BlockHeader;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
20+
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
2021
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
2122
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
2223
import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage;
@@ -45,7 +46,7 @@ private RetryingGetStorageRangeFromPeerTask(
4546
final Bytes32 endKeyHash,
4647
final BlockHeader blockHeader,
4748
final MetricsSystem metricsSystem) {
48-
super(ethContext, 4, data -> data.proofs().isEmpty() && data.slots().isEmpty(), metricsSystem);
49+
super(ethContext, 4, metricsSystem);
4950
this.ethContext = ethContext;
5051
this.accountHashes = accountHashes;
5152
this.startKeyHash = startKeyHash;
@@ -72,11 +73,16 @@ protected CompletableFuture<StorageRangeMessage.SlotRangeData> executePeerTask(
7273
GetStorageRangeFromPeerTask.forStorageRange(
7374
ethContext, accountHashes, startKeyHash, endKeyHash, blockHeader, metricsSystem);
7475
assignedPeer.ifPresent(task::assignPeer);
75-
return executeSubTask(task::run)
76-
.thenApply(
77-
peerResult -> {
78-
result.complete(peerResult.getResult());
79-
return peerResult.getResult();
80-
});
76+
return executeSubTask(task::run).thenApply(PeerTaskResult::getResult);
77+
}
78+
79+
@Override
80+
protected boolean emptyResult(final StorageRangeMessage.SlotRangeData peerResult) {
81+
return peerResult.proofs().isEmpty() && peerResult.slots().isEmpty();
82+
}
83+
84+
@Override
85+
protected boolean completeResult(final StorageRangeMessage.SlotRangeData peerResult) {
86+
return !emptyResult(peerResult);
8187
}
8288
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/RetryingGetTrieNodeFromPeerTask.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.hyperledger.besu.ethereum.core.BlockHeader;
1818
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
20+
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask;
2021
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractRetryingPeerTask;
2122
import org.hyperledger.besu.ethereum.eth.manager.task.EthTask;
2223
import org.hyperledger.besu.plugin.services.MetricsSystem;
@@ -40,7 +41,7 @@ private RetryingGetTrieNodeFromPeerTask(
4041
final Map<Bytes, List<Bytes>> paths,
4142
final BlockHeader blockHeader,
4243
final MetricsSystem metricsSystem) {
43-
super(ethContext, 4, Map::isEmpty, metricsSystem);
44+
super(ethContext, 4, metricsSystem);
4445
this.ethContext = ethContext;
4546
this.paths = paths;
4647
this.blockHeader = blockHeader;
@@ -61,11 +62,16 @@ protected CompletableFuture<Map<Bytes, Bytes>> executePeerTask(
6162
final GetTrieNodeFromPeerTask task =
6263
GetTrieNodeFromPeerTask.forTrieNodes(ethContext, paths, blockHeader, metricsSystem);
6364
assignedPeer.ifPresent(task::assignPeer);
64-
return executeSubTask(task::run)
65-
.thenApply(
66-
peerResult -> {
67-
result.complete(peerResult.getResult());
68-
return peerResult.getResult();
69-
});
65+
return executeSubTask(task::run).thenApply(AbstractPeerTask.PeerTaskResult::getResult);
66+
}
67+
68+
@Override
69+
protected boolean emptyResult(final Map<Bytes, Bytes> peerResult) {
70+
return peerResult.isEmpty();
71+
}
72+
73+
@Override
74+
protected boolean completeResult(final Map<Bytes, Bytes> peerResult) {
75+
return !emptyResult(peerResult);
7076
}
7177
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java

+18-21
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Optional;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeoutException;
31-
import java.util.function.Predicate;
3231

3332
import org.slf4j.Logger;
3433
import org.slf4j.LoggerFactory;
@@ -45,26 +44,20 @@ public abstract class AbstractRetryingPeerTask<T> extends AbstractEthTask<T> {
4544
private static final Logger LOG = LoggerFactory.getLogger(AbstractRetryingPeerTask.class);
4645
private final EthContext ethContext;
4746
private final int maxRetries;
48-
private final Predicate<T> isEmptyResponse;
4947
private final MetricsSystem metricsSystem;
5048
private int retryCount = 0;
5149
private Optional<EthPeer> assignedPeer = Optional.empty();
5250

5351
/**
5452
* @param ethContext The context of the current Eth network we are attached to.
5553
* @param maxRetries Maximum number of retries to accept before completing exceptionally.
56-
* @param isEmptyResponse Test if the response received was empty.
5754
* @param metricsSystem The metrics system used to measure task.
5855
*/
5956
protected AbstractRetryingPeerTask(
60-
final EthContext ethContext,
61-
final int maxRetries,
62-
final Predicate<T> isEmptyResponse,
63-
final MetricsSystem metricsSystem) {
57+
final EthContext ethContext, final int maxRetries, final MetricsSystem metricsSystem) {
6458
super(metricsSystem);
6559
this.ethContext = ethContext;
6660
this.maxRetries = maxRetries;
67-
this.isEmptyResponse = isEmptyResponse;
6861
this.metricsSystem = metricsSystem;
6962
}
7063

@@ -94,21 +87,21 @@ protected void executeTask() {
9487
if (error != null) {
9588
handleTaskError(error);
9689
} else {
97-
// If we get a partial success, reset the retry counter, otherwise demote the peer
98-
if (isEmptyResponse.test(peerResult)) {
99-
// record this empty response, so that the peer will be disconnected if there were
100-
// too many
101-
assignedPeer.ifPresent(
102-
peer -> {
103-
peer.recordUselessResponse(getClass().getSimpleName());
104-
throw new IncompleteResultsException(
105-
"Empty response from peer " + peer.getShortNodeId());
106-
});
90+
if (completeResult(peerResult)) {
91+
result.complete(peerResult);
10792
} else {
108-
retryCount = 0;
109-
assignedPeer.ifPresent(EthPeer::recordUsefulResponse);
93+
if (emptyResult(peerResult)) {
94+
// record this empty response, so that the peer will be disconnected if there
95+
// were too many
96+
assignedPeer.ifPresent(
97+
peer -> peer.recordUselessResponse(getClass().getSimpleName()));
98+
} else {
99+
// If we get a partial success, reset the retry counter
100+
retryCount = 0;
101+
}
102+
// retry
103+
executeTaskTimed();
110104
}
111-
executeTaskTimed();
112105
}
113106
});
114107
}
@@ -177,4 +170,8 @@ public int getRetryCount() {
177170
public int getMaxRetries() {
178171
return maxRetries;
179172
}
173+
174+
protected abstract boolean emptyResult(final T peerResult);
175+
176+
protected abstract boolean completeResult(final T peerResult);
180177
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.Optional;
2626
import java.util.Set;
2727
import java.util.concurrent.CompletableFuture;
28-
import java.util.function.Predicate;
2928
import java.util.stream.Stream;
3029

3130
import org.slf4j.Logger;
@@ -40,11 +39,8 @@ public abstract class AbstractRetryingSwitchingPeerTask<T> extends AbstractRetry
4039
private final Set<EthPeer> failedPeers = new HashSet<>();
4140

4241
protected AbstractRetryingSwitchingPeerTask(
43-
final EthContext ethContext,
44-
final MetricsSystem metricsSystem,
45-
final Predicate<T> isEmptyResponse,
46-
final int maxRetries) {
47-
super(ethContext, maxRetries, isEmptyResponse, metricsSystem);
42+
final EthContext ethContext, final MetricsSystem metricsSystem, final int maxRetries) {
43+
super(ethContext, maxRetries, metricsSystem);
4844
}
4945

5046
@Override
@@ -92,15 +88,14 @@ protected CompletableFuture<T> executePeerTask(final Optional<EthPeer> assignedP
9288
.addArgument(peerToUse)
9389
.addArgument(this::getRetryCount)
9490
.log();
95-
result.complete(peerResult);
9691
return peerResult;
9792
});
9893
}
9994

10095
@Override
10196
protected void handleTaskError(final Throwable error) {
10297
if (isPeerFailure(error)) {
103-
getAssignedPeer().ifPresent(peer -> failedPeers.add(peer));
98+
getAssignedPeer().ifPresent(failedPeers::add);
10499
}
105100
super.handleTaskError(error);
106101
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
import org.hyperledger.besu.ethereum.core.Block;
1919
import org.hyperledger.besu.ethereum.eth.manager.EthContext;
2020
import org.hyperledger.besu.ethereum.eth.manager.EthPeer;
21-
import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException;
2221
import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult;
2322
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
2423
import org.hyperledger.besu.plugin.services.MetricsSystem;
2524

26-
import java.util.Objects;
2725
import java.util.Optional;
2826
import java.util.concurrent.CompletableFuture;
2927

@@ -46,7 +44,7 @@ protected RetryingGetBlockFromPeersTask(
4644
final int maxRetries,
4745
final Optional<Hash> maybeBlockHash,
4846
final long blockNumber) {
49-
super(ethContext, metricsSystem, Objects::isNull, maxRetries);
47+
super(ethContext, metricsSystem, maxRetries);
5048
this.protocolSchedule = protocolSchedule;
5149
this.maybeBlockHash = maybeBlockHash;
5250
this.blockNumber = blockNumber;
@@ -80,16 +78,10 @@ protected CompletableFuture<PeerTaskResult<Block>> executeTaskOnCurrentPeer(
8078
.addArgument(peerResult.getPeer())
8179
.addArgument(this::getRetryCount)
8280
.log();
83-
result.complete(peerResult);
8481
return peerResult;
8582
});
8683
}
8784

88-
@Override
89-
protected boolean isRetryableError(final Throwable error) {
90-
return super.isRetryableError(error) || error instanceof IncompleteResultsException;
91-
}
92-
9385
@Override
9486
protected void handleTaskError(final Throwable error) {
9587
if (getRetryCount() < getMaxRetries()) {
@@ -109,6 +101,16 @@ protected void handleTaskError(final Throwable error) {
109101
super.handleTaskError(error);
110102
}
111103

104+
@Override
105+
protected boolean emptyResult(final PeerTaskResult<Block> peerResult) {
106+
return peerResult.getResult() == null;
107+
}
108+
109+
@Override
110+
protected boolean completeResult(final PeerTaskResult<Block> peerResult) {
111+
return !emptyResult(peerResult);
112+
}
113+
112114
private String logBlockNumberMaybeHash() {
113115
return blockNumber + maybeBlockHash.map(h -> " (" + h.toHexString() + ")").orElse("");
114116
}

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlocksFromPeersTask.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.hyperledger.besu.plugin.services.MetricsSystem;
2424

2525
import java.util.List;
26-
import java.util.Objects;
2726
import java.util.concurrent.CompletableFuture;
2827

2928
import org.slf4j.Logger;
@@ -43,11 +42,7 @@ protected RetryingGetBlocksFromPeersTask(
4342
final MetricsSystem metricsSystem,
4443
final int maxRetries,
4544
final List<BlockHeader> headers) {
46-
super(
47-
ethContext,
48-
metricsSystem,
49-
res -> Objects.isNull(res) || res.getResult().isEmpty(),
50-
maxRetries);
45+
super(ethContext, metricsSystem, maxRetries);
5146
this.protocolSchedule = protocolSchedule;
5247
this.headers = headers;
5348
}
@@ -79,8 +74,6 @@ protected CompletableFuture<PeerTaskResult<List<Block>>> executeTaskOnCurrentPee
7974
.addArgument(peerResult.getPeer())
8075
.addArgument(this::getRetryCount)
8176
.log();
82-
83-
result.complete(peerResult);
8477
return peerResult;
8578
});
8679
}
@@ -99,4 +92,14 @@ protected void handleTaskError(final Throwable error) {
9992
}
10093
super.handleTaskError(error);
10194
}
95+
96+
@Override
97+
protected boolean emptyResult(final PeerTaskResult<List<Block>> peerResult) {
98+
return peerResult.getResult().isEmpty();
99+
}
100+
101+
@Override
102+
protected boolean completeResult(final PeerTaskResult<List<Block>> peerResult) {
103+
return !emptyResult(peerResult);
104+
}
102105
}

0 commit comments

Comments
 (0)