Skip to content

Commit

Permalink
mempool: reduce lock contention during CheckTx (cleanup) (#8983)
Browse files Browse the repository at this point in the history
The way this was originally structured, we reacquired the lock after issuing
the initial ABCI CheckTx call, only to immediately release it. Restructure the
code so that this redundant acquire is no longer necessary.
  • Loading branch information
M. J. Fromberger authored Jul 12, 2022
1 parent cb93d3b commit 9e64c95
Showing 1 changed file with 52 additions and 51 deletions.
103 changes: 52 additions & 51 deletions internal/mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,69 +177,70 @@ func (txmp *TxMempool) CheckTx(
// During the initial phase of CheckTx, we do not need to modify any state.
// A transaction will not actually be added to the mempool until it survives
// a call to the ABCI CheckTx method and size constraint checks.
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()
height, err := func() (int64, error) {
txmp.mtx.RLock()
defer txmp.mtx.RUnlock()

// Reject transactions in excess of the configured maximum transaction size.
if len(tx) > txmp.config.MaxTxBytes {
return types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
}
// Reject transactions in excess of the configured maximum transaction size.
if len(tx) > txmp.config.MaxTxBytes {
return 0, types.ErrTxTooLarge{Max: txmp.config.MaxTxBytes, Actual: len(tx)}
}

// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return types.ErrPreCheck{Reason: err}
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
return 0, types.ErrPreCheck{Reason: err}
}
}
}

// Early exit if the proxy connection has an error.
if err := txmp.proxyAppConn.Error(); err != nil {
return err
}
// Early exit if the proxy connection has an error.
if err := txmp.proxyAppConn.Error(); err != nil {
return 0, err
}

txKey := tx.Key()
txKey := tx.Key()

// Check for the transaction in the cache.
if !txmp.cache.Push(tx) {
// If the cached transaction is also in the pool, record its sender.
if elt, ok := txmp.txByKey[txKey]; ok {
w := elt.Value.(*WrappedTx)
w.SetPeer(txInfo.SenderID)
// Check for the transaction in the cache.
if !txmp.cache.Push(tx) {
// If the cached transaction is also in the pool, record its sender.
if elt, ok := txmp.txByKey[txKey]; ok {
w := elt.Value.(*WrappedTx)
w.SetPeer(txInfo.SenderID)
}
return 0, types.ErrTxInCache
}
return types.ErrTxInCache
return txmp.height, nil
}()
if err != nil {
return err
}

// Initiate an ABCI CheckTx for this transaction. The callback is
// responsible for adding the transaction to the pool if it survives.
return func() error {
// N.B.: We have to issue the call outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
height := txmp.height
txmp.mtx.RUnlock()
defer txmp.mtx.RLock()

reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
//
// N.B.: We have to issue the call outside the lock. In a local client,
// even an "async" call invokes its callback immediately which will make
// the callback deadlock trying to acquire the same lock. This isn't a
// problem with out-of-process calls, but this has to work for both.
reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
if err != nil {
txmp.cache.Remove(tx)
return err
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
hash: tx.Key(),
timestamp: time.Now().UTC(),
height: height,
}
reqRes.SetCallback(func(res *abci.Response) {
wtx := &WrappedTx{
tx: tx,
hash: txKey,
timestamp: time.Now().UTC(),
height: height,
}
wtx.SetPeer(txInfo.SenderID)
txmp.initialTxCallback(wtx, res)
if cb != nil {
cb(res)
}
})
return nil
}()
wtx.SetPeer(txInfo.SenderID)
txmp.initialTxCallback(wtx, res)
if cb != nil {
cb(res)
}
})
return nil
}

// RemoveTxByKey removes the transaction with the specified key from the
Expand Down

0 comments on commit 9e64c95

Please sign in to comment.