diff --git a/app/abci_utils.go b/app/abci_utils.go new file mode 100644 index 00000000..c1a3a661 --- /dev/null +++ b/app/abci_utils.go @@ -0,0 +1,338 @@ +package app + +import ( + "fmt" + + "github.com/cockroachdb/errors" + abci "github.com/cometbft/cometbft/abci/types" + gethtypes "github.com/ethereum/go-ethereum/core/types" + evmtypes "github.com/evmos/ethermint/x/evm/types" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/cosmos/cosmos-sdk/x/auth/signing" +) + +type ( + // GasTx defines the contract that a transaction with a gas limit must implement. + GasTx interface { + GetGas() uint64 + } + + // ProposalTxVerifier defines the interface that is implemented by BaseApp, + // that any custom ABCI PrepareProposal and ProcessProposal handler can use + // to verify a transaction. + ProposalTxVerifier interface { + PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) + ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) + } + + // DefaultProposalHandler defines the default ABCI PrepareProposal and + // ProcessProposal handlers. + DefaultProposalHandler struct { + mempool mempool.Mempool + txVerifier ProposalTxVerifier + txSelector TxSelector + } +) + +func NewDefaultProposalHandler(mp mempool.Mempool, txVerifier ProposalTxVerifier) *DefaultProposalHandler { + return &DefaultProposalHandler{ + mempool: mp, + txVerifier: txVerifier, + txSelector: NewDefaultTxSelector(), + } +} + +// SetTxSelector sets the TxSelector function on the DefaultProposalHandler. +func (h *DefaultProposalHandler) SetTxSelector(ts TxSelector) { + h.txSelector = ts +} + +// PrepareProposalHandler returns the default implementation for processing an +// ABCI proposal. The application's mempool is enumerated and all valid +// transactions are added to the proposal. Transactions are valid if they: +// +// 1) Successfully encode to bytes. +// 2) Are valid (i.e. pass runTx, AnteHandler only). +// +// Enumeration is halted once RequestPrepareProposal.MaxBytes of transactions is +// reached or the mempool is exhausted. +// +// Note: +// +// - Step (2) is identical to the validation step performed in +// DefaultProcessProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +// +// - If no mempool is set or if the mempool is a no-op mempool, the transactions +// requested from CometBFT will simply be returned, which, by default, are in +// FIFO order. +func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHandler { + return func(ctx sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + var maxBlockGas uint64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = uint64(b.MaxGas) + } + + defer h.txSelector.Clear() + + // If the mempool is nil or NoOp we simply return the transactions + // requested from CometBFT, which, by default, should be in FIFO order. + // + // Note, we still need to ensure the transactions returned respect req.MaxTxBytes. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + for _, txBz := range req.Txs { + // XXX: We pass nil as the memTx because we have no way of decoding the + // txBz. We'd need to break (update) the ProposalTxVerifier interface. + // As a result, we CANNOT account for block max gas. + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, nil, txBz) + if stop { + break + } + } + + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } + + iterator := h.mempool.Select(ctx, req.Txs) + selectedTxsSignersSeqs := make(map[string]uint64) + var selectedTxsNums int + for iterator != nil { + memTx := iterator.Tx() + sigs, err := memTx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + panic(fmt.Errorf("failed to get signatures: %w", err)) + } + // If the signers aren't in selectedTxsSignersSeqs then we haven't seen them before + // so we add them and continue given that we don't need to check the sequence. + shouldAdd := true + txSignersSeqs := make(map[string]uint64) + if len(sigs) == 0 { + msgs := memTx.GetMsgs() + if len(msgs) == 1 { + msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) + if ok { + ethTx := msgEthTx.AsTransaction() + signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) + ethSender, err := signer.Sender(ethTx) + if err == nil { + signer := sdk.AccAddress(ethSender.Bytes()).String() + nonce := ethTx.Nonce() + seq, ok := selectedTxsSignersSeqs[signer] + if !ok { + txSignersSeqs[signer] = nonce + } else { + // If we have seen this signer before in this block, we must make + // sure that the current sequence is seq+1; otherwise is invalid + // and we skip it. + if seq+1 != nonce { + shouldAdd = false + } else { + txSignersSeqs[signer] = nonce + } + } + } + } + } + } else { + for _, sig := range sigs { + signer := sdk.AccAddress(sig.PubKey.Address()).String() + seq, ok := selectedTxsSignersSeqs[signer] + if !ok { + txSignersSeqs[signer] = sig.Sequence + continue + } + + // If we have seen this signer before in this block, we must make + // sure that the current sequence is seq+1; otherwise is invalid + // and we skip it. + if seq+1 != sig.Sequence { + shouldAdd = false + break + } + txSignersSeqs[signer] = sig.Sequence + } + } + + if !shouldAdd { + iterator = iterator.Next() + continue + } + + // NOTE: Since transaction verification was already executed in CheckTx, + // which calls mempool.Insert, in theory everything in the pool should be + // valid. But some mempool implementations may insert invalid txs, so we + // check again. + txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx) + if err != nil { + err := h.mempool.Remove(memTx) + if err != nil && !errors.Is(err, mempool.ErrTxNotFound) { + panic(err) + } + } else { + stop := h.txSelector.SelectTxForProposal(uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz) + if stop { + break + } + + txsLen := len(h.txSelector.SelectedTxs()) + for sender, seq := range txSignersSeqs { + // If txsLen != selectedTxsNums is true, it means that we've + // added a new tx to the selected txs, so we need to update + // the sequence of the sender. + if txsLen != selectedTxsNums { + selectedTxsSignersSeqs[sender] = seq + } else if _, ok := selectedTxsSignersSeqs[sender]; !ok { + // The transaction hasn't been added but it passed the + // verification, so we know that the sequence is correct. + // So we set this sender's sequence to seq-1, in order + // to avoid unnecessary calls to PrepareProposalVerifyTx. + selectedTxsSignersSeqs[sender] = seq - 1 + } + } + selectedTxsNums = txsLen + } + + iterator = iterator.Next() + } + return abci.ResponsePrepareProposal{Txs: h.txSelector.SelectedTxs()} + } +} + +// ProcessProposalHandler returns the default implementation for processing an +// ABCI proposal. Every transaction in the proposal must pass 2 conditions: +// +// 1. The transaction bytes must decode to a valid transaction. +// 2. The transaction must be valid (i.e. pass runTx, AnteHandler only) +// +// If any transaction fails to pass either condition, the proposal is rejected. +// Note that step (2) is identical to the validation step performed in +// DefaultPrepareProposal. It is very important that the same validation logic +// is used in both steps, and applications must ensure that this is the case in +// non-default handlers. +func (h *DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHandler { + // If the mempool is nil or NoOp we simply return ACCEPT, + // because PrepareProposal may have included txs that could fail verification. + _, isNoOp := h.mempool.(mempool.NoOpMempool) + if h.mempool == nil || isNoOp { + return NoOpProcessProposal() + } + + return func(ctx sdk.Context, req abci.RequestProcessProposal) abci.ResponseProcessProposal { + var totalTxGas uint64 + + var maxBlockGas int64 + if b := ctx.ConsensusParams().Block; b != nil { + maxBlockGas = b.MaxGas + } + + for _, txBytes := range req.Txs { + tx, err := h.txVerifier.ProcessProposalVerifyTx(txBytes) + if err != nil { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + + if maxBlockGas > 0 { + gasTx, ok := tx.(GasTx) + if ok { + totalTxGas += gasTx.GetGas() + } + + if totalTxGas > uint64(maxBlockGas) { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT} + } + } + } + + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// NoOpPrepareProposal defines a no-op PrepareProposal handler. It will always +// return the transactions sent by the client's request. +func NoOpPrepareProposal() sdk.PrepareProposalHandler { + return func(_ sdk.Context, req abci.RequestPrepareProposal) abci.ResponsePrepareProposal { + return abci.ResponsePrepareProposal{Txs: req.Txs} + } +} + +// NoOpProcessProposal defines a no-op ProcessProposal Handler. It will always +// return ACCEPT. +func NoOpProcessProposal() sdk.ProcessProposalHandler { + return func(_ sdk.Context, _ abci.RequestProcessProposal) abci.ResponseProcessProposal { + return abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_ACCEPT} + } +} + +// TxSelector defines a helper type that assists in selecting transactions during +// mempool transaction selection in PrepareProposal. It keeps track of the total +// number of bytes and total gas of the selected transactions. It also keeps +// track of the selected transactions themselves. +type TxSelector interface { + // SelectedTxs should return a copy of the selected transactions. + SelectedTxs() [][]byte + + // Clear should clear the TxSelector, nulling out all relevant fields. + Clear() + + // SelectTxForProposal should attempt to select a transaction for inclusion in + // a proposal based on inclusion criteria defined by the TxSelector. It must + // return if the caller should halt the transaction selection loop + // (typically over a mempool) or otherwise. + SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool +} + +type defaultTxSelector struct { + totalTxBytes uint64 + totalTxGas uint64 + selectedTxs [][]byte +} + +func NewDefaultTxSelector() TxSelector { + return &defaultTxSelector{} +} + +func (ts *defaultTxSelector) SelectedTxs() [][]byte { + txs := make([][]byte, len(ts.selectedTxs)) + copy(txs, ts.selectedTxs) + return txs +} + +func (ts *defaultTxSelector) Clear() { + ts.totalTxBytes = 0 + ts.totalTxGas = 0 + ts.selectedTxs = nil +} + +func (ts *defaultTxSelector) SelectTxForProposal(maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool { + txSize := uint64(len(txBz)) + + var txGasLimit uint64 + if memTx != nil { + if gasTx, ok := memTx.(GasTx); ok { + txGasLimit = gasTx.GetGas() + } + } + + // only add the transaction to the proposal if we have enough capacity + if (txSize + ts.totalTxBytes) <= maxTxBytes { + // If there is a max block gas limit, add the tx only if the limit has + // not been met. + if maxBlockGas > 0 { + if (txGasLimit + ts.totalTxGas) <= maxBlockGas { + ts.totalTxGas += txGasLimit + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } else { + ts.totalTxBytes += txSize + ts.selectedTxs = append(ts.selectedTxs, txBz) + } + } + + // check if we've reached capacity; if so, we cannot select any more transactions + return ts.totalTxBytes >= maxTxBytes || (maxBlockGas > 0 && (ts.totalTxGas >= maxBlockGas)) +} diff --git a/app/app.go b/app/app.go index 7b3605f8..7aeb3ec4 100644 --- a/app/app.go +++ b/app/app.go @@ -292,21 +292,24 @@ type App struct { func init() { } +func NewBaseApp(logger tmlog.Logger, db dbm.DB, encodingConfig chainparams.EncodingConfig, + baseAppOptions ...func(*baseapp.BaseApp)) *baseapp.BaseApp { + bApp := baseapp.NewBaseApp(chaincfg.AppName, logger, db, encodingConfig.TxConfig.TxDecoder(), baseAppOptions...) + return bApp +} + // NewApp returns a reference to an initialized App. func NewApp( - logger tmlog.Logger, - db dbm.DB, homePath string, traceStore io.Writer, encodingConfig chainparams.EncodingConfig, options Options, - baseAppOptions ...func(*baseapp.BaseApp), + bApp *baseapp.BaseApp, ) *App { appCodec := encodingConfig.Marshaler legacyAmino := encodingConfig.Amino interfaceRegistry := encodingConfig.InterfaceRegistry - bApp := baseapp.NewBaseApp(chaincfg.AppName, logger, db, encodingConfig.TxConfig.TxDecoder(), baseAppOptions...) bApp.SetCommitMultiStoreTracer(traceStore) bApp.SetVersion(version.Version) bApp.SetInterfaceRegistry(interfaceRegistry) diff --git a/app/priority_nonce.go b/app/priority_nonce.go new file mode 100644 index 00000000..8a023b8c --- /dev/null +++ b/app/priority_nonce.go @@ -0,0 +1,488 @@ +package app + +import ( + "context" + "fmt" + "math" + + "github.com/huandu/skiplist" + + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/mempool" + "github.com/cosmos/cosmos-sdk/x/auth/signing" + gethtypes "github.com/ethereum/go-ethereum/core/types" + evmtypes "github.com/evmos/ethermint/x/evm/types" +) + +var ( + _ mempool.Mempool = (*PriorityNonceMempool)(nil) + _ mempool.Iterator = (*PriorityNonceIterator)(nil) +) + +// PriorityNonceMempool is a mempool implementation that stores txs +// in a partially ordered set by 2 dimensions: priority, and sender-nonce +// (sequence number). Internally it uses one priority ordered skip list and one +// skip list per sender ordered by sender-nonce (sequence number). When there +// are multiple txs from the same sender, they are not always comparable by +// priority to other sender txs and must be partially ordered by both sender-nonce +// and priority. +type PriorityNonceMempool struct { + priorityIndex *skiplist.SkipList + priorityCounts map[int64]int + senderIndices map[string]*skiplist.SkipList + scores map[txMeta]txMeta + onRead func(tx sdk.Tx) + txReplacement func(op, np int64, oTx, nTx sdk.Tx) bool + maxTx int +} + +type PriorityNonceIterator struct { + senderCursors map[string]*skiplist.Element + nextPriority int64 + sender string + priorityNode *skiplist.Element + mempool *PriorityNonceMempool +} + +// txMeta stores transaction metadata used in indices +type txMeta struct { + // nonce is the sender's sequence number + nonce uint64 + // priority is the transaction's priority + priority int64 + // sender is the transaction's sender + sender string + // weight is the transaction's weight, used as a tiebreaker for transactions with the same priority + weight int64 + // senderElement is a pointer to the transaction's element in the sender index + senderElement *skiplist.Element +} + +// txMetaLess is a comparator for txKeys that first compares priority, then weight, +// then sender, then nonce, uniquely identifying a transaction. +// +// Note, txMetaLess is used as the comparator in the priority index. +func txMetaLess(a, b any) int { + keyA := a.(txMeta) + keyB := b.(txMeta) + res := skiplist.Int64.Compare(keyA.priority, keyB.priority) + if res != 0 { + return res + } + + // Weight is used as a tiebreaker for transactions with the same priority. + // Weight is calculated in a single pass in .Select(...) and so will be 0 + // on .Insert(...). + res = skiplist.Int64.Compare(keyA.weight, keyB.weight) + if res != 0 { + return res + } + + // Because weight will be 0 on .Insert(...), we must also compare sender and + // nonce to resolve priority collisions. If we didn't then transactions with + // the same priority would overwrite each other in the priority index. + res = skiplist.String.Compare(keyA.sender, keyB.sender) + if res != 0 { + return res + } + + return skiplist.Uint64.Compare(keyA.nonce, keyB.nonce) +} + +type PriorityNonceMempoolOption func(*PriorityNonceMempool) + +// PriorityNonceWithOnRead sets a callback to be called when a tx is read from +// the mempool. +func PriorityNonceWithOnRead(onRead func(tx sdk.Tx)) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.onRead = onRead + } +} + +// PriorityNonceWithTxReplacement sets a callback to be called when duplicated +// transaction nonce detected during mempool insert. An application can define a +// transaction replacement rule based on tx priority or certain transaction fields. +func PriorityNonceWithTxReplacement(txReplacementRule func(op, np int64, oTx, nTx sdk.Tx) bool) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.txReplacement = txReplacementRule + } +} + +// PriorityNonceWithMaxTx sets the maximum number of transactions allowed in the +// mempool with the semantics: +// +// <0: disabled, `Insert` is a no-op +// 0: unlimited +// >0: maximum number of transactions allowed +func PriorityNonceWithMaxTx(maxTx int) PriorityNonceMempoolOption { + return func(mp *PriorityNonceMempool) { + mp.maxTx = maxTx + } +} + +// DefaultPriorityMempool returns a priorityNonceMempool with no options. +func DefaultPriorityMempool() mempool.Mempool { + return NewPriorityMempool() +} + +// NewPriorityMempool returns the SDK's default mempool implementation which +// returns txs in a partial order by 2 dimensions; priority, and sender-nonce. +func NewPriorityMempool(opts ...PriorityNonceMempoolOption) *PriorityNonceMempool { + mp := &PriorityNonceMempool{ + priorityIndex: skiplist.New(skiplist.LessThanFunc(txMetaLess)), + priorityCounts: make(map[int64]int), + senderIndices: make(map[string]*skiplist.SkipList), + scores: make(map[txMeta]txMeta), + } + + for _, opt := range opts { + opt(mp) + } + + return mp +} + +// NextSenderTx returns the next transaction for a given sender by nonce order, +// i.e. the next valid transaction for the sender. If no such transaction exists, +// nil will be returned. +func (mp *PriorityNonceMempool) NextSenderTx(sender string) sdk.Tx { + senderIndex, ok := mp.senderIndices[sender] + if !ok { + return nil + } + + cursor := senderIndex.Front() + return cursor.Value.(sdk.Tx) +} + +// Insert attempts to insert a Tx into the app-side mempool in O(log n) time, +// returning an error if unsuccessful. Sender and nonce are derived from the +// transaction's first signature. +// +// Transactions are unique by sender and nonce. Inserting a duplicate tx is an +// O(log n) no-op. +// +// Inserting a duplicate tx with a different priority overwrites the existing tx, +// changing the total order of the mempool. +func (mp *PriorityNonceMempool) Insert(ctx context.Context, tx sdk.Tx) error { + if mp.maxTx > 0 && mp.CountTx() >= mp.maxTx { + return mempool.ErrMempoolTxMaxCapacity + } else if mp.maxTx < 0 { + return nil + } + + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + sdkContext := sdk.UnwrapSDKContext(ctx) + priority := sdkContext.Priority() + + var sender string + var nonce uint64 + + if len(sigs) == 0 { + msgs := tx.GetMsgs() + if len(msgs) != 1 { + return fmt.Errorf("tx must have at least one signer") + } + msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) + if !ok { + return fmt.Errorf("tx must have at least one signer") + } + ethTx := msgEthTx.AsTransaction() + signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) + ethSender, err := signer.Sender(ethTx) + if err != nil { + return fmt.Errorf("tx must have at least one signer") + } + sender = sdk.AccAddress(ethSender.Bytes()).String() + nonce = ethTx.Nonce() + } else { + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + key := txMeta{nonce: nonce, priority: priority, sender: sender} + + senderIndex, ok := mp.senderIndices[sender] + if !ok { + senderIndex = skiplist.New(skiplist.LessThanFunc(func(a, b any) int { + return skiplist.Uint64.Compare(b.(txMeta).nonce, a.(txMeta).nonce) + })) + + // initialize sender index if not found + mp.senderIndices[sender] = senderIndex + } + + // Since mp.priorityIndex is scored by priority, then sender, then nonce, a + // changed priority will create a new key, so we must remove the old key and + // re-insert it to avoid having the same tx with different priorityIndex indexed + // twice in the mempool. + // + // This O(log n) remove operation is rare and only happens when a tx's priority + // changes. + sk := txMeta{nonce: nonce, sender: sender} + if oldScore, txExists := mp.scores[sk]; txExists { + if mp.txReplacement != nil && !mp.txReplacement(oldScore.priority, priority, senderIndex.Get(key).Value.(sdk.Tx), tx) { + return fmt.Errorf( + "tx doesn't fit the replacement rule, oldPriority: %v, newPriority: %v, oldTx: %v, newTx: %v", + oldScore.priority, + priority, + senderIndex.Get(key).Value.(sdk.Tx), + tx, + ) + } + + mp.priorityIndex.Remove(txMeta{ + nonce: nonce, + sender: sender, + priority: oldScore.priority, + weight: oldScore.weight, + }) + mp.priorityCounts[oldScore.priority]-- + } + + mp.priorityCounts[priority]++ + + // Since senderIndex is scored by nonce, a changed priority will overwrite the + // existing key. + key.senderElement = senderIndex.Set(key, tx) + + mp.scores[sk] = txMeta{priority: priority} + mp.priorityIndex.Set(key, tx) + + return nil +} + +func (i *PriorityNonceIterator) iteratePriority() mempool.Iterator { + // beginning of priority iteration + if i.priorityNode == nil { + i.priorityNode = i.mempool.priorityIndex.Front() + } else { + i.priorityNode = i.priorityNode.Next() + } + + // end of priority iteration + if i.priorityNode == nil { + return nil + } + + i.sender = i.priorityNode.Key().(txMeta).sender + + nextPriorityNode := i.priorityNode.Next() + if nextPriorityNode != nil { + i.nextPriority = nextPriorityNode.Key().(txMeta).priority + } else { + i.nextPriority = math.MinInt64 + } + + return i.Next() +} + +func (i *PriorityNonceIterator) Next() mempool.Iterator { + if i.priorityNode == nil { + return nil + } + + cursor, ok := i.senderCursors[i.sender] + if !ok { + // beginning of sender iteration + cursor = i.mempool.senderIndices[i.sender].Front() + } else { + // middle of sender iteration + cursor = cursor.Next() + } + + // end of sender iteration + if cursor == nil { + return i.iteratePriority() + } + + key := cursor.Key().(txMeta) + + // We've reached a transaction with a priority lower than the next highest + // priority in the pool. + if key.priority < i.nextPriority { + return i.iteratePriority() + } else if key.priority == i.nextPriority && i.priorityNode.Next() != nil { + // Weight is incorporated into the priority index key only (not sender index) + // so we must fetch it here from the scores map. + weight := i.mempool.scores[txMeta{nonce: key.nonce, sender: key.sender}].weight + if weight < i.priorityNode.Next().Key().(txMeta).weight { + return i.iteratePriority() + } + } + + i.senderCursors[i.sender] = cursor + return i +} + +func (i *PriorityNonceIterator) Tx() sdk.Tx { + return i.senderCursors[i.sender].Value.(sdk.Tx) +} + +// Select returns a set of transactions from the mempool, ordered by priority +// and sender-nonce in O(n) time. The passed in list of transactions are ignored. +// This is a readonly operation, the mempool is not modified. +// +// The maxBytes parameter defines the maximum number of bytes of transactions to +// return. +// +// NOTE: It is not safe to use this iterator while removing transactions from +// the underlying mempool. +func (mp *PriorityNonceMempool) Select(_ context.Context, _ [][]byte) mempool.Iterator { + if mp.priorityIndex.Len() == 0 { + return nil + } + + mp.reorderPriorityTies() + + iterator := &PriorityNonceIterator{ + mempool: mp, + senderCursors: make(map[string]*skiplist.Element), + } + + return iterator.iteratePriority() +} + +type reorderKey struct { + deleteKey txMeta + insertKey txMeta + tx sdk.Tx +} + +func (mp *PriorityNonceMempool) reorderPriorityTies() { + node := mp.priorityIndex.Front() + + var reordering []reorderKey + for node != nil { + key := node.Key().(txMeta) + if mp.priorityCounts[key.priority] > 1 { + newKey := key + newKey.weight = senderWeight(key.senderElement) + reordering = append(reordering, reorderKey{deleteKey: key, insertKey: newKey, tx: node.Value.(sdk.Tx)}) + } + + node = node.Next() + } + + for _, k := range reordering { + mp.priorityIndex.Remove(k.deleteKey) + delete(mp.scores, txMeta{nonce: k.deleteKey.nonce, sender: k.deleteKey.sender}) + mp.priorityIndex.Set(k.insertKey, k.tx) + mp.scores[txMeta{nonce: k.insertKey.nonce, sender: k.insertKey.sender}] = k.insertKey + } +} + +// senderWeight returns the weight of a given tx (t) at senderCursor. Weight is +// defined as the first (nonce-wise) same sender tx with a priority not equal to +// t. It is used to resolve priority collisions, that is when 2 or more txs from +// different senders have the same priority. +func senderWeight(senderCursor *skiplist.Element) int64 { + if senderCursor == nil { + return 0 + } + + weight := senderCursor.Key().(txMeta).priority + senderCursor = senderCursor.Next() + for senderCursor != nil { + p := senderCursor.Key().(txMeta).priority + if p != weight { + weight = p + } + + senderCursor = senderCursor.Next() + } + + return weight +} + +// CountTx returns the number of transactions in the mempool. +func (mp *PriorityNonceMempool) CountTx() int { + return mp.priorityIndex.Len() +} + +// Remove removes a transaction from the mempool in O(log n) time, returning an +// error if unsuccessful. +func (mp *PriorityNonceMempool) Remove(tx sdk.Tx) error { + sigs, err := tx.(signing.SigVerifiableTx).GetSignaturesV2() + if err != nil { + return err + } + var sender string + var nonce uint64 + if len(sigs) == 0 { + msgs := tx.GetMsgs() + if len(msgs) != 1 { + return fmt.Errorf("attempted to remove a tx with no signatures") + } + msgEthTx, ok := msgs[0].(*evmtypes.MsgEthereumTx) + if !ok { + return fmt.Errorf("attempted to remove a tx with no signatures") + } + ethTx := msgEthTx.AsTransaction() + signer := gethtypes.NewEIP2930Signer(ethTx.ChainId()) + ethSender, err := signer.Sender(ethTx) + if err != nil { + return fmt.Errorf("attempted to remove a tx with no signatures") + } + sender = sdk.AccAddress(ethSender.Bytes()).String() + nonce = ethTx.Nonce() + } else { + sig := sigs[0] + sender = sdk.AccAddress(sig.PubKey.Address()).String() + nonce = sig.Sequence + } + + scoreKey := txMeta{nonce: nonce, sender: sender} + score, ok := mp.scores[scoreKey] + if !ok { + return mempool.ErrTxNotFound + } + tk := txMeta{nonce: nonce, priority: score.priority, sender: sender, weight: score.weight} + + senderTxs, ok := mp.senderIndices[sender] + if !ok { + return fmt.Errorf("sender %s not found", sender) + } + + mp.priorityIndex.Remove(tk) + senderTxs.Remove(tk) + delete(mp.scores, scoreKey) + mp.priorityCounts[score.priority]-- + + return nil +} + +func IsEmpty(mempool mempool.Mempool) error { + mp := mempool.(*PriorityNonceMempool) + if mp.priorityIndex.Len() != 0 { + return fmt.Errorf("priorityIndex not empty") + } + + var countKeys []int64 + for k := range mp.priorityCounts { + countKeys = append(countKeys, k) + } + + for _, k := range countKeys { + if mp.priorityCounts[k] != 0 { + return fmt.Errorf("priorityCounts not zero at %v, got %v", k, mp.priorityCounts[k]) + } + } + + var senderKeys []string + for k := range mp.senderIndices { + senderKeys = append(senderKeys, k) + } + + for _, k := range senderKeys { + if mp.senderIndices[k].Len() != 0 { + return fmt.Errorf("senderIndex not empty for sender %v", k) + } + } + + return nil +} diff --git a/app/test_common.go b/app/test_common.go index b822ddd2..d72bd3d6 100644 --- a/app/test_common.go +++ b/app/test_common.go @@ -92,9 +92,10 @@ func NewTestAppFromSealed() TestApp { encCfg := MakeEncodingConfig() + bApp := NewBaseApp(log.NewNopLogger(), db, encCfg, baseapp.SetChainID(TestChainId)) app := NewApp( - log.NewNopLogger(), db, chaincfg.DefaultNodeHome, nil, - encCfg, DefaultOptions, baseapp.SetChainID(TestChainId), + chaincfg.DefaultNodeHome, nil, + encCfg, DefaultOptions, bApp, ) return TestApp{App: *app} } diff --git a/cmd/0gchaind/app.go b/cmd/0gchaind/app.go index 89a9366e..f09ec847 100644 --- a/cmd/0gchaind/app.go +++ b/cmd/0gchaind/app.go @@ -107,18 +107,9 @@ func (ac appCreator) newApp( skipLoadLatest = cast.ToBool(appOpts.Get(flagSkipLoadLatest)) } - return app.NewApp( - logger, db, homeDir, traceStore, ac.encodingConfig, - app.Options{ - SkipLoadLatest: skipLoadLatest, - SkipUpgradeHeights: skipUpgradeHeights, - SkipGenesisInvariants: cast.ToBool(appOpts.Get(crisis.FlagSkipGenesisInvariants)), - InvariantCheckPeriod: cast.ToUint(appOpts.Get(server.FlagInvCheckPeriod)), - MempoolEnableAuth: mempoolEnableAuth, - MempoolAuthAddresses: mempoolAuthAddresses, - EVMTrace: cast.ToString(appOpts.Get(ethermintflags.EVMTracer)), - EVMMaxGasWanted: cast.ToUint64(appOpts.Get(ethermintflags.EVMMaxTxGasWanted)), - }, + mempool := app.NewPriorityMempool() + + bApp := app.NewBaseApp(logger, db, ac.encodingConfig, baseapp.SetPruning(pruningOpts), baseapp.SetMinGasPrices(strings.Replace(cast.ToString(appOpts.Get(server.FlagMinGasPrices)), ";", ",", -1)), baseapp.SetHaltHeight(cast.ToUint64(appOpts.Get(server.FlagHaltHeight))), @@ -132,7 +123,28 @@ func (ac appCreator) newApp( baseapp.SetIAVLDisableFastNode(cast.ToBool(iavlDisableFastNode)), baseapp.SetIAVLLazyLoading(cast.ToBool(appOpts.Get(server.FlagIAVLLazyLoading))), baseapp.SetChainID(chainID), + baseapp.SetMempool(mempool), + ) + bApp.SetTxEncoder(ac.encodingConfig.TxConfig.TxEncoder()) + abciProposalHandler := app.NewDefaultProposalHandler(mempool, bApp) + bApp.SetPrepareProposal(abciProposalHandler.PrepareProposalHandler()) + + newApp := app.NewApp( + homeDir, traceStore, ac.encodingConfig, + app.Options{ + SkipLoadLatest: skipLoadLatest, + SkipUpgradeHeights: skipUpgradeHeights, + SkipGenesisInvariants: cast.ToBool(appOpts.Get(crisis.FlagSkipGenesisInvariants)), + InvariantCheckPeriod: cast.ToUint(appOpts.Get(server.FlagInvCheckPeriod)), + MempoolEnableAuth: mempoolEnableAuth, + MempoolAuthAddresses: mempoolAuthAddresses, + EVMTrace: cast.ToString(appOpts.Get(ethermintflags.EVMTracer)), + EVMMaxGasWanted: cast.ToUint64(appOpts.Get(ethermintflags.EVMMaxTxGasWanted)), + }, + bApp, ) + + return newApp } // appExport writes out an app's state to json. @@ -157,13 +169,15 @@ func (ac appCreator) appExport( var tempApp *app.App if height != -1 { - tempApp = app.NewApp(logger, db, homePath, traceStore, ac.encodingConfig, options) + bApp := app.NewBaseApp(logger, db, ac.encodingConfig) + tempApp = app.NewApp(homePath, traceStore, ac.encodingConfig, options, bApp) if err := tempApp.LoadHeight(height); err != nil { return servertypes.ExportedApp{}, err } } else { - tempApp = app.NewApp(logger, db, homePath, traceStore, ac.encodingConfig, options) + bApp := app.NewBaseApp(logger, db, ac.encodingConfig) + tempApp = app.NewApp(homePath, traceStore, ac.encodingConfig, options, bApp) } return tempApp.ExportAppStateAndValidators(forZeroHeight, jailAllowedAddrs, modulesToExport) }