Skip to content

Commit

Permalink
Avoid keeping txpool lock during block creation
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jun 7, 2024
1 parent 2d59f4d commit a494a9f
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 131 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private List<AbstractTransactionSelector> createTransactionSelectors(
public TransactionSelectionResults buildTransactionListForBlock() {
LOG.atDebug()
.setMessage("Transaction pool stats {}")
.addArgument(blockSelectionContext.transactionPool().logStats())
.addArgument(blockSelectionContext.transactionPool()::logStats)
.log();
timeLimitedSelection();
LOG.atTrace()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public abstract class AbstractIsolationTests {
txPoolMetrics,
transactionReplacementTester,
new BlobCache(),
MiningParameters.newDefault()));
MiningParameters.newDefault()),
ethScheduler);

protected final List<GenesisAllocation> accounts =
GenesisConfigFile.fromResource("/dev.json")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private static PendingTransactions createLayeredPendingTransactions(
miningParameters);
}

return new LayeredPendingTransactions(transactionPoolConfiguration, pendingTransactionsSorter);
return new LayeredPendingTransactions(
transactionPoolConfiguration, pendingTransactionsSorter, ethScheduler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
Expand Down Expand Up @@ -168,8 +168,16 @@ protected int[] getRemainingPromotionsPerType() {
}

@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
public List<SenderPendingTransactions> getBySender() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static java.util.Collections.unmodifiableList;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
Expand Down Expand Up @@ -54,7 +55,6 @@
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -138,6 +138,8 @@ public boolean contains(final Transaction transaction) {
|| nextLayer.contains(transaction);
}

public abstract List<SenderPendingTransactions> getBySender();

@Override
public List<PendingTransaction> getAll() {
final List<PendingTransaction> allNextLayers = nextLayer.getAll();
Expand Down Expand Up @@ -548,17 +550,17 @@ public List<Transaction> getAllPriority() {
return priorityTxs;
}

Stream<PendingTransaction> stream(final Address sender) {
return txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values().stream();
}

@Override
public List<PendingTransaction> getAllFor(final Address sender) {
return Stream.concat(stream(sender), nextLayer.getAllFor(sender).stream()).toList();
public synchronized List<PendingTransaction> getAllFor(final Address sender) {
final var fromNextLayers = nextLayer.getAllFor(sender);
final var fromThisLayer = txsBySender.getOrDefault(sender, EMPTY_SENDER_TXS).values();
final var concatLayers =
new ArrayList<PendingTransaction>(fromThisLayer.size() + fromNextLayers.size());
concatLayers.addAll(fromThisLayer);
concatLayers.addAll(fromNextLayers);
return unmodifiableList(concatLayers);
}

abstract Stream<PendingTransaction> stream();

@Override
public int count() {
return pendingTransactions.size() + nextLayer.count();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionAddedListener;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
Expand All @@ -41,13 +42,10 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collector;
import java.util.stream.Collectors;

Expand All @@ -63,12 +61,15 @@ public class LayeredPendingTransactions implements PendingTransactions {
private static final Marker INVALID_TX_REMOVED = MarkerFactory.getMarker("INVALID_TX_REMOVED");
private final TransactionPoolConfiguration poolConfig;
private final AbstractPrioritizedTransactions prioritizedTransactions;
private final EthScheduler ethScheduler;

public LayeredPendingTransactions(
final TransactionPoolConfiguration poolConfig,
final AbstractPrioritizedTransactions prioritizedTransactions) {
final AbstractPrioritizedTransactions prioritizedTransactions,
final EthScheduler ethScheduler) {
this.poolConfig = poolConfig;
this.prioritizedTransactions = prioritizedTransactions;
this.ethScheduler = ethScheduler;
}

@Override
Expand Down Expand Up @@ -311,79 +312,56 @@ public synchronized List<Transaction> getPriorityTransactions() {
}

@Override
// There's a small edge case here we could encounter.
// When we pass an upgrade block that has a new transaction type, we start allowing transactions
// of that new type into our pool.
// If we then reorg to a block lower than the upgrade block height _and_ we create a block, that
// block could end up with transactions of the new type.
// This seems like it would be very rare but worth it to document that we don't handle that case
// right now.
public synchronized void selectTransactions(
final PendingTransactions.TransactionSelector selector) {
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Hash> alreadyChecked = new HashSet<>();
final Set<Address> skipSenders = new HashSet<>();
final AtomicBoolean completed = new AtomicBoolean(false);

prioritizedTransactions.stream()
.takeWhile(unused -> !completed.get())
.filter(highPrioPendingTx -> !skipSenders.contains(highPrioPendingTx.getSender()))
.peek(this::logSenderTxs)
.forEach(
highPrioPendingTx ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.takeWhile(
candidatePendingTx ->
!skipSenders.contains(candidatePendingTx.getSender())
&& !completed.get())
.filter(
candidatePendingTx ->
!alreadyChecked.contains(candidatePendingTx.getHash())
&& Long.compareUnsigned(
candidatePendingTx.getNonce(), highPrioPendingTx.getNonce())
<= 0)
.forEach(
candidatePendingTx -> {
alreadyChecked.add(candidatePendingTx.getHash());
final var res = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(res)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (res.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, res);
}

if (res.stop()) {
completed.set(true);
}

if (!res.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
skipSenders.add(candidatePendingTx.getSender());
LOG.trace("Skipping tx from sender {}", candidatePendingTx.getSender());
}
}));

invalidTransactions.forEach(
invalidTx -> prioritizedTransactions.remove(invalidTx, INVALIDATED));
}

private void logSenderTxs(final PendingTransaction highPrioPendingTx) {
LOG.atTrace()
.setMessage("highPrioPendingTx {}, senderTxs {}")
.addArgument(highPrioPendingTx::toTraceLog)
.addArgument(
() ->
prioritizedTransactions.stream(highPrioPendingTx.getSender())
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(", ")))
.log();
final List<SenderPendingTransactions> candidateTransactions;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the log for all the process, so we just lock to get
// the candidate transactions
candidateTransactions = prioritizedTransactions.getBySender();
}

selection:
for (final var senderTxs : candidateTransactions) {
LOG.trace("highPrioSenderTxs {}", senderTxs);

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
}

if (selectionResult.stop()) {
break selection;
}
}
}

ethScheduler.scheduleTxWorkerTask(
() ->
invalidTransactions.forEach(
invalidTx -> {
synchronized (this) {
prioritizedTransactions.remove(invalidTx, INVALIDATED);
}
}));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReadyTransactions extends AbstractSequentialTransactionsLayer {

Expand Down Expand Up @@ -138,10 +137,14 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
}

@Override
public Stream<PendingTransaction> stream() {
public synchronized List<SenderPendingTransactions> getBySender() {
return orderByMaxFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.flatMap(sender -> txsBySender.get(sender).values().stream());
.map(
sender ->
new SenderPendingTransactions(
sender, List.copyOf(txsBySender.get(sender).values())))
.toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;

import java.util.List;
import java.util.stream.Collectors;

public record SenderPendingTransactions(
Address sender, List<PendingTransaction> pendingTransactions) {

@Override
public String toString() {
return "Sender "
+ sender
+ " has "
+ pendingTransactions.size()
+ " pending transactions "
+ pendingTransactions.stream()
.map(PendingTransaction::toTraceLog)
.collect(Collectors.joining(",", "[", "]"));
}
}
Loading

0 comments on commit a494a9f

Please sign in to comment.