Skip to content

Commit

Permalink
[FIXED] JetStream: BackOff redeliveries would always use first in list
Browse files Browse the repository at this point in the history
If the consumer's sequence was not the same than the stream's sequence,
then the redelivery would always use the first duration from the
BackOff list.

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Feb 1, 2022
1 parent 0d15872 commit 30c431a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3130,7 +3130,7 @@ func (o *consumer) checkPending() {
}
elapsed, deadline := now-p.Timestamp, ttl
if len(o.cfg.BackOff) > 0 && o.rdc != nil {
dc := int(o.rdc[p.Sequence])
dc := int(o.rdc[seq])
if dc >= len(o.cfg.BackOff) {
dc = len(o.cfg.BackOff) - 1
}
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10345,16 +10345,23 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 2,
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
require_NoError(t, err)

// Produce some messages on bar so that when we create the consumer
// on "foo", we don't have a 1:1 between consumer/stream sequence.
for i := 0; i < 10; i++ {
js.Publish("bar", []byte("msg"))
}

// Test when BackOff is configured and AckWait and MaxDeliver are as well.
// Currently the BackOff will override AckWait, but we want MaxDeliver to be set to be at least len(BackOff)+1.
ccReq := &CreateConsumerRequest{
Stream: "TEST",
Config: ConsumerConfig{
Durable: "dlc",
FilterSubject: "foo",
DeliverSubject: "x",
AckPolicy: AckExplicit,
AckWait: 30 * time.Second,
Expand Down

0 comments on commit 30c431a

Please sign in to comment.