From 6931751de2ce3bcb614fe353b8b9e1970144e24a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 24 Feb 2025 16:05:30 +0100 Subject: [PATCH 1/2] [FIXED] Zero delivery count for message Signed-off-by: Maurice van Veen --- server/consumer.go | 20 ++++-------- server/jetstream_consumer_test.go | 53 +++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 3aa91e4f83c..467ce227b55 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -3917,7 +3917,10 @@ func (o *consumer) deliveryCount(seq uint64) uint64 { if o.rdc == nil { return 1 } - return o.rdc[seq] + if dc := o.rdc[seq]; dc >= 1 { + return dc + } + return 1 } // Increase the delivery count for this message. @@ -4231,10 +4234,7 @@ func (o *consumer) checkAckFloor() { // Check if this message was pending. o.mu.RLock() p, isPending := o.pending[seq] - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[seq] - } + rdc := o.deliveryCount(seq) o.mu.RUnlock() // If it was pending for us, get rid of it. if isPending { @@ -4252,10 +4252,7 @@ func (o *consumer) checkAckFloor() { if p != nil { dseq = p.Sequence } - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[seq] - } + rdc := o.deliveryCount(seq) toTerm = append(toTerm, seq, dseq, rdc) } } @@ -5861,10 +5858,7 @@ func (o *consumer) decStreamPending(sseq uint64, subj string) { // Check if this message was pending. p, wasPending := o.pending[sseq] - var rdc uint64 = 1 - if o.rdc != nil { - rdc = o.rdc[sseq] - } + rdc := o.deliveryCount(sseq) o.mu.Unlock() diff --git a/server/jetstream_consumer_test.go b/server/jetstream_consumer_test.go index f32b9167eb7..72f1139d81a 100644 --- a/server/jetstream_consumer_test.go +++ b/server/jetstream_consumer_test.go @@ -2714,3 +2714,56 @@ func TestJetStreamConsumerMessageDeletedDuringRedelivery(t *testing.T) { }) } } + +func TestJetStreamConsumerDeliveryCount(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + for i := 0; i < 2; i++ { + _, err = js.Publish("foo", nil) + require_NoError(t, err) + } + + sub, err := js.PullSubscribe( + "foo", + "CONSUMER", + nats.ManualAck(), + nats.AckExplicit(), + nats.AckWait(time.Second), + nats.MaxDeliver(1), + ) + require_NoError(t, err) + + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("CONSUMER") + require_NotNil(t, o) + + msgs, err := sub.Fetch(2) + require_NoError(t, err) + require_Len(t, len(msgs), 2) + require_NoError(t, msgs[1].Nak()) + + require_Equal(t, o.deliveryCount(1), 1) + require_Equal(t, o.deliveryCount(2), 1) + + // max deliver 1 so this will fail + _, err = sub.Fetch(1, nats.MaxWait(250*time.Millisecond)) + require_Error(t, err) + + // This would previously report delivery count 0, because o.rdc!=nil + require_Equal(t, o.deliveryCount(1), 1) + require_Equal(t, o.deliveryCount(2), 1) + +} From 6caa8580e086ae6aaae7f05e72a1c7f9fd3d0bd1 Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Mon, 24 Feb 2025 16:20:38 +0100 Subject: [PATCH 2/2] [FIXED] Preserve max delivered messages Signed-off-by: Maurice van Veen --- server/consumer.go | 24 +++++-- server/jetstream_cluster_1_test.go | 2 +- server/jetstream_cluster_3_test.go | 2 +- server/jetstream_test.go | 105 +++++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 8 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 467ce227b55..0937b11b822 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -1985,11 +1985,16 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool { if o.maxp > 0 && len(o.pending) >= o.maxp { o.signalNewMessages() } - // Cleanup our tracking. - delete(o.pending, seq) - if o.rdc != nil { - delete(o.rdc, seq) + // Make sure to remove from pending. + if p, ok := o.pending[seq]; ok && p != nil { + delete(o.pending, seq) + o.updateDelivered(p.Sequence, seq, dc, p.Timestamp) + } + // Ensure redelivered state is set, if not already. + if o.rdc == nil { + o.rdc = make(map[uint64]uint64) } + o.rdc[seq] = dc return true } return false @@ -3264,6 +3269,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { var needAck bool var asflr, osseq uint64 var pending map[uint64]*Pending + var rdc map[uint64]uint64 o.mu.RLock() defer o.mu.RUnlock() @@ -3288,7 +3294,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } if o.isLeader() { asflr, osseq = o.asflr, o.sseq - pending = o.pending + pending, rdc = o.pending, o.rdc } else { if o.store == nil { return false @@ -3299,7 +3305,7 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { return sseq > o.asflr && !o.isFiltered() } // If loading state as here, the osseq is +1. - asflr, osseq, pending = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending + asflr, osseq, pending, rdc = state.AckFloor.Stream, state.Delivered.Stream+1, state.Pending, state.Redelivered } switch o.cfg.AckPolicy { @@ -3315,6 +3321,12 @@ func (o *consumer) needAck(sseq uint64, subj string) bool { } } + // Finally check if redelivery of this message is tracked. + // If the message is not pending, it should be preserved if it reached max delivery. + if !needAck { + _, needAck = rdc[sseq] + } + return needAck } diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go index f3f1d40b46e..eff52af7bca 100644 --- a/server/jetstream_cluster_1_test.go +++ b/server/jetstream_cluster_1_test.go @@ -6624,7 +6624,7 @@ func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) { require_Equal(t, ci.AckFloor.Consumer, 1) require_Equal(t, ci.AckFloor.Stream, 1) require_Equal(t, ci.NumAckPending, 0) - require_Equal(t, ci.NumRedelivered, 0) + require_Equal(t, ci.NumRedelivered, 1) require_Equal(t, ci.NumPending, 0) } } diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 14a83fa08df..807fc7f63da 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -5426,7 +5426,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) { require_Equal(t, a.AckFloor.Stream, 10) } require_Equal(t, a.NumPending, 40) - require_Equal(t, a.NumRedelivered, 0) + require_Equal(t, a.NumRedelivered, 10) a.Cluster, b.Cluster = nil, nil a.Delivered.Last, b.Delivered.Last = nil, nil if !reflect.DeepEqual(a, b) { diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 96af2972bdf..0af967b1d9c 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -25542,3 +25542,108 @@ func TestJetStreamSubjectDeleteMarkersAfterPurgeNoMarkers(t *testing.T) { }) } } + +// https://github.com/nats-io/nats-server/issues/6538 +func TestJetStreamInterestMaxDeliveryReached(t *testing.T) { + maxWait := 250 * time.Millisecond + for _, useNak := range []bool{true, false} { + for _, test := range []struct { + title string + action func(s *Server, sub *nats.Subscription) + }{ + { + title: "fetch", + action: func(s *Server, sub *nats.Subscription) { + time.Sleep(time.Second) + + // max deliver 1 so this will fail + _, err := sub.Fetch(1, nats.MaxWait(maxWait)) + require_Error(t, err) + }, + }, + { + title: "expire pending", + action: func(s *Server, sub *nats.Subscription) { + acc, err := s.lookupAccount(globalAccountName) + require_NoError(t, err) + mset, err := acc.lookupStream("TEST") + require_NoError(t, err) + o := mset.lookupConsumer("consumer") + require_NotNil(t, o) + + o.mu.Lock() + o.forceExpirePending() + o.mu.Unlock() + }, + }, + } { + title := fmt.Sprintf("nak/%s", test.title) + if !useNak { + title = fmt.Sprintf("no-%s", title) + } + t.Run(title, func(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Storage: nats.FileStorage, + Subjects: []string{"test"}, + Replicas: 1, + Retention: nats.InterestPolicy, + }) + require_NoError(t, err) + + sub, err := js.PullSubscribe("test", "consumer", nats.AckWait(time.Second), nats.MaxDeliver(1)) + require_NoError(t, err) + + _, err = nc.Request("test", []byte("hello"), maxWait) + require_NoError(t, err) + + nfo, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + + msg, err := sub.Fetch(1, nats.MaxWait(maxWait)) + require_NoError(t, err) + require_Len(t, 1, len(msg)) + if useNak { + require_NoError(t, msg[0].Nak()) + } + + cnfo, err := js.ConsumerInfo("TEST", "consumer") + require_NoError(t, err) + require_Equal(t, cnfo.NumAckPending, 1) + + test.action(s, sub) + + // max deliver 1 so this will fail + _, err = sub.Fetch(1, nats.MaxWait(maxWait)) + require_Error(t, err) + + cnfo, err = js.ConsumerInfo("TEST", "consumer") + require_NoError(t, err) + require_Equal(t, cnfo.NumAckPending, 0) + + nfo, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + + sub2, err := js.PullSubscribe("test", "consumer2") + require_NoError(t, err) + + msg, err = sub2.Fetch(1) + require_NoError(t, err) + require_Len(t, 1, len(msg)) + require_NoError(t, msg[0].AckSync()) + + nfo, err = js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, nfo.State.Msgs, uint64(1)) + }) + } + } +}