[FIXED] Max delivered stuck consumers and mis-reported redelivered stats. (#5995)

Process max delivery boundaries when expiring vs always putting back on
the redelivered queue.

Previously we would always re-queue and process max deliveries on
getNextMsg(). That worked well for push consumers, but pull-based
consumers required a new pull request to be present before any
redeliveries were processed, which could report redelivered status
incorrectly with a max deliver of 1.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison authored Oct 12, 2024
2 parents 702f5d0 + bd3be75 commit 2c71b59
Showing 3 changed files with 125 additions and 14 deletions.
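
Before the per-file diffs, here is a minimal, self-contained Go sketch of the behavior change described in the commit message. The consumer struct, its field names, and onAckWaitExpired() below are hypothetical stand-ins, not the server's actual types; only the shape of hasMaxDeliveries() and deliveryCount() mirrors the helpers added in this commit.

package main

import "fmt"

// Hypothetical stand-in for the server's consumer state (assumption:
// not the real server struct, just enough to model the fix).
type consumer struct {
	maxdc     uint64            // MaxDeliver limit (0 = unlimited)
	rdc       map[uint64]uint64 // per-sequence delivery counts
	pending   map[uint64]bool   // unacked, outstanding sequences
	redeliver []uint64          // sequences queued for redelivery
}

// Return current delivery count for a given sequence.
func (c *consumer) deliveryCount(seq uint64) uint64 {
	if c.rdc == nil {
		return 1
	}
	return c.rdc[seq]
}

// If the sequence has hit its max deliveries, clean up tracking and
// report true so the caller skips re-queuing it.
func (c *consumer) hasMaxDeliveries(seq uint64) bool {
	if c.maxdc == 0 {
		return false
	}
	if dc := c.deliveryCount(seq); dc >= c.maxdc {
		delete(c.pending, seq)
		delete(c.rdc, seq)
		return true
	}
	return false
}

// Hypothetical expiry path modeling checkPending(): before the fix,
// every expired sequence went straight onto the redelivery queue, and
// the max-delivery boundary was only enforced on the next getNextMsg().
func (c *consumer) onAckWaitExpired(seq uint64) {
	if !c.hasMaxDeliveries(seq) {
		c.redeliver = append(c.redeliver, seq)
	}
}

func main() {
	c := &consumer{
		maxdc:   1,
		rdc:     map[uint64]uint64{7: 1},
		pending: map[uint64]bool{7: true},
	}
	c.onAckWaitExpired(7)
	// Prints 0: the message already hit MaxDeliver(1), so it is not
	// re-queued and its tracking state is cleared immediately.
	fmt.Println("redeliveries queued:", len(c.redeliver))
}

Because the check now runs on the expiry path itself, a pull consumer with no outstanding pull request still has its max-delivery boundary processed, which is what keeps the consumer from getting stuck and the redelivered stats accurate.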
47 changes: 45 additions & 2 deletions server/consumer.go
@@ -285,7 +285,9 @@ var (
AckNext = []byte("+NXT")
// Terminate delivery of the message.
AckTerm = []byte("+TERM")
)

const (
// reasons to supply when terminating messages using limits
ackTermLimitsReason = "Message deleted by stream limits"
ackTermUnackedLimitsReason = "Unacknowledged message was deleted"
@@ -1344,6 +1346,9 @@ func (o *consumer) setLeader(isLeader bool) {
pullMode := o.isPullMode()
o.mu.Unlock()

// Check if there are any pending we might need to clean up etc.
o.checkPending()

// Snapshot initial info.
o.infoWithSnap(true)

@@ -1774,12 +1779,39 @@ func (o *consumer) config() ConsumerConfig {
return o.cfg
}

// Check if we have hit max deliveries. If so do notification and cleanup.
// Return whether or not the max was hit.
// Lock should be held.
func (o *consumer) hasMaxDeliveries(seq uint64) bool {
if o.maxdc == 0 {
return false
}
if dc := o.deliveryCount(seq); dc >= o.maxdc {
// We have hit our max deliveries for this sequence.
// Only send the advisory once.
if dc == o.maxdc {
o.notifyDeliveryExceeded(seq, dc)
}
// Determine if we signal to start flow of messages again.
if o.maxp > 0 && len(o.pending) >= o.maxp {
o.signalNewMessages()
}
// Cleanup our tracking.
delete(o.pending, seq)
if o.rdc != nil {
delete(o.rdc, seq)
}
return true
}
return false
}

// Force expiration of all pending.
// Lock should be held.
func (o *consumer) forceExpirePending() {
var expired []uint64
for seq := range o.pending {
- if !o.onRedeliverQueue(seq) {
+ if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
}
@@ -3531,6 +3563,14 @@ func trackDownAccountAndInterest(acc *Account, interest string) (*Account, strin
return acc, interest
}

// Return current delivery count for a given sequence.
func (o *consumer) deliveryCount(seq uint64) uint64 {
if o.rdc == nil {
return 1
}
return o.rdc[seq]
}

// Increase the delivery count for this message.
// ONLY used on redelivery semantics.
// Lock should be held.
@@ -4754,7 +4794,10 @@ func (o *consumer) checkPending() {
}
}
if elapsed >= deadline {
- if !o.onRedeliverQueue(seq) {
+ // We will check if we have hit our max deliveries. Previously we would do this on getNextMsg(), which
+ // worked well for push consumers, but pull-based consumers would require a new pull request to be
+ // present to process, and redelivered status could be reported incorrectly.
+ if !o.onRedeliverQueue(seq) && !o.hasMaxDeliveries(seq) {
expired = append(expired, seq)
}
} else if deadline-elapsed < next {
58 changes: 58 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6442,6 +6442,64 @@ func TestJetStreamClusterMetaStepdownFromNonSysAccount(t *testing.T) {
require_NotEqual(t, ml, c.leader())
}

func TestJetStreamClusterMaxDeliveriesOnInterestStreams(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

// Client based API
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo.*"},
Retention: nats.InterestPolicy,
})
require_NoError(t, err)

sub1, err := js.PullSubscribe("foo.*", "c1", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

sub2, err := js.PullSubscribe("foo.*", "c2", nats.AckWait(10*time.Millisecond), nats.MaxDeliver(1))
require_NoError(t, err)

js.Publish("foo.bar", []byte("HELLO"))

si, err := js.StreamInfo("TEST")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 1)

msgs, err := sub1.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

msgs, err = sub2.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

// Wait for redelivery to both consumers, which should do nothing.
time.Sleep(250 * time.Millisecond)

// Now check that stream and consumer infos are correct.
si, err = js.StreamInfo("TEST")
require_NoError(t, err)
// Messages that hit max deliveries should NOT be removed from the stream.
require_Equal(t, si.State.Msgs, 1)
require_Equal(t, si.State.Consumers, 2)

for _, cname := range []string{"c1", "c2"} {
ci, err := js.ConsumerInfo("TEST", cname)
require_NoError(t, err)
require_Equal(t, ci.Delivered.Consumer, 1)
require_Equal(t, ci.Delivered.Stream, 1)
require_Equal(t, ci.AckFloor.Consumer, 1)
require_Equal(t, ci.AckFloor.Stream, 1)
require_Equal(t, ci.NumAckPending, 0)
require_Equal(t, ci.NumRedelivered, 0)
require_Equal(t, ci.NumPending, 0)
}
}

//
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
34 changes: 22 additions & 12 deletions server/jetstream_cluster_3_test.go
@@ -5422,17 +5422,19 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
}

// File based.
- _, err = js.Subscribe("foo",
- 	func(msg *nats.Msg) {},
- 	nats.Durable("file"),
+ sub, err := js.PullSubscribe("foo", "file",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
nats.MaxAckPending(10),
)
require_NoError(t, err)

- // Let first batch retry and expire.
+ msgs, err := sub.Fetch(10)
+ require_NoError(t, err)
+ require_Equal(t, len(msgs), 10)
+
+ // Let first batch expire.
time.Sleep(1200 * time.Millisecond)

cia, err := js.ConsumerInfo("TEST", "file")
@@ -5450,6 +5452,12 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
// Also last activity for delivered can be slightly off so nil out as well.
checkConsumerInfo := func(a, b *nats.ConsumerInfo) {
t.Helper()
require_Equal(t, a.Delivered.Consumer, 10)
require_Equal(t, a.Delivered.Stream, 10)
require_Equal(t, a.AckFloor.Consumer, 10)
require_Equal(t, a.AckFloor.Stream, 10)
require_Equal(t, a.NumPending, 40)
require_Equal(t, a.NumRedelivered, 0)
a.Cluster, b.Cluster = nil, nil
a.Delivered.Last, b.Delivered.Last = nil, nil
if !reflect.DeepEqual(a, b) {
@@ -5460,9 +5468,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
checkConsumerInfo(cia, cib)

// Memory based.
- _, err = js.Subscribe("foo",
- 	func(msg *nats.Msg) {},
- 	nats.Durable("mem"),
+ sub, err = js.PullSubscribe("foo", "mem",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
@@ -5471,6 +5477,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
)
require_NoError(t, err)

msgs, err = sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)

@@ -5488,9 +5498,7 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
checkConsumerInfo(cia, cib)

// Now file based but R1 and server restart.
- _, err = js.Subscribe("foo",
- 	func(msg *nats.Msg) {},
- 	nats.Durable("r1"),
+ sub, err = js.PullSubscribe("foo", "r1",
nats.ManualAck(),
nats.MaxDeliver(1),
nats.AckWait(time.Second),
@@ -5499,6 +5507,10 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
)
require_NoError(t, err)

msgs, err = sub.Fetch(10)
require_NoError(t, err)
require_Equal(t, len(msgs), 10)

// Let first batch retry and expire.
time.Sleep(1200 * time.Millisecond)

Expand All @@ -5517,8 +5529,6 @@ func TestJetStreamClusterConsumerMaxDeliveryNumAckPendingBug(t *testing.T) {
// Created can skew a small bit due to server restart, this is expected.
now := time.Now()
cia.Created, cib.Created = now, now
- // Clear any disagreement on push bound.
- cia.PushBound, cib.PushBound = false, false
checkConsumerInfo(cia, cib)
}

