Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 43 additions & 15 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"errors"
"fmt"
"github.com/cosmos/evm/mempool/txpool/locals"
"sync"
"time"

ethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/holiman/uint256"
Expand Down Expand Up @@ -46,9 +48,10 @@ type (
vmKeeper VMKeeperI

/** Mempools **/
txPool *txpool.TxPool
legacyTxPool *legacypool.LegacyPool
cosmosPool sdkmempool.ExtMempool
txPool *txpool.TxPool
legacyTxPool *legacypool.LegacyPool
localTxTracker *locals.TxTracker
cosmosPool sdkmempool.ExtMempool

/** Utils **/
logger log.Logger
Expand Down Expand Up @@ -127,6 +130,7 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
// from queued into pending, noting their readiness to be executed.
legacyPool.BroadcastTxFn = func(txs []*ethtypes.Transaction) error {
logger.Debug("broadcasting EVM transactions", "tx_count", len(txs))
fmt.Println(clientCtx)
return broadcastEVMTransactions(clientCtx, txConfig, txs)
}
}
Expand All @@ -143,6 +147,21 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
panic("tx pool should contain only legacypool")
}

var localTxTracker *locals.TxTracker

if !legacyConfig.NoLocals {
rejournal := legacyConfig.Rejournal
if rejournal < time.Second {
logger.Debug("Sanitizing invalid txpool journal time", "provided", rejournal, "updated", time.Second)
rejournal = time.Second
}
localTxTracker = locals.New(legacyConfig.Journal, rejournal, blockchain.Config(), txPool)
err := localTxTracker.Start()
if err != nil {
return nil
}
}

// Create Cosmos Mempool from configuration
cosmosPoolConfig := config.CosmosPoolConfig
if cosmosPoolConfig == nil {
Expand Down Expand Up @@ -174,18 +193,19 @@ func NewExperimentalEVMMempool(getCtxCallback func(height int64, prove bool) (sd
cosmosPool = sdkmempool.NewPriorityMempool(*cosmosPoolConfig)

evmMempool := &ExperimentalEVMMempool{
vmKeeper: vmKeeper,
txPool: txPool,
legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool),
cosmosPool: cosmosPool,
logger: logger,
txConfig: txConfig,
blockchain: blockchain,
bondDenom: bondDenom,
evmDenom: evmDenom,
blockGasLimit: config.BlockGasLimit,
minTip: config.MinTip,
anteHandler: config.AnteHandler,
vmKeeper: vmKeeper,
txPool: txPool,
legacyTxPool: txPool.Subpools[0].(*legacypool.LegacyPool),
localTxTracker: localTxTracker,
cosmosPool: cosmosPool,
logger: logger,
txConfig: txConfig,
blockchain: blockchain,
bondDenom: bondDenom,
evmDenom: evmDenom,
blockGasLimit: config.BlockGasLimit,
minTip: config.MinTip,
anteHandler: config.AnteHandler,
}

vmKeeper.SetEvmMempool(evmMempool)
Expand Down Expand Up @@ -303,6 +323,10 @@ func (m *ExperimentalEVMMempool) Remove(tx sdk.Tx) error {
m.mtx.Lock()
defer m.mtx.Unlock()

if m.blockchain.latestCtx.BlockHeight() == 0 {
return nil
}

m.logger.Debug("removing transaction from mempool")

msg, err := m.getEVMMessage(tx)
Expand Down Expand Up @@ -419,6 +443,10 @@ func (m *ExperimentalEVMMempool) Close() error {
errs = append(errs, fmt.Errorf("failed to close txpool: %w", err))
}

if err := m.localTxTracker.Stop(); err != nil {
errs = append(errs, fmt.Errorf("failed to close localTxTracker: %w", err))
}

return errors.Join(errs...)
}

Expand Down
46 changes: 46 additions & 0 deletions mempool/txpool/locals/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2025 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package locals

import (
"errors"

"github.com/ethereum/go-ethereum/core/txpool"
"github.com/ethereum/go-ethereum/core/txpool/legacypool"
)

// IsTemporaryReject determines whether the given error indicates a temporary
// reason to reject a transaction from being included in the txpool. The result
// may change if the txpool's state changes later.
func IsTemporaryReject(err error) bool {
switch {
case errors.Is(err, legacypool.ErrOutOfOrderTxFromDelegated):
return true
case errors.Is(err, txpool.ErrInflightTxLimitReached):
return true
case errors.Is(err, legacypool.ErrAuthorityReserved):
return true
case errors.Is(err, txpool.ErrUnderpriced):
return true
case errors.Is(err, legacypool.ErrTxPoolOverflow):
return true
case errors.Is(err, legacypool.ErrFutureReplacePending):
return true
default:
return false
}
}
186 changes: 186 additions & 0 deletions mempool/txpool/locals/journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package locals

import (
"errors"
"io"
"io/fs"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

// errNoActiveJournal is returned if a transaction is attempted to be inserted
// into the journal, but no such file is currently open.
var errNoActiveJournal = errors.New("no active journal")

// devNull is a WriteCloser that just discards anything written into it. Its
// goal is to allow the transaction journal to write into a fake journal when
// loading transactions on startup without printing warnings due to no file
// being read for write.
type devNull struct{}

func (*devNull) Write(p []byte) (n int, err error) { return len(p), nil }
func (*devNull) Close() error { return nil }

// journal is a rotating log of transactions with the aim of storing locally
// created transactions to allow non-executed ones to survive node restarts.
type journal struct {
path string // Filesystem path to store the transactions at
writer io.WriteCloser // Output stream to write new transactions into
}

// newTxJournal creates a new transaction journal to
func newTxJournal(path string) *journal {
return &journal{
path: path,
}
}

// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func (journal *journal) load(add func([]*types.Transaction) []error) error {
// Open the journal for loading any past transactions
input, err := os.Open(journal.path)
if errors.Is(err, fs.ErrNotExist) {
// Skip the parsing if the journal file doesn't exist at all
return nil
}
if err != nil {
return err
}
defer input.Close()

// Temporarily discard any journal additions (don't double add on load)
journal.writer = new(devNull)
defer func() { journal.writer = nil }()

// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0

// Create a method to load a limited batch of transactions and bump the
// appropriate progress counters. Then use this method to load all the
// journaled transactions in small-ish batches.
loadBatch := func(txs types.Transactions) {
for _, err := range add(txs) {
if err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
}
}
}
var (
failure error
batch types.Transactions
)
for {
// Parse the next transaction and terminate on error
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
if batch.Len() > 0 {
loadBatch(batch)
}
break
}
// New transaction parsed, queue up for later, import if threshold is reached
total++

if batch = append(batch, tx); batch.Len() > 1024 {
loadBatch(batch)
batch = batch[:0]
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)

return failure
}

// insert adds the specified transaction to the local disk journal.
func (journal *journal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {
return err
}
return nil
}

// rotate regenerates the transaction journal based on the current contents of
// the transaction pool.
func (journal *journal) rotate(all map[common.Address]types.Transactions) error {
// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()

Check warning

Code scanning / CodeQL

Writable file handle closed without error handling Warning

File handle may be writable as a result of data flow from a
call to OpenFile
and closing it may result in data loss upon failure, which is not handled explicitly.

Copilot Autofix

AI 18 days ago

To fix this issue, always check and handle errors returned by replacement.Close(). In this code, there are two places where replacement.Close() is called:

  • In the error path inside the loop, after a failed rlp.Encode
  • After the loop, when all transactions have been written

In both cases, the error from Close() should be checked. In the error path, if rlp.Encode fails, and replacement.Close() also fails, both errors are relevant, but returning the first error (the encoding error) is conventionally favored, though logging the close error could also be considered. In the success path, if replacement.Close() fails, its error should be returned to the caller instead of pretending everything succeeded.

The code should be updated to:

  • Check the error from replacement.Close().
  • In the error path, attempt to close and return the error from rlp.Encode (possibly logging any close error).
  • In the normal path, after replacement.Close(), return any error from closing (if present).

No extra imports or new methods are needed, just additional error handling.


Suggested changeset 1
mempool/txpool/locals/journal.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go
--- a/mempool/txpool/locals/journal.go
+++ b/mempool/txpool/locals/journal.go
@@ -147,13 +147,18 @@
 	for _, txs := range all {
 		for _, tx := range txs {
 			if err = rlp.Encode(replacement, tx); err != nil {
-				replacement.Close()
+				closeErr := replacement.Close()
+				if closeErr != nil {
+					// Optionally, log closeErr here using `log.Warn`, since only one error can be returned.
+				}
 				return err
 			}
 		}
 		journaled += len(txs)
 	}
-	replacement.Close()
+	if err := replacement.Close(); err != nil {
+		return err
+	}
 
 	// Replace the live journal with the newly generated one
 	if err = os.Rename(journal.path+".new", journal.path); err != nil {
EOF
@@ -147,13 +147,18 @@
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
closeErr := replacement.Close()
if closeErr != nil {
// Optionally, log closeErr here using `log.Warn`, since only one error can be returned.
}
return err
}
}
journaled += len(txs)
}
replacement.Close()
if err := replacement.Close(); err != nil {
return err
}

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
Copilot is powered by AI and may make mistakes. Always verify output.
return err
}
}
journaled += len(txs)
}
Comment on lines +147 to +155

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
replacement.Close()

