From a441c8261b06342eff87c2833aa41ba5a2802da2 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Tue, 21 Jan 2025 09:19:52 +0100 Subject: [PATCH] [FIXED] Consumer slowdown when redelivering deleted message Signed-off-by: Maurice van Veen --- server/consumer.go | 24 +++++++----- server/jetstream_consumer_test.go | 63 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 10 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 82fd094adb..54033d2680 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3603,17 +3603,21 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { } continue } - if seq > 0 { - pmsg := getJSPubMsgFromPool() - sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) - if sm == nil || err != nil { - pmsg.returnToPool() - pmsg, dc = nil, 0 - // Adjust back deliver count. - o.decDeliveryCount(seq) - } - return pmsg, dc, err + pmsg := getJSPubMsgFromPool() + sm, err := o.mset.store.LoadMsg(seq, &pmsg.StoreMsg) + if sm == nil || err != nil { + pmsg.returnToPool() + pmsg, dc = nil, 0 + // Adjust back deliver count. + o.decDeliveryCount(seq) + } + // Message was scheduled for redelivery but was removed in the meantime. + if err == ErrStoreMsgNotFound || err == errDeletedMsg { + delete(o.pending, seq) + delete(o.rdc, seq) + continue } + return pmsg, dc, err } } diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 1d325602f6..4f1b4ad310 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1619,3 +1619,66 @@ func TestJetStreamConsumerSwitchLeaderDuringInflightAck(t *testing.T) { require_NoError(t, err) require_Len(t, len(msgs), 1) } + +func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) { + storageTypes := []nats.StorageType{nats.MemoryStorage, nats.FileStorage} + for _, storageType := range storageTypes { + t.Run(storageType.String(), func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Storage: storageType, + }) + require_NoError(t, err) + + for i := 0; i < 3; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "foo", + "CONSUMER", + nats.ManualAck(), + nats.AckExplicit(), + nats.AckWait(time.Second), + ) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + msgs, err := sub.Fetch(3) + require_NoError(t, err) + require_Len(t, len(msgs), 3) + + err = js.DeleteMsg("TEST", 2) + require_NoError(t, err) + + o.mu.Lock() + defer o.mu.Unlock() + for seq := range o.rdc { + o.removeFromRedeliverQueue(seq) + } + + o.pending = make(map[uint64]*Pending) + o.pending[2] = &Pending{} + o.addToRedeliverQueue(2) + + _, _, err = o.getNextMsg() + require_Error(t, err, ErrStoreEOF) + require_Len(t, len(o.pending), 0) + require_Len(t, len(o.rdc), 0) + }) + } +}