Skip to content

Commit d3e9e79

Browse files
Fix consumer DeliverLast policy
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 5342ba9 commit d3e9e79

File tree

2 files changed

+8
-8
lines changed

2 files changed

+8
-8
lines changed

server/consumer.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -5293,13 +5293,13 @@ func (o *consumer) selectStartingSeqNo() {
52935293
} else if o.cfg.DeliverPolicy == DeliverLast {
52945294
if o.subjf == nil {
52955295
o.sseq = state.LastSeq
5296-
return
5297-
}
5298-
// If we are partitioned here this will be properly set when we become leader.
5299-
for _, filter := range o.subjf {
5300-
ss := o.mset.store.FilteredState(1, filter.subject)
5301-
if ss.Last > o.sseq {
5302-
o.sseq = ss.Last
5296+
} else {
5297+
// If we are partitioned here this will be properly set when we become leader.
5298+
for _, filter := range o.subjf {
5299+
ss := o.mset.store.FilteredState(1, filter.subject)
5300+
if ss.Last > o.sseq {
5301+
o.sseq = ss.Last
5302+
}
53035303
}
53045304
}
53055305
} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {

server/jetstream_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1390,7 +1390,7 @@ func TestJetStreamBasicDeliverSubject(t *testing.T) {
13901390
}
13911391
// Check that is the last msg we sent though.
13921392
if mseq, _ := strconv.Atoi(string(m.Data)); mseq != 200 {
1393-
t.Fatalf("Expected messag sequence to be 200, but got %d", mseq)
1393+
t.Fatalf("Expected message sequence to be 200, but got %d", mseq)
13941394
}
13951395

13961396
checkSubEmpty()

0 commit comments

Comments
 (0)