Skip to content

Commit

Permalink
[FIXED] Consumer slowdown when redelivering deleted message
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen authored and neilalexander committed Jan 22, 2025
1 parent 7fa1182 commit a441c82
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 10 deletions.
24 changes: 14 additions & 10 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3603,17 +3603,21 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
continue
}
if seq > 0 {
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
return pmsg, dc, err
pmsg := getJSPubMsgFromPool()
sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg)
if sm == nil || err != nil {
pmsg.returnToPool()
pmsg, dc = nil, 0
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
// Message was scheduled for redelivery but was removed in the meantime.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
delete(o.pending, seq)
delete(o.rdc, seq)
continue
}
return pmsg, dc, err
}
}

Expand Down
63 changes: 63 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1619,3 +1619,66 @@ func TestJetStreamConsumerSwitchLeaderDuringInflightAck(t *testing.T) {
require_NoError(t, err)
require_Len(t, len(msgs), 1)
}

func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) {
storageTypes := []nats.StorageType{nats.MemoryStorage, nats.FileStorage}
for _, storageType := range storageTypes {
t.Run(storageType.String(), func(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
Storage: storageType,
})
require_NoError(t, err)

for i := 0; i < 3; i++ {
_, err = js.Publish("foo", nil)
require_NoError(t, err)
}

sub, err := js.PullSubscribe(
"foo",
"CONSUMER",
nats.ManualAck(),
nats.AckExplicit(),
nats.AckWait(time.Second),
)
require_NoError(t, err)

acc, err := s.lookupAccount(globalAccountName)
require_NoError(t, err)
mset, err := acc.lookupStream("TEST")
require_NoError(t, err)
o := mset.lookupConsumer("CONSUMER")
require_NotNil(t, o)

msgs, err := sub.Fetch(3)
require_NoError(t, err)
require_Len(t, len(msgs), 3)

err = js.DeleteMsg("TEST", 2)
require_NoError(t, err)

o.mu.Lock()
defer o.mu.Unlock()
for seq := range o.rdc {
o.removeFromRedeliverQueue(seq)
}

o.pending = make(map[uint64]*Pending)
o.pending[2] = &Pending{}
o.addToRedeliverQueue(2)

_, _, err = o.getNextMsg()
require_Error(t, err, ErrStoreEOF)
require_Len(t, len(o.pending), 0)
require_Len(t, len(o.rdc), 0)
})
}
}

0 comments on commit a441c82

Please sign in to comment.