diff --git a/server/consumer.go b/server/consumer.go index 54033d2680..4d66b23b97 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3613,8 +3613,10 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) { } // Message was scheduled for redelivery but was removed in the meantime. if err == ErrStoreMsgNotFound || err == errDeletedMsg { - delete(o.pending, seq) - delete(o.rdc, seq) + // This is a race condition where the message is still in o.pending and + // scheduled for redelivery, but it has been removed from the stream. + // o.processTerm is called in a goroutine so could run after we get here. + // That will correct the pending state and delivery/ack floors, so just skip here. continue } return pmsg, dc, err diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index 4f1b4ad310..6248bbcec6 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -1665,20 +1665,45 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) { err = js.DeleteMsg("TEST", 2) require_NoError(t, err) + // Wait for mset.storeUpdates to call into o.decStreamPending which runs + // the o.processTerm goroutine, removing one message from pending. + checkFor(t, 2*time.Second, 100*time.Millisecond, func() error { + o.mu.RLock() + defer o.mu.RUnlock() + if len(o.pending) != 2 { + return fmt.Errorf("expected 2 pending, but got %d", len(o.pending)) + } + return nil + }) + + // Now empty the redelivery queue and reset the pending state. o.mu.Lock() - defer o.mu.Unlock() - for seq := range o.rdc { + for _, seq := range o.rdq { o.removeFromRedeliverQueue(seq) } - o.pending = make(map[uint64]*Pending) o.pending[2] = &Pending{} o.addToRedeliverQueue(2) + // Also reset delivery/ack floors to confirm they get corrected. + o.adflr, o.asflr = 0, 0 + o.dseq, o.sseq = 11, 11 + + // Getting the next message should skip seq 2, as that's deleted, but must not touch state. _, _, err = o.getNextMsg() + o.mu.Unlock() require_Error(t, err, ErrStoreEOF) + require_Len(t, len(o.pending), 1) + + // Simulate the o.processTerm goroutine running after a call to o.getNextMsg. + // Pending state and delivery/ack floors should be corrected. + o.processTerm(2, 2, 1, ackTermUnackedLimitsReason, _EMPTY_) + + o.mu.RLock() + defer o.mu.RUnlock() require_Len(t, len(o.pending), 0) - require_Len(t, len(o.rdc), 0) + require_Equal(t, o.adflr, 10) + require_Equal(t, o.asflr, 10) }) } }