diff --git a/chain/messagepool/check.go b/chain/messagepool/check.go
index b1e2a277846..a1097e7d1f2 100644
--- a/chain/messagepool/check.go
+++ b/chain/messagepool/check.go
@@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
 // CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
 func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
 	var msgs []*types.Message
-	mp.lk.Lock()
+	mp.lk.RLock()
 	mset, ok := mp.pending[from]
 	if ok {
 		for _, sm := range mset.msgs {
 			msgs = append(msgs, &sm.Message)
 		}
 	}
-	mp.lk.Unlock()
+	mp.lk.RUnlock()
 
 	if len(msgs) == 0 {
 		return nil, nil
@@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
 	msgMap := make(map[address.Address]map[uint64]*types.Message)
 	count := 0
 
-	mp.lk.Lock()
+	mp.lk.RLock()
 	for _, m := range replace {
 		mmap, ok := msgMap[m.From]
 		if !ok {
@@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
 		}
 		mmap[m.Nonce] = m
 	}
-	mp.lk.Unlock()
+	mp.lk.RUnlock()
 
 	msgs := make([]*types.Message, 0, count)
 	start := 0
@@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 	if mp.api.IsLite() {
 		return nil, nil
 	}
-	mp.curTsLk.Lock()
+	mp.curTsLk.RLock()
 	curTs := mp.curTs
-	mp.curTsLk.Unlock()
+	mp.curTsLk.RUnlock()
 
 	epoch := curTs.Height() + 1
 
@@ -143,7 +143,7 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 
 		st, ok := state[m.From]
 		if !ok {
-			mp.lk.Lock()
+			mp.lk.RLock()
 			mset, ok := mp.pending[m.From]
 			if ok && !interned {
 				st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
@@ -151,14 +151,14 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
 					st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
 				}
 				state[m.From] = st
-				mp.lk.Unlock()
+				mp.lk.RUnlock()
 
 				check.OK = true
 				check.Hint = map[string]interface{}{
 					"nonce": st.nextNonce,
 				}
 			} else {
-				mp.lk.Unlock()
+				mp.lk.RUnlock()
 
 				stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
 				if err != nil {
diff --git a/chain/messagepool/messagepool.go b/chain/messagepool/messagepool.go
index 0d787bd50ef..b0e7b7e2b73 100644
--- a/chain/messagepool/messagepool.go
+++ b/chain/messagepool/messagepool.go
@@ -118,7 +118,7 @@ func init() {
 }
 
 type MessagePool struct {
-	lk sync.Mutex
+	lk sync.RWMutex
 
 	ds dtypes.MetadataDS
 
@@ -137,9 +137,9 @@ type MessagePool struct {
 	// do NOT access this map directly, use getPendingMset, setPendingMset, deletePendingMset, forEachPending, and clearPending respectively
 	pending map[address.Address]*msgSet
 
-	keyCache map[address.Address]address.Address
+	keyCache *lru.Cache[address.Address, address.Address]
 
-	curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
+	curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk
 	curTs   *types.TipSet
 
 	cfgLk sync.RWMutex
@@ -372,6 +372,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
 	cache, _ := lru.New2Q[cid.Cid, crypto.Signature](build.BlsSignatureCacheSize)
 	verifcache, _ := lru.New2Q[string, struct{}](build.VerifSigCacheSize)
 	noncecache, _ := lru.New[nonceCacheKey, uint64](256)
+	keycache, _ := lru.New[address.Address, address.Address](1_000_000)
 
 	cfg, err := loadConfig(ctx, ds)
 	if err != nil {
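Context for the keyCache hunks in this file: the unbounded map[address.Address]address.Address is swapped for a hashicorp/golang-lru v2 generic cache capped at 1,000,000 entries, so address resolution can no longer grow memory without bound. A minimal, self-contained sketch of the same cache API, using string keys in place of address.Address and illustrative values:

```go
package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

func main() {
	// New[K, V] returns a fixed-size cache; once the capacity is exceeded
	// the least recently used entry is evicted.
	cache, err := lru.New[string, string](1_000_000)
	if err != nil {
		panic(err)
	}

	// Mirrors the resolveToKey pattern below: store both the original and
	// the resolved key address, then look them up with Get, which reports
	// a hit via its second return value. Addresses are hypothetical.
	cache.Add("f01234", "f1abc")
	cache.Add("f1abc", "f1abc")

	if ka, ok := cache.Get("f01234"); ok {
		fmt.Println("resolved to key address:", ka)
	}
}
```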
@@ -390,7 +391,7 @@ func New(ctx context.Context, api Provider, ds dtypes.MetadataDS, us stmgr.Upgra
 		repubTrigger:   make(chan struct{}, 1),
 		localAddrs:     make(map[address.Address]struct{}),
 		pending:        make(map[address.Address]*msgSet),
-		keyCache:       make(map[address.Address]address.Address),
+		keyCache:       keycache,
 		minGasPrice:    types.NewInt(0),
 		getNtwkVersion: us.GetNtwkVersion,
 		pruneTrigger:   make(chan struct{}, 1),
@@ -474,8 +475,8 @@ func (mp *MessagePool) TryForEachPendingMessage(f func(cid.Cid) error) error {
 
 func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (address.Address, error) {
 	// check the cache
-	a, f := mp.keyCache[addr]
-	if f {
+	a, ok := mp.keyCache.Get(addr)
+	if ok {
 		return a, nil
 	}
 
@@ -486,8 +487,8 @@ func (mp *MessagePool) resolveToKey(ctx context.Context, addr address.Address) (
 	}
 
 	// place both entries in the cache (may both be key addresses, which is fine)
-	mp.keyCache[addr] = ka
-	mp.keyCache[ka] = ka
+	mp.keyCache.Add(addr, ka)
+	mp.keyCache.Add(ka, ka)
 
 	return ka, nil
 }
@@ -763,7 +764,28 @@ func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
 		<-mp.addSema
 	}()
 
+	mp.curTsLk.RLock()
+	tmpCurTs := mp.curTs
+	mp.curTsLk.RUnlock()
+
+	// ensure the expensive lookups are cached without holding the lock
+	_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
+	_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
+
 	mp.curTsLk.Lock()
+	if tmpCurTs == mp.curTs {
+		// with the lock held, mp.curTs is the same tipset we just warmed against, so our computations are cached
+	} else {
+		// curTs has been updated, so cache against the new one:
+		tmpCurTs = mp.curTs
+		// release the lock, cache the computations, then grab it again
+		mp.curTsLk.Unlock()
+		_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
+		_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
+		mp.curTsLk.Lock()
+		// continue with the lock held; we could retry in a loop, but warming once is enough in practice since this optimization's computation takes less than a block time
+	}
+
 	defer mp.curTsLk.Unlock()
 
 	_, err = mp.addTs(ctx, m, mp.curTs, false, false)
@@ -852,9 +874,6 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
 		return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
 	}
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
-
 	senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs)
 	if err != nil {
 		return false, xerrors.Errorf("failed to get sender actor: %w", err)
@@ -869,6 +888,9 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
 		return false, xerrors.Errorf("sender actor %s is not a valid top-level sender", m.Message.From)
 	}
 
+	mp.lk.Lock()
+	defer mp.lk.Unlock()
+
 	publish, err := mp.verifyMsgBeforeAdd(ctx, m, curTs, local)
 	if err != nil {
 		return false, xerrors.Errorf("verify msg failed: %w", err)
 	}
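The Add() hunk above introduces a pre-warm step: the expensive lookups (GetActorAfter, getStateNonce) run against a snapshot of curTs while only the read lock is held, then the write lock is taken and the snapshot re-checked; if the head moved in between, the pool warms the new tipset once more before proceeding. A minimal sketch of that pattern under hypothetical names (pool, head, warm stand in for the real message pool types and calls):

```go
package main

import "sync"

type tipset struct{ height int }

type pool struct {
	mu   sync.RWMutex
	head *tipset
}

// warm stands in for the cacheable, expensive calls (GetActorAfter and
// getStateNonce in the real code), whose results are keyed by tipset.
func warm(ts *tipset) { _ = ts }

func (p *pool) add() {
	// Snapshot the head under the read lock only.
	p.mu.RLock()
	snap := p.head
	p.mu.RUnlock()

	// Populate caches without blocking writers.
	warm(snap)

	p.mu.Lock()
	if snap != p.head {
		// The head moved while we were warming: drop the lock, warm the
		// new head once, and re-acquire. One retry suffices in practice
		// because warming takes much less than a block time.
		snap = p.head
		p.mu.Unlock()
		warm(snap)
		p.mu.Lock()
	}
	defer p.mu.Unlock()

	// ... the actual add now runs under the lock against warm caches ...
}

func main() {
	p := &pool{head: &tipset{height: 1}}
	p.add()
}
```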
@@ -1001,19 +1023,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
 }
 
 func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	return mp.getNonceLocked(ctx, addr, mp.curTs)
 }
 
 // GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
 func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 	return mp.api.GetActorAfter(addr, mp.curTs)
 }
 
@@ -1164,11 +1186,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u
 }
 
 func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	return mp.allPending(ctx)
 }
@@ -1184,11 +1206,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage,
 }
 
 func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
-	mp.curTsLk.Lock()
-	defer mp.curTsLk.Unlock()
+	mp.curTsLk.RLock()
+	defer mp.curTsLk.RUnlock()
 
-	mp.lk.Lock()
-	defer mp.lk.Unlock()
+	mp.lk.RLock()
+	defer mp.lk.RUnlock()
 
 	return mp.pendingFor(ctx, a), mp.curTs
 }
@@ -1237,9 +1259,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
 
 	maybeRepub := func(cid cid.Cid) {
 		if !repubTrigger {
-			mp.lk.Lock()
+			mp.lk.RLock()
 			_, republished := mp.republished[cid]
-			mp.lk.Unlock()
+			mp.lk.RUnlock()
 			if republished {
 				repubTrigger = true
 			}
@@ -1310,9 +1332,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
 	}
 
 	if len(revert) > 0 && futureDebug {
-		mp.lk.Lock()
+		mp.lk.RLock()
 		msgs, ts := mp.allPending(ctx)
-		mp.lk.Unlock()
+		mp.lk.RUnlock()
 
 		buckets := map[address.Address]*statBucket{}
diff --git a/chain/messagepool/repub.go b/chain/messagepool/repub.go
index 9a1e19b6072..70467643914 100644
--- a/chain/messagepool/repub.go
+++ b/chain/messagepool/repub.go
@@ -20,17 +20,18 @@ const repubMsgLimit = 30
 
 var RepublishBatchDelay = 100 * time.Millisecond
 
 func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
-	mp.curTsLk.Lock()
+	mp.curTsLk.RLock()
 	ts := mp.curTs
 	baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
+	mp.curTsLk.RUnlock()
 	if err != nil {
-		mp.curTsLk.Unlock()
 		return xerrors.Errorf("computing basefee: %w", err)
 	}
 	baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)
 
 	pending := make(map[address.Address]map[uint64]*types.SignedMessage)
+	mp.curTsLk.Lock()
 	mp.lk.Lock()
 	mp.republished = nil // clear this to avoid races triggering an early republish
 	mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
diff --git a/chain/messagepool/selection.go b/chain/messagepool/selection.go
index bd504412863..d510cf9508d 100644
--- a/chain/messagepool/selection.go
+++ b/chain/messagepool/selection.go
@@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
 	mp.curTsLk.Lock()
 	defer mp.curTsLk.Unlock()
 
+	//TODO confirm if we can switch to RLock here for performance
 	mp.lk.Lock()
 	defer mp.lk.Unlock()
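For the repub.go hunk above, the reordering keeps the slow, read-only ChainComputeBaseFee call under curTsLk.RLock only, and takes the write locks afterwards, in the documented order (curTsLk before lk), just for the phase that mutates pool state. A rough sketch of that shape, with computeBaseFee as an illustrative stand-in rather than the real API:

```go
package main

import "sync"

type repubPool struct {
	curTsLk sync.RWMutex
	lk      sync.RWMutex
}

// computeBaseFee stands in for the slow, read-only chain query
// (ChainComputeBaseFee in the real code).
func (p *repubPool) computeBaseFee() (int64, error) { return 100, nil }

func (p *repubPool) republish() error {
	// Read-only phase: only the read lock is held, so callers like
	// Pending or GetNonce are not stalled behind the chain query.
	p.curTsLk.RLock()
	baseFee, err := p.computeBaseFee()
	p.curTsLk.RUnlock()
	if err != nil {
		return err
	}
	_ = baseFee

	// Mutation phase: take the write locks in order (curTsLk before lk,
	// matching the "DO NOT LOCK INSIDE lk" note on the struct) only for
	// the part that touches pool state.
	p.curTsLk.Lock()
	p.lk.Lock()
	// ... collect pending messages and clear the republished set here ...
	p.lk.Unlock()
	p.curTsLk.Unlock()
	return nil
}

func main() { _ = (&repubPool{}).republish() }
```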