Skip to content

Commit

Permalink
Refactor tx selectors with state to work with atomic tx groups
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jan 13, 2025
1 parent af2f0b1 commit 8a4eabf
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 405 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,10 @@
*/
package net.consensys.linea.sequencer.modulelimit;

import static com.google.common.base.Preconditions.checkArgument;

import java.io.File;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SequencedMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -33,7 +28,6 @@
import org.apache.tuweni.toml.Toml;
import org.apache.tuweni.toml.TomlParseResult;
import org.apache.tuweni.toml.TomlTable;
import org.hyperledger.besu.datatypes.Hash;

/**
* Accumulates and verifies line counts for modules based on provided limits. It supports verifying
Expand All @@ -45,8 +39,6 @@
@Slf4j
public class ModuleLineCountValidator {
private final Map<String, Integer> moduleLineCountLimits;
private final SequencedMap<Hash, Map<String, Integer>> pendingAccumulatedLineCountsPerModule =
new LinkedHashMap<>();
@Getter private Map<String, Integer> confirmedAccumulatedLineCountsPerModule;

/**
Expand All @@ -56,9 +48,17 @@ public class ModuleLineCountValidator {
*/
public ModuleLineCountValidator(Map<String, Integer> moduleLineCountLimits) {
this.moduleLineCountLimits = Map.copyOf(moduleLineCountLimits);
this.confirmedAccumulatedLineCountsPerModule =
moduleLineCountLimits.keySet().stream()
.collect(Collectors.toMap(Function.identity(), unused -> 0));
this.confirmedAccumulatedLineCountsPerModule = initialLineCountLimits();
}

private Map<String, Integer> initialLineCountLimits() {
return moduleLineCountLimits.keySet().stream()
.collect(Collectors.toMap(Function.identity(), unused -> 0));
}

public ModuleLimitsValidationResult validate(
final Map<String, Integer> currentAccumulatedLineCounts) {
return validate(currentAccumulatedLineCounts, initialLineCountLimits());
}

