Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: message pool: change locks to RWMutexes for performance #10561

Merged
merged 8 commits into from
Mar 30, 2023
18 changes: 9 additions & 9 deletions chain/messagepool/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ func (mp *MessagePool) CheckMessages(ctx context.Context, protos []*api.MessageP
// CheckPendingMessages performs a set of logical sets for all messages pending from a given actor
func (mp *MessagePool) CheckPendingMessages(ctx context.Context, from address.Address) ([][]api.MessageCheckStatus, error) {
var msgs []*types.Message
mp.lk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.RLock()
mset, ok := mp.pending[from]
snissn marked this conversation as resolved.
Show resolved Hide resolved
if ok {
for _, sm := range mset.msgs {
snissn marked this conversation as resolved.
Show resolved Hide resolved
msgs = append(msgs, &sm.Message)
}
}
mp.lk.Unlock()
mp.lk.RUnlock()

if len(msgs) == 0 {
return nil, nil
Expand All @@ -58,7 +58,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
msgMap := make(map[address.Address]map[uint64]*types.Message)
count := 0

mp.lk.Lock()
mp.lk.RLock()
for _, m := range replace {
mmap, ok := msgMap[m.From]
if !ok {
Expand All @@ -76,7 +76,7 @@ func (mp *MessagePool) CheckReplaceMessages(ctx context.Context, replace []*type
}
mmap[m.Nonce] = m
}
mp.lk.Unlock()
mp.lk.RUnlock()

msgs := make([]*types.Message, 0, count)
start := 0
Expand All @@ -103,9 +103,9 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,
if mp.api.IsLite() {
return nil, nil
}
mp.curTsLk.Lock()
mp.curTsLk.RLock()
curTs := mp.curTs
mp.curTsLk.Unlock()
mp.curTsLk.RUnlock()

epoch := curTs.Height() + 1

Expand Down Expand Up @@ -143,22 +143,22 @@ func (mp *MessagePool) checkMessages(ctx context.Context, msgs []*types.Message,

st, ok := state[m.From]
if !ok {
mp.lk.Lock()
mp.lk.RLock()
mset, ok := mp.pending[m.From]
if ok && !interned {
st = &actorState{nextNonce: mset.nextNonce, requiredFunds: mset.requiredFunds}
for _, m := range mset.msgs {
st.requiredFunds = new(stdbig.Int).Add(st.requiredFunds, m.Message.Value.Int)
}
state[m.From] = st
mp.lk.Unlock()
mp.lk.RUnlock()

check.OK = true
check.Hint = map[string]interface{}{
"nonce": st.nextNonce,
}
} else {
mp.lk.Unlock()
mp.lk.RUnlock()

stateNonce, err := mp.getStateNonce(ctx, m.From, curTs)
if err != nil {
Expand Down
67 changes: 44 additions & 23 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
}

type MessagePool struct {
lk sync.Mutex
lk sync.RWMutex

ds dtypes.MetadataDS

Expand All @@ -139,7 +139,7 @@ type MessagePool struct {

keyCache map[address.Address]address.Address

curTsLk sync.Mutex // DO NOT LOCK INSIDE lk
curTsLk sync.RWMutex // DO NOT LOCK INSIDE lk
curTs *types.TipSet

cfgLk sync.RWMutex
Expand Down Expand Up @@ -763,7 +763,28 @@ func (mp *MessagePool) Add(ctx context.Context, m *types.SignedMessage) error {
<-mp.addSema
}()

mp.curTsLk.RLock()
tmpCurTs := mp.curTs
mp.curTsLk.RUnlock()

//ensures computations are cached without holding lack
snissn marked this conversation as resolved.
Show resolved Hide resolved
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)

mp.curTsLk.Lock()
if tmpCurTs == mp.curTs {
//with the lock enabled, mp.curTs is the same Ts as we just had, so we know that our computations are cached
} else {
//curTs has been updated so we want to cache the new one:
tmpCurTs = mp.curTs
//we want to release the lock, cache the computations then grab it again
mp.curTsLk.Unlock()
_, _ = mp.api.GetActorAfter(m.Message.From, tmpCurTs)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
_, _ = mp.getStateNonce(ctx, m.Message.From, tmpCurTs)
mp.curTsLk.Lock()
//now that we have the lock, we continue, we could do this as a loop forever, but that's bad to loop forever, and this was added as an optimization and it seems once is enough because the computation < block time
}

defer mp.curTsLk.Unlock()

_, err = mp.addTs(ctx, m, mp.curTs, false, false)
Expand Down Expand Up @@ -852,14 +873,14 @@ func (mp *MessagePool) addTs(ctx context.Context, m *types.SignedMessage, curTs
return false, xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}

mp.lk.Lock()
defer mp.lk.Unlock()

senderAct, err := mp.api.GetActorAfter(m.Message.From, curTs)
if err != nil {
return false, xerrors.Errorf("failed to get sender actor: %w", err)
}

mp.lk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
defer mp.lk.Unlock()

// This message can only be included in the _next_ epoch and beyond, hence the +1.
epoch := curTs.Height() + 1
nv := mp.api.StateNetworkVersion(ctx, epoch)
Expand Down Expand Up @@ -1001,19 +1022,19 @@ func (mp *MessagePool) addLocked(ctx context.Context, m *types.SignedMessage, st
}

func (mp *MessagePool) GetNonce(ctx context.Context, addr address.Address, _ types.TipSetKey) (uint64, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

return mp.getNonceLocked(ctx, addr, mp.curTs)
snissn marked this conversation as resolved.
Show resolved Hide resolved
}

// GetActor should not be used. It is only here to satisfy interface mess caused by lite node handling
func (mp *MessagePool) GetActor(_ context.Context, addr address.Address, _ types.TipSetKey) (*types.Actor, error) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()
return mp.api.GetActorAfter(addr, mp.curTs)
}

Expand Down Expand Up @@ -1164,11 +1185,11 @@ func (mp *MessagePool) remove(ctx context.Context, from address.Address, nonce u
}

func (mp *MessagePool) Pending(ctx context.Context) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()

return mp.allPending(ctx)
}
Expand All @@ -1184,11 +1205,11 @@ func (mp *MessagePool) allPending(ctx context.Context) ([]*types.SignedMessage,
}

func (mp *MessagePool) PendingFor(ctx context.Context, a address.Address) ([]*types.SignedMessage, *types.TipSet) {
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
mp.curTsLk.RLock()
defer mp.curTsLk.RUnlock()

mp.lk.Lock()
defer mp.lk.Unlock()
mp.lk.RLock()
defer mp.lk.RUnlock()
return mp.pendingFor(ctx, a), mp.curTs
snissn marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down Expand Up @@ -1237,9 +1258,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a

maybeRepub := func(cid cid.Cid) {
if !repubTrigger {
mp.lk.Lock()
mp.lk.RLock()
_, republished := mp.republished[cid]
mp.lk.Unlock()
mp.lk.RUnlock()
if republished {
repubTrigger = true
}
Expand Down Expand Up @@ -1310,9 +1331,9 @@ func (mp *MessagePool) HeadChange(ctx context.Context, revert []*types.TipSet, a
}

if len(revert) > 0 && futureDebug {
mp.lk.Lock()
mp.lk.RLock()
msgs, ts := mp.allPending(ctx)
mp.lk.Unlock()
mp.lk.RUnlock()

buckets := map[address.Address]*statBucket{}

Expand Down
5 changes: 3 additions & 2 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ const repubMsgLimit = 30
var RepublishBatchDelay = 100 * time.Millisecond

func (mp *MessagePool) republishPendingMessages(ctx context.Context) error {
mp.curTsLk.Lock()
mp.curTsLk.RLock()
ts := mp.curTs

baseFee, err := mp.api.ChainComputeBaseFee(context.TODO(), ts)
mp.curTsLk.RUnlock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
mp.curTsLk.Unlock()
return xerrors.Errorf("computing basefee: %w", err)
}
baseFeeLowerBound := getBaseFeeLowerBound(baseFee, baseFeeLowerBoundFactor)

pending := make(map[address.Address]map[uint64]*types.SignedMessage)
mp.curTsLk.Lock()
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.Lock()
mp.republished = nil // clear this to avoid races triggering an early republish
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.forEachLocal(ctx, func(ctx context.Context, actor address.Address) {
Expand Down
1 change: 1 addition & 0 deletions chain/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (mp *MessagePool) SelectMessages(ctx context.Context, ts *types.TipSet, tq
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()

//TODO confirm if we can switch to RLock here for performance
snissn marked this conversation as resolved.
Show resolved Hide resolved
mp.lk.Lock()
defer mp.lk.Unlock()

Expand Down