refactor(mempool): relaxed locking of mempool #737

Merged · 5 commits · Mar 6, 2024
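
At its core, the change replaces the unconditional write lock at the top of addNewTransaction with a read-lock fast path that upgrades to a write lock only when the mempool actually has to be mutated. A minimal sketch of the pattern, with all identifiers illustrative rather than taken from the codebase:

package main

import "sync"

type pool struct {
	mtx  sync.RWMutex
	full bool
}

func (p *pool) add() {
	// Fast path: read-only checks run under the cheap read lock.
	p.mtx.RLock()
	rlocked := true
	defer func() {
		// Release whichever lock we ended up holding.
		if rlocked {
			p.mtx.RUnlock()
		} else {
			p.mtx.Unlock()
		}
	}()

	if p.full {
		// Upgrade to the write lock. The upgrade is not atomic, so any
		// decision made under the read lock must be re-validated here.
		p.mtx.RUnlock()
		p.mtx.Lock()
		rlocked = false
		// ... evict and insert, tolerating concurrent modifications ...
	}
}

func main() {
	(&pool{full: true}).add()
}

The key point is that the RUnlock/Lock upgrade is not atomic: another writer may run in between, so anything decided under the read lock has to be re-checked once the write lock is held. The diff below handles this inside evict().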
142 changes: 100 additions & 42 deletions internal/mempool/mempool.go
@@ -448,11 +448,29 @@ func (txmp *TxMempool) Update(
// transactions are evicted.
//
// Finally, the new transaction is added and size stats updated.
//
// Note: due to the locking approach we take, it is possible that another thread evicted the same
// items in the meantime. This means we can put slightly more items into the mempool than the limits
// allow, but it gives a significant performance benefit.
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) error {
txmp.mtx.Lock()
defer txmp.mtx.Unlock()

var err error
// When the mempool is full, we don't need a writable lock. Taking only an RLock here should give a
// significant performance boost in that case, as threads will not have to wait to obtain the
// writable lock.
//
// A disadvantage is that we need to re-lock from read to write mode when we need to evict
// transactions, which introduces a race condition when two threads want to evict the same
// transactions. We choose to manage that race condition to gain some performance.
txmp.mtx.RLock()
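// rlocked tracks which lock mode is currently held, so that the deferred
// function below releases the matching lock even after an upgrade.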
rlocked := true
defer func() {
if rlocked {
txmp.mtx.RUnlock()
} else {
txmp.mtx.Unlock()
}
}()

if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, checkTxRes)
}
@@ -524,10 +542,43 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
}
}

// victims and victimBytes were collected above while holding only the read lock; eviction is
// worth attempting only if removing all candidates could free enough room for the new tx.
haveSpace := victimBytes >= wtx.Size()
if haveSpace {
// Sort lowest priority items first so they will be evicted first. Break
// ties in favor of newer items (to maintain FIFO semantics in a group).
sort.Slice(victims, func(i, j int) bool {
iw := victims[i].Value.(*WrappedTx)
jw := victims[j].Value.(*WrappedTx)
if iw.Priority() == jw.Priority() {
return iw.timestamp.After(jw.timestamp)
}
return iw.Priority() < jw.Priority()
})

txmp.logger.Debug("evicting lower-priority transactions",
"new_tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"new_priority", priority)

// Evict as many of the victims as necessary to make room.
// We need to drop the RLock and take the write lock here, as from now on we will be modifying
// the mempool. This introduces a race condition, which we handle inside evict().
if rlocked {
txmp.mtx.RUnlock()
txmp.mtx.Lock()
rlocked = false
}

haveSpace = txmp.evict(wtx.Size(), victims)

if !haveSpace {
txmp.logger.Debug("unexpected mempool eviction failure - possibly concurrent eviction happened")
}
}

// If there are no suitable eviction candidates, or the total size of
// those candidates is not enough to make room for the new transaction,
// drop the new one.
if len(victims) == 0 || victimBytes < wtx.Size() {
if !haveSpace {
txmp.cache.Remove(wtx.tx)
txmp.logger.Error(
"rejected valid incoming transaction; mempool is full",
@@ -540,49 +591,19 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
// fmt.Errorf("transaction rejected: mempool is full (%X)", wtx.tx.Hash())
return nil
}

txmp.logger.Debug("evicting lower-priority transactions",
"new_tx", tmstrings.LazySprintf("%X", wtx.tx.Hash()),
"new_priority", priority,
)

// Sort lowest priority items first so they will be evicted first. Break
// ties in favor of newer items (to maintain FIFO semantics in a group).
sort.Slice(victims, func(i, j int) bool {
iw := victims[i].Value.(*WrappedTx)
jw := victims[j].Value.(*WrappedTx)
if iw.Priority() == jw.Priority() {
return iw.timestamp.After(jw.timestamp)
}
return iw.Priority() < jw.Priority()
})

// Evict as many of the victims as necessary to make room.
var evictedBytes int64
for _, vic := range victims {
w := vic.Value.(*WrappedTx)

txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
"old_tx", tmstrings.LazySprintf("%X", w.tx.Hash()),
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
evictedBytes += w.Size()
if evictedBytes >= wtx.Size() {
break
}
}
}

wtx.SetGasWanted(checkTxRes.GasWanted)
wtx.SetPriority(priority)
wtx.SetSender(sender)

// Ensure we hold the writable lock before inserting the transaction.
if rlocked {
txmp.mtx.RUnlock()
txmp.mtx.Lock()
rlocked = false
}

txmp.insertTx(wtx)

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
Expand All @@ -594,10 +615,47 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
"height", txmp.height,
"num_txs", txmp.Size(),
)

txmp.notifyTxsAvailable()

return nil
}

// evict removes victims from the mempool until it frees up to <size> bytes.
// It returns true when enough victims were removed.
//
// The caller must hold the writable lock.
func (txmp *TxMempool) evict(size int64, victims []*clist.CElement) bool {
var evictedBytes int64
for _, vic := range victims {
w := vic.Value.(*WrappedTx)

if vic.Removed() {
// Race condition: some other thread already removed this element.
// We handle it by just skipping this tx.
continue
}

txmp.logger.Debug(
"evicted valid existing transaction; mempool full",
"old_tx", tmstrings.LazySprintf("%X", w.tx.Hash()),
"old_priority", w.priority,
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
evictedBytes += w.Size()
if evictedBytes >= size {
return true
}
}

return false
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
elt := txmp.txs.PushBack(wtx)
txmp.txByKey[wtx.tx.Key()] = elt
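
For illustration, here is a self-contained sketch (not part of the PR; store, item, and evictOnce are hypothetical names) of why evict() must tolerate victims that were already removed during the RUnlock/Lock gap:

package main

import (
	"fmt"
	"sync"
)

type item struct {
	key     string
	removed bool // plays the role of clist.CElement.Removed()
}

type store struct {
	mtx   sync.RWMutex
	items map[string]*item
}

// evictOnce mimics the defensive re-check in TxMempool.evict: skip victims
// that another goroutine already removed during our lock upgrade.
func (s *store) evictOnce(victim *item) bool {
	if victim.removed {
		return false // race: someone else evicted it first; just skip
	}
	victim.removed = true
	delete(s.items, victim.key)
	return true
}

func main() {
	s := &store{items: map[string]*item{"tx1": {key: "tx1"}}}
	var wg sync.WaitGroup
	evictions := 0
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.mtx.RLock()
			victim := s.items["tx1"] // victim chosen under the read lock
			s.mtx.RUnlock()
			// The upgrade gap: the other goroutine may run here.
			s.mtx.Lock()
			defer s.mtx.Unlock()
			if victim != nil && s.evictOnce(victim) {
				evictions++
			}
		}()
	}
	wg.Wait()
	fmt.Println("evictions:", evictions) // always 1, never 2
}

Without the removed re-check, both goroutines would count the same victim's bytes as freed and both would proceed to insert, which is exactly the slight over-admission the note on addNewTransaction warns about.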