Skip to content

Commit

Permalink
Fix WQ retention issue due to checkInterestState / `isInterestReten…
Browse files Browse the repository at this point in the history
…tion`

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
neilalexander committed Jul 24, 2024
1 parent 571d97a commit 2826fc0
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 1 deletion.
47 changes: 47 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2446,3 +2446,50 @@ func TestJetStreamClusterConsumerLeak(t *testing.T) {
t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse)
}
}

func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "wq_stream",
Subjects: []string{"something.>"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy,
Replicas: 3,
})
require_NoError(t, err)

for i := 0; i < 100; i++ {
n := (i % 5) + 1
_, err := js.Publish(fmt.Sprintf("something.%d", n), nil)
require_NoError(t, err)
}

sub, err := js.PullSubscribe(
"something.5",
"wq_consumer_5",
nats.BindStream("wq_stream"),
nats.ConsumerReplicas(3),
)
require_NoError(t, err)

for {
msgs, _ := sub.Fetch(5)
if len(msgs) == 0 {
break
}
for _, msg := range msgs {
require_NoError(t, msg.AckSync())
}
}

si, err := js.StreamInfo("wq_stream")
require_NoError(t, err)
require_Equal(t, si.State.Msgs, 80)
require_Equal(t, si.State.NumDeleted, 20)
require_Equal(t, si.State.NumSubjects, 4)
}
2 changes: 1 addition & 1 deletion server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5633,7 +5633,7 @@ func (mset *stream) checkInterestState() {
func (mset *stream) isInterestRetention() bool {
mset.mu.RLock()
defer mset.mu.RUnlock()
return mset.cfg.Retention != LimitsPolicy
return mset.cfg.Retention == InterestPolicy
}

// NumConsumers reports on number of active consumers for this stream.
Expand Down

0 comments on commit 2826fc0

Please sign in to comment.