Check warning

Code scanning / CodeQL

Writable file handle closed without error handling Warning

File handle may be writable as a result of data flow from a
call to OpenFile
and closing it may result in data loss upon failure, which is not handled explicitly.

Copilot Autofix

AI 18 days ago

General approach:
Explicitly check the error returned by replacement.Close() on line 156, and propagate it up by returning if it is non-nil.

Detailed steps:

  • After the loop writing to replacement, instead of calling replacement.Close() and discarding its error, check it.
  • If this error is non-nil, return it immediately, just as is done for other errors within rotate.
  • This preserves the existing functional behavior but closes the bug where a late error would be suppressed.
  • No additional imports or helpers are needed. Just update the close call and error handling.

Where to change:

  • In mempool/txpool/locals/journal.go, in the rotate function, around line 156 (after writing transactions and before renaming the file).

Suggested changeset 1
mempool/txpool/locals/journal.go

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/mempool/txpool/locals/journal.go b/mempool/txpool/locals/journal.go
--- a/mempool/txpool/locals/journal.go
+++ b/mempool/txpool/locals/journal.go
@@ -153,7 +153,9 @@
 		}
 		journaled += len(txs)
 	}
-	replacement.Close()
+	if err := replacement.Close(); err != nil {
+		return err
+	}
 
 	// Replace the live journal with the newly generated one
 	if err = os.Rename(journal.path+".new", journal.path); err != nil {
EOF
@@ -153,7 +153,9 @@
}
journaled += len(txs)
}
replacement.Close()
if err := replacement.Close(); err != nil {
return err
}

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
Copilot is powered by AI and may make mistakes. Always verify output.

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return err
}
journal.writer = sink

logger := log.Info
if len(all) == 0 {
logger = log.Debug
}
logger("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))

return nil
}

// close flushes the transaction journal contents to disk and closes the file.
func (journal *journal) close() error {
var err error

if journal.writer != nil {
err = journal.writer.Close()
journal.writer = nil
}
return err
}
Loading
Loading