Skip to content

Commit

Permalink
remove lock
Browse files Browse the repository at this point in the history
  • Loading branch information
giskook committed Apr 29, 2024
1 parent 70be6d0 commit 677499a
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 37 deletions.
10 changes: 5 additions & 5 deletions sequencer/addrqueue_xlayer.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package sequencer

func (a *addrQueue) GetTxCount() int64 {
func (a *addrQueue) GetTxCount() uint64 {
if a == nil {
return 0
}
var readyTxCount int64
var readyTxCount uint64
if a.readyTx != nil {
readyTxCount = 1
}
notReadyTxCount := int64(len(a.notReadyTxs))
forcedTxCount := int64(len(a.forcedTxs))
pendingTxsToStoreCount := int64(len(a.pendingTxsToStore))
notReadyTxCount := uint64(len(a.notReadyTxs))
forcedTxCount := uint64(len(a.forcedTxs))
pendingTxsToStoreCount := uint64(len(a.pendingTxsToStore))

return readyTxCount + notReadyTxCount + forcedTxCount + pendingTxsToStoreCount
}
1 change: 1 addition & 0 deletions sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,5 @@ type workerInterface interface {
NewTxTracker(tx pool.Transaction, usedZKcounters state.ZKCounters, reservedZKCouners state.ZKCounters, ip string) (*TxTracker, error)
AddForcedTx(txHash common.Hash, addr common.Address)
DeleteForcedTx(txHash common.Hash, addr common.Address)
CountReadyTx() uint64
}
2 changes: 2 additions & 0 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
func (f *finalizer) finalizeWIPL2Block(ctx context.Context) {
log.Debugf("finalizing WIP L2 block [%d]", f.wipL2Block.trackingNum)

getPoolReadyTxCounter().setReadyTxCount(f.workerIntf.CountReadyTx())

prevTimestamp := f.wipL2Block.timestamp
prevL1InfoTreeIndex := f.wipL2Block.l1InfoTreeExitRoot.L1InfoTreeIndex

Expand Down
8 changes: 8 additions & 0 deletions sequencer/mock_worker_xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package sequencer

// CountReadyTx provides a mock function with given fields:
func (_m *WorkerMock) CountReadyTx() uint64 {
_m.Called()
ret := _m.Called()
return ret.Get(0).(uint64)
}
35 changes: 8 additions & 27 deletions sequencer/pooltx_counter_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package sequencer

import (
"sync"
"sync/atomic"
)

// PoolReadyTxCounter is the pool tx counter
// PoolReadyTxCounter is the struct that holds the ready tx counter
type PoolReadyTxCounter struct {
// Ready is the number of ready transactions
Address map[string]int64

// RwMutex is the mutex for the pool tx counter
RwMutex sync.RWMutex
// Count is the number of ready transactions
Count uint64
}

var poolReadyTxCounterInst *PoolReadyTxCounter
Expand All @@ -23,28 +21,11 @@ func getPoolReadyTxCounter() *PoolReadyTxCounter {
return poolReadyTxCounterInst
}

func (ptx *PoolReadyTxCounter) set(addr string, count int64) {
ptx.RwMutex.Lock()
defer ptx.RwMutex.Unlock()
if ptx.Address == nil {
ptx.Address = make(map[string]int64)
}
ptx.Address[addr] = count
}

func (ptx *PoolReadyTxCounter) delete(addr string) {
ptx.RwMutex.Lock()
defer ptx.RwMutex.Unlock()
delete(ptx.Address, addr)
func (ptx *PoolReadyTxCounter) setReadyTxCount(count uint64) {
atomic.StoreUint64(&ptx.Count, count)
}

// Sum returns the sum of the ready tx counter
func (ptx *PoolReadyTxCounter) Sum() int64 {
ptx.RwMutex.RLock()
defer ptx.RwMutex.RUnlock()
var sum int64
for _, v := range ptx.Address {
sum += v
}
return sum
func (ptx *PoolReadyTxCounter) getReadyTxCount() uint64 {
return atomic.LoadUint64(&ptx.Count)
}
2 changes: 1 addition & 1 deletion sequencer/sequencer_xlayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (s *Sequencer) countPendingTx() {
}

func (s *Sequencer) updateReadyTxCount() {
err := s.pool.UpdateReadyTxCount(context.Background(), uint64(getPoolReadyTxCounter().Sum()))
err := s.pool.UpdateReadyTxCount(context.Background(), getPoolReadyTxCounter().getReadyTxCount())
if err != nil {
log.Errorf("error adding ready tx count: %v", err)
}
Expand Down
1 change: 0 additions & 1 deletion sequencer/txsorted_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func (e *txSortedList) add(tx *TxTracker) bool {
func (e *txSortedList) delete(tx *TxTracker) bool {
e.mutex.Lock()
defer e.mutex.Unlock()
getPoolReadyTxCounter().delete(tx.FromStr)

if tx, found := e.list[tx.HashStr]; found {
sLen := len(e.sorted)
Expand Down
13 changes: 10 additions & 3 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Worker struct {
batchConstraints state.BatchConstraintsCfg
readyTxsCond *timeoutCond
claimGp *big.Int

readyTxCounter map[string]uint64
}

// NewWorker creates an init a worker
Expand All @@ -33,6 +35,7 @@ func NewWorker(state stateInterface, constraints state.BatchConstraintsCfg, read
state: state,
batchConstraints: constraints,
readyTxsCond: readyTxsCond,
readyTxCounter: make(map[string]uint64),
}

return &w
Expand Down Expand Up @@ -107,11 +110,12 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T
if prevReadyTx != nil {
log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) deleted from TxSortedList", prevReadyTx.HashStr, prevReadyTx.Nonce, prevReadyTx.GasPrice, tx.FromStr)
w.txSortedList.delete(prevReadyTx)
w.deleteReadyTxCounter(prevReadyTx.FromStr)
}
if newReadyTx != nil {
log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) added to TxSortedList", newReadyTx.HashStr, newReadyTx.Nonce, newReadyTx.GasPrice, tx.FromStr)
w.addTxToSortedList(newReadyTx)
getPoolReadyTxCounter().set(addr.fromStr, addr.GetTxCount())
w.setReadyTxCounter(addr.fromStr, addr.GetTxCount())
}

if repTx != nil {
Expand All @@ -132,11 +136,12 @@ func (w *Worker) applyAddressUpdate(from common.Address, fromNonce *uint64, from
if prevReadyTx != nil {
log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d) deleted from TxSortedList", prevReadyTx.Hash.String(), prevReadyTx.Nonce, prevReadyTx.GasPrice)
w.txSortedList.delete(prevReadyTx)
w.deleteReadyTxCounter(prevReadyTx.FromStr)
}
if newReadyTx != nil {
log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d) added to TxSortedList", newReadyTx.Hash.String(), newReadyTx.Nonce, newReadyTx.GasPrice)
w.addTxToSortedList(newReadyTx)
getPoolReadyTxCounter().set(addrQueue.fromStr, addrQueue.GetTxCount())
w.setReadyTxCounter(addrQueue.fromStr, addrQueue.GetTxCount())
}

return newReadyTx, prevReadyTx, txsToDelete
Expand Down Expand Up @@ -203,6 +208,7 @@ func (w *Worker) DeleteTx(txHash common.Hash, addr common.Address) {
if deletedReadyTx != nil {
log.Debugf("tx %s deleted from TxSortedList", deletedReadyTx.Hash.String())
w.txSortedList.delete(deletedReadyTx)
w.deleteReadyTxCounter(deletedReadyTx.FromStr)
}
} else {
log.Warnf("addrQueue %s not found", addr.String())
Expand Down Expand Up @@ -343,7 +349,7 @@ func (w *Worker) GetBestFittingTx(resources state.BatchResources) (*TxTracker, e
wg.Wait()

if foundAt != -1 {
log.Debugf("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice)
log.Infof("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice)
if !tx.IsClaimTx {
w.claimGp = tx.GasPrice
}
Expand All @@ -367,6 +373,7 @@ func (w *Worker) ExpireTransactions(maxTime time.Duration) []*TxTracker {

if prevReadyTx != nil {
w.txSortedList.delete(prevReadyTx)
w.deleteReadyTxCounter(prevReadyTx.FromStr)
}

/*if addrQueue.IsEmpty() {
Expand Down
30 changes: 30 additions & 0 deletions sequencer/worker_xlayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package sequencer

func (w *Worker) deleteReadyTxCounter(addr string) {
if w == nil || w.readyTxCounter == nil {
return
}
delete(w.readyTxCounter, addr)
}

func (w *Worker) setReadyTxCounter(addr string, count uint64) {
if w == nil || w.readyTxCounter == nil {
return
}
w.readyTxCounter[addr] = count
}

// CountReadyTx returns the number of ready transactions
func (w *Worker) CountReadyTx() uint64 {
if w == nil {
return 0
}
w.workerMutex.Lock()
defer w.workerMutex.Unlock()

var count uint64
for _, c := range w.readyTxCounter {
count += c
}
return count
}

0 comments on commit 677499a

Please sign in to comment.