Skip to content

Commit

Permalink
Merge pull request #2834 from nats-io/fix_backoff
Browse files Browse the repository at this point in the history
[FIXED] JetStream: BackOff redeliveries would always use first in list
  • Loading branch information
kozlovic authored Feb 1, 2022
2 parents 0d15872 + 30c431a commit b23ed77
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3130,7 +3130,7 @@ func (o *consumer) checkPending() {
}
elapsed, deadline := now-p.Timestamp, ttl
if len(o.cfg.BackOff) > 0 && o.rdc != nil {
dc := int(o.rdc[p.Sequence])
dc := int(o.rdc[seq])
if dc >= len(o.cfg.BackOff) {
dc = len(o.cfg.BackOff) - 1
}
Expand Down
9 changes: 8 additions & 1 deletion server/jetstream_cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10345,16 +10345,23 @@ func TestJetStreamClusterRedeliverBackoffs(t *testing.T) {
_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Replicas: 2,
Subjects: []string{"foo"},
Subjects: []string{"foo", "bar"},
})
require_NoError(t, err)

// Produce some messages on bar so that when we create the consumer
// on "foo", we don't have a 1:1 between consumer/stream sequence.
for i := 0; i < 10; i++ {
js.Publish("bar", []byte("msg"))
}

// Test when BackOff is configured and AckWait and MaxDeliver are as well.
// Currently the BackOff will override AckWait, but we want MaxDeliver to be set to be at least len(BackOff)+1.
ccReq := &CreateConsumerRequest{
Stream: "TEST",
Config: ConsumerConfig{
Durable: "dlc",
FilterSubject: "foo",
DeliverSubject: "x",
AckPolicy: AckExplicit,
AckWait: 30 * time.Second,
Expand Down

0 comments on commit b23ed77

Please sign in to comment.