From 677499a835a52aa6f20543e9319d5af4af9d5b25 Mon Sep 17 00:00:00 2001 From: zhangkai Date: Sat, 27 Apr 2024 22:14:09 +0800 Subject: [PATCH] remove lock --- sequencer/addrqueue_xlayer.go | 10 ++++----- sequencer/interfaces.go | 1 + sequencer/l2block.go | 2 ++ sequencer/mock_worker_xlayer.go | 8 +++++++ sequencer/pooltx_counter_xlayer.go | 35 +++++++----------------------- sequencer/sequencer_xlayer.go | 2 +- sequencer/txsorted_list.go | 1 - sequencer/worker.go | 13 ++++++++--- sequencer/worker_xlayer.go | 30 +++++++++++++++++++++++++ 9 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 sequencer/mock_worker_xlayer.go create mode 100644 sequencer/worker_xlayer.go diff --git a/sequencer/addrqueue_xlayer.go b/sequencer/addrqueue_xlayer.go index 3b1bc0f17b..c700850d53 100644 --- a/sequencer/addrqueue_xlayer.go +++ b/sequencer/addrqueue_xlayer.go @@ -1,16 +1,16 @@ package sequencer -func (a *addrQueue) GetTxCount() int64 { +func (a *addrQueue) GetTxCount() uint64 { if a == nil { return 0 } - var readyTxCount int64 + var readyTxCount uint64 if a.readyTx != nil { readyTxCount = 1 } - notReadyTxCount := int64(len(a.notReadyTxs)) - forcedTxCount := int64(len(a.forcedTxs)) - pendingTxsToStoreCount := int64(len(a.pendingTxsToStore)) + notReadyTxCount := uint64(len(a.notReadyTxs)) + forcedTxCount := uint64(len(a.forcedTxs)) + pendingTxsToStoreCount := uint64(len(a.pendingTxsToStore)) return readyTxCount + notReadyTxCount + forcedTxCount + pendingTxsToStoreCount } diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index 77993abec2..36faf5891a 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -94,4 +94,5 @@ type workerInterface interface { NewTxTracker(tx pool.Transaction, usedZKcounters state.ZKCounters, reservedZKCouners state.ZKCounters, ip string) (*TxTracker, error) AddForcedTx(txHash common.Hash, addr common.Address) DeleteForcedTx(txHash common.Hash, addr common.Address) + CountReadyTx() uint64 } diff --git a/sequencer/l2block.go b/sequencer/l2block.go index bf56d64326..bdc1756c8d 100644 --- a/sequencer/l2block.go +++ b/sequencer/l2block.go @@ -446,6 +446,8 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { func (f *finalizer) finalizeWIPL2Block(ctx context.Context) { log.Debugf("finalizing WIP L2 block [%d]", f.wipL2Block.trackingNum) + getPoolReadyTxCounter().setReadyTxCount(f.workerIntf.CountReadyTx()) + prevTimestamp := f.wipL2Block.timestamp prevL1InfoTreeIndex := f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex diff --git a/sequencer/mock_worker_xlayer.go b/sequencer/mock_worker_xlayer.go new file mode 100644 index 0000000000..efb4563946 --- /dev/null +++ b/sequencer/mock_worker_xlayer.go @@ -0,0 +1,8 @@ +package sequencer + +// CountReadyTx provides a mock function with given fields: +func (_m *WorkerMock) CountReadyTx() uint64 { + _m.Called() + ret := _m.Called() + return ret.Get(0).(uint64) +} diff --git a/sequencer/pooltx_counter_xlayer.go b/sequencer/pooltx_counter_xlayer.go index 73a6018ea3..b9cfdf1a87 100644 --- a/sequencer/pooltx_counter_xlayer.go +++ b/sequencer/pooltx_counter_xlayer.go @@ -2,15 +2,13 @@ package sequencer import ( "sync" + "sync/atomic" ) -// PoolReadyTxCounter is the pool tx counter +// PoolReadyTxCounter is the struct that holds the ready tx counter type PoolReadyTxCounter struct { - // Ready is the number of ready transactions - Address map[string]int64 - - // RwMutex is the mutex for the pool tx counter - RwMutex sync.RWMutex + // Count is the number of ready transactions + Count uint64 } var poolReadyTxCounterInst *PoolReadyTxCounter @@ -23,28 +21,11 @@ func getPoolReadyTxCounter() *PoolReadyTxCounter { return poolReadyTxCounterInst } -func (ptx *PoolReadyTxCounter) set(addr string, count int64) { - ptx.RwMutex.Lock() - defer ptx.RwMutex.Unlock() - if ptx.Address == nil { - ptx.Address = make(map[string]int64) - } - ptx.Address[addr] = count -} - -func (ptx *PoolReadyTxCounter) delete(addr string) { - ptx.RwMutex.Lock() - defer ptx.RwMutex.Unlock() - delete(ptx.Address, addr) +func (ptx *PoolReadyTxCounter) setReadyTxCount(count uint64) { + atomic.StoreUint64(&ptx.Count, count) } // Sum returns the sum of the ready tx counter -func (ptx *PoolReadyTxCounter) Sum() int64 { - ptx.RwMutex.RLock() - defer ptx.RwMutex.RUnlock() - var sum int64 - for _, v := range ptx.Address { - sum += v - } - return sum +func (ptx *PoolReadyTxCounter) getReadyTxCount() uint64 { + return atomic.LoadUint64(&ptx.Count) } diff --git a/sequencer/sequencer_xlayer.go b/sequencer/sequencer_xlayer.go index 5b26c0982a..50d5749ddf 100644 --- a/sequencer/sequencer_xlayer.go +++ b/sequencer/sequencer_xlayer.go @@ -24,7 +24,7 @@ func (s *Sequencer) countPendingTx() { } func (s *Sequencer) updateReadyTxCount() { - err := s.pool.UpdateReadyTxCount(context.Background(), uint64(getPoolReadyTxCounter().Sum())) + err := s.pool.UpdateReadyTxCount(context.Background(), getPoolReadyTxCounter().getReadyTxCount()) if err != nil { log.Errorf("error adding ready tx count: %v", err) } diff --git a/sequencer/txsorted_list.go b/sequencer/txsorted_list.go index 5de333e679..c1de2e2147 100644 --- a/sequencer/txsorted_list.go +++ b/sequencer/txsorted_list.go @@ -40,7 +40,6 @@ func (e *txSortedList) add(tx *TxTracker) bool { func (e *txSortedList) delete(tx *TxTracker) bool { e.mutex.Lock() defer e.mutex.Unlock() - getPoolReadyTxCounter().delete(tx.FromStr) if tx, found := e.list[tx.HashStr]; found { sLen := len(e.sorted) diff --git a/sequencer/worker.go b/sequencer/worker.go index d36438cd14..65fb84af88 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -23,6 +23,8 @@ type Worker struct { batchConstraints state.BatchConstraintsCfg readyTxsCond *timeoutCond claimGp *big.Int + + readyTxCounter map[string]uint64 } // NewWorker creates an init a worker @@ -33,6 +35,7 @@ func NewWorker(state stateInterface, constraints state.BatchConstraintsCfg, read state: state, batchConstraints: constraints, readyTxsCond: readyTxsCond, + readyTxCounter: make(map[string]uint64), } return &w @@ -107,11 +110,12 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T if prevReadyTx != nil { log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) deleted from TxSortedList", prevReadyTx.HashStr, prevReadyTx.Nonce, prevReadyTx.GasPrice, tx.FromStr) w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } if newReadyTx != nil { log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) added to TxSortedList", newReadyTx.HashStr, newReadyTx.Nonce, newReadyTx.GasPrice, tx.FromStr) w.addTxToSortedList(newReadyTx) - getPoolReadyTxCounter().set(addr.fromStr, addr.GetTxCount()) + w.setReadyTxCounter(addr.fromStr, addr.GetTxCount()) } if repTx != nil { @@ -132,11 +136,12 @@ func (w *Worker) applyAddressUpdate(from common.Address, fromNonce *uint64, from if prevReadyTx != nil { log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d) deleted from TxSortedList", prevReadyTx.Hash.String(), prevReadyTx.Nonce, prevReadyTx.GasPrice) w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } if newReadyTx != nil { log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d) added to TxSortedList", newReadyTx.Hash.String(), newReadyTx.Nonce, newReadyTx.GasPrice) w.addTxToSortedList(newReadyTx) - getPoolReadyTxCounter().set(addrQueue.fromStr, addrQueue.GetTxCount()) + w.setReadyTxCounter(addrQueue.fromStr, addrQueue.GetTxCount()) } return newReadyTx, prevReadyTx, txsToDelete @@ -203,6 +208,7 @@ func (w *Worker) DeleteTx(txHash common.Hash, addr common.Address) { if deletedReadyTx != nil { log.Debugf("tx %s deleted from TxSortedList", deletedReadyTx.Hash.String()) w.txSortedList.delete(deletedReadyTx) + w.deleteReadyTxCounter(deletedReadyTx.FromStr) } } else { log.Warnf("addrQueue %s not found", addr.String()) @@ -343,7 +349,7 @@ func (w *Worker) GetBestFittingTx(resources state.BatchResources) (*TxTracker, e wg.Wait() if foundAt != -1 { - log.Debugf("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice) + log.Infof("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice) if !tx.IsClaimTx { w.claimGp = tx.GasPrice } @@ -367,6 +373,7 @@ func (w *Worker) ExpireTransactions(maxTime time.Duration) []*TxTracker { if prevReadyTx != nil { w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } /*if addrQueue.IsEmpty() { diff --git a/sequencer/worker_xlayer.go b/sequencer/worker_xlayer.go new file mode 100644 index 0000000000..3286771bbc --- /dev/null +++ b/sequencer/worker_xlayer.go @@ -0,0 +1,30 @@ +package sequencer + +func (w *Worker) deleteReadyTxCounter(addr string) { + if w == nil || w.readyTxCounter == nil { + return + } + delete(w.readyTxCounter, addr) +} + +func (w *Worker) setReadyTxCounter(addr string, count uint64) { + if w == nil || w.readyTxCounter == nil { + return + } + w.readyTxCounter[addr] = count +} + +// CountReadyTx returns the number of ready transactions +func (w *Worker) CountReadyTx() uint64 { + if w == nil { + return 0 + } + w.workerMutex.Lock() + defer w.workerMutex.Unlock() + + var count uint64 + for _, c := range w.readyTxCounter { + count += c + } + return count +}