/**
Expand All @@ -68,7 +68,9 @@ public ModuleLineCountValidator(Map<String, Integer> moduleLineCountLimits) {
* counts.
* @return A {@link ModuleLimitsValidationResult} indicating the outcome of the verification.
*/
public ModuleLimitsValidationResult validate(Map<String, Integer> currentAccumulatedLineCounts) {
public ModuleLimitsValidationResult validate(
final Map<String, Integer> currentAccumulatedLineCounts,
final Map<String, Integer> prevAccumulatedLineCounts) {
for (Map.Entry<String, Integer> moduleEntry : currentAccumulatedLineCounts.entrySet()) {
String moduleName = moduleEntry.getKey();
Integer currentTotalLineCountForModule = moduleEntry.getValue();
Expand All @@ -80,7 +82,7 @@ public ModuleLimitsValidationResult validate(Map<String, Integer> currentAccumul
}

final int lineCountAddedByCurrentTx =
currentTotalLineCountForModule - getLastAccumulatedLineCountsPerModule().get(moduleName);
currentTotalLineCountForModule - prevAccumulatedLineCounts.get(moduleName);

if (lineCountAddedByCurrentTx > lineCountLimitForModule) {
return ModuleLimitsValidationResult.txModuleLineCountOverflow(
Expand All @@ -103,72 +105,6 @@ public ModuleLimitsValidationResult validate(Map<String, Integer> currentAccumul
return ModuleLimitsValidationResult.VALID;
}

/**
* Append the accumulated line counts per module related to the passed tx hash. The appended value
* remains pending, meaning it could be discarded, until {@link
* ModuleLineCountResult#confirm(Hash)} is called for the same tx hash or a following one.
*
* @param txHash Hash of the transaction
* @param accumulatedLineCounts A map of module names to their new accumulated line counts.
*/
public void appendAccumulatedLineCounts(
final Hash txHash, final Map<String, Integer> accumulatedLineCounts) {
pendingAccumulatedLineCountsPerModule.putLast(txHash, accumulatedLineCounts);
}

/**
* Discards the pending accumulated line counts starting from the specified tx hash.
*
* @param txHash the tx hash, could not be present, in which case there is no change to the
* pending state
*/
public void discard(final Hash txHash) {
boolean afterRemoved = false;
final var it = pendingAccumulatedLineCountsPerModule.entrySet().iterator();
while (it.hasNext()) {
final var entry = it.next();
if (afterRemoved || entry.getKey().equals(txHash)) {
it.remove();
afterRemoved = true;
}
}
}

/**
* Sets the accumulated line counts referred by the specified tx hash has the confirmed one,
* allowing to forget all the preceding entries.
*
* @param txHash the tx hash, it must exist in the pending list, otherwise an exception is thrown
*/
public void confirm(final Hash txHash) {
checkArgument(
pendingAccumulatedLineCountsPerModule.containsKey(txHash),
"The specified tx hash has no pending accumulated line counts.");

final var it = pendingAccumulatedLineCountsPerModule.entrySet().iterator();
while (it.hasNext()) {
final var entry = it.next();
it.remove();
if (entry.getKey().equals(txHash)) {
confirmedAccumulatedLineCountsPerModule = Collections.unmodifiableMap(entry.getValue());
break;
}
}
}

/**
* Gets the latest, including pending, accumulated line counts. Note that the returned values
* could not yet be confirmed and could be discarded in the future.
*
* @return a map with the line count per module
*/
public Map<String, Integer> getLastAccumulatedLineCountsPerModule() {
if (pendingAccumulatedLineCountsPerModule.isEmpty()) {
return confirmedAccumulatedLineCountsPerModule;
}
return pendingAccumulatedLineCountsPerModule.lastEntry().getValue();
}

/** Enumerates possible outcomes of verifying module line counts against their limits. */
public enum ModuleLineCountResult {
VALID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
public class MaxBlockCallDataTransactionSelector implements PluginTransactionSelector {

private final int maxBlockCallDataSize;
private int cumulativeBlockCallDataSize;
private final PendingSelectionState<Integer> pendingCumulativeBlockCallDataSize =
new PendingSelectionState<>(0);

/**
* Evaluates a transaction before processing. Checks if adding the transaction to the block pushes
Expand All @@ -52,13 +53,15 @@ public TransactionSelectionResult evaluateTransactionPreProcessing(

final Transaction transaction = evaluationContext.getPendingTransaction().getTransaction();
final int transactionCallDataSize = transaction.getPayload().size();
final int newCumulativeBlockCallDataSize =
Math.addExact(pendingCumulativeBlockCallDataSize.getLast(), transactionCallDataSize);

if (isTransactionExceedingBlockCallDataSizeLimit(transactionCallDataSize)) {
if (newCumulativeBlockCallDataSize > maxBlockCallDataSize) {
log.atTrace()
.setMessage(
"Cumulative block calldata size including tx {} is {} greater than the max allowed {}, skipping tx")
.addArgument(transaction::getHash)
.addArgument(() -> cumulativeBlockCallDataSize + transactionCallDataSize)
.addArgument(newCumulativeBlockCallDataSize)
.addArgument(maxBlockCallDataSize)
.log();
return BLOCK_CALLDATA_OVERFLOW;
Expand All @@ -67,37 +70,36 @@ public TransactionSelectionResult evaluateTransactionPreProcessing(
log.atTrace()
.setMessage("Cumulative block calldata size including tx {} is {}")
.addArgument(transaction::getHash)
.addArgument(cumulativeBlockCallDataSize)
.addArgument(newCumulativeBlockCallDataSize)
.log();

return SELECTED;
}
pendingCumulativeBlockCallDataSize.appendUnconfirmed(
transaction.getHash(), newCumulativeBlockCallDataSize);

/**
* Checks if the total call data size of all transactions in a block would exceed the maximum
* allowed size if the given transaction were added.
*
* @param transactionCallDataSize The call data size of the transaction.
* @return true if the total call data size would be too big, false otherwise.
*/
private boolean isTransactionExceedingBlockCallDataSizeLimit(int transactionCallDataSize) {
return Math.addExact(cumulativeBlockCallDataSize, transactionCallDataSize)
> maxBlockCallDataSize;
return SELECTED;
}

/**
* Updates the total call data size of all transactions in a block when a transaction is selected.
* Updates the confirmed call data size of all transactions in a block when a transaction is
* selected.
*
* @param pendingTransaction The selected transaction.
* @param evaluationContext The evaluation context of the selected transaction
* @param transactionProcessingResult The processing result of the selected transaction
*/
@Override
public void onTransactionSelected(
final TransactionEvaluationContext<? extends PendingTransaction> evaluationContext,
final TransactionProcessingResult transactionProcessingResult) {
final int transactionCallDataSize =
evaluationContext.getPendingTransaction().getTransaction().getPayload().size();
cumulativeBlockCallDataSize =
Math.addExact(cumulativeBlockCallDataSize, transactionCallDataSize);
pendingCumulativeBlockCallDataSize.confirm(
evaluationContext.getPendingTransaction().getTransaction().getHash());
}

@Override
public void onTransactionNotSelected(
final TransactionEvaluationContext<? extends PendingTransaction> evaluationContext,
final TransactionSelectionResult transactionSelectionResult) {
pendingCumulativeBlockCallDataSize.discard(
evaluationContext.getPendingTransaction().getTransaction().getHash());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
public class MaxBlockGasTransactionSelector implements PluginTransactionSelector {

private final long maxGasPerBlock;
private long cumulativeBlockGasUsed;
private final PendingSelectionState<Long> pendingCumulativeBlockGasUsed =
new PendingSelectionState<>(0L);

/**
* Evaluates a transaction post-processing. Checks if adding the gas used of the transaction, to
Expand Down Expand Up @@ -70,27 +71,25 @@ public TransactionSelectionResult evaluateTransactionPostProcessing(
return TX_GAS_EXCEEDS_USER_MAX_BLOCK_GAS;
}

if (isTransactionExceedingMaxBlockGasLimit(gasUsedByTransaction)) {
final long newCumulativeBlockGasUsed =
Math.addExact(pendingCumulativeBlockGasUsed.getLast(), gasUsedByTransaction);

if (newCumulativeBlockGasUsed > maxGasPerBlock) {
log.atTrace()
.setMessage(
"Not selecting transaction {}, its cumulative block gas used {} greater than max user gas per block {},"
+ " skipping it")
.addArgument(transaction::getHash)
.addArgument(cumulativeBlockGasUsed)
.addArgument(newCumulativeBlockGasUsed)
.addArgument(maxGasPerBlock)
.log();
return TX_TOO_LARGE_FOR_REMAINING_USER_GAS;
}
return SELECTED;
}

private boolean isTransactionExceedingMaxBlockGasLimit(long transactionGasUsed) {
try {
return Math.addExact(cumulativeBlockGasUsed, transactionGasUsed) > maxGasPerBlock;
} catch (final ArithmeticException e) {
// Overflow won't occur as cumulativeBlockGasUsed won't exceed Long.MAX_VALUE
return true;
}
pendingCumulativeBlockGasUsed.appendUnconfirmed(
transaction.getHash(), newCumulativeBlockGasUsed);

return SELECTED;
}

/**
Expand All @@ -104,8 +103,16 @@ private boolean isTransactionExceedingMaxBlockGasLimit(long transactionGasUsed)
public void onTransactionSelected(
final TransactionEvaluationContext<? extends PendingTransaction> evaluationContext,
final TransactionProcessingResult processingResult) {
cumulativeBlockGasUsed =
Math.addExact(cumulativeBlockGasUsed, processingResult.getEstimateGasUsedByTransaction());
pendingCumulativeBlockGasUsed.confirm(
evaluationContext.getPendingTransaction().getTransaction().getHash());
}

@Override
public void onTransactionNotSelected(
final TransactionEvaluationContext<? extends PendingTransaction> evaluationContext,
final TransactionSelectionResult transactionSelectionResult) {
pendingCumulativeBlockGasUsed.discard(
evaluationContext.getPendingTransaction().getTransaction().getHash());
}

@Override
Expand Down
Loading

0 comments on commit 8a4eabf

Please sign in to comment.