Skip to content

Commit ddcc364

Browse files
[FIXED] Respect consumer's starting seq, even if in the future
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 2352ad9 commit ddcc364

File tree

2 files changed

+54
-3
lines changed

2 files changed

+54
-3
lines changed

server/consumer.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -5364,12 +5364,18 @@ func (o *consumer) selectStartingSeqNo() {
53645364
o.sseq = o.cfg.OptStartSeq
53655365
}
53665366

5367-
if state.FirstSeq == 0 {
5367+
if state.FirstSeq == 0 && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
5368+
// If the stream is empty, deliver only new.
5369+
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
53685370
o.sseq = 1
5371+
} else if o.sseq > state.LastSeq && (o.cfg.Direct || o.cfg.OptStartSeq == 0) {
5372+
// If selected sequence is in the future, clamp back down.
5373+
// But only if mirroring/sourcing, or start seq is unset, otherwise need to respect provided value.
5374+
o.sseq = state.LastSeq + 1
53695375
} else if o.sseq < state.FirstSeq {
5376+
// If the first sequence is further ahead than the starting sequence,
5377+
// there are no messages there anymore, so move the sequence up.
53705378
o.sseq = state.FirstSeq
5371-
} else if o.sseq > state.LastSeq {
5372-
o.sseq = state.LastSeq + 1
53735379
}
53745380
}
53755381

server/jetstream_cluster_1_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -7323,6 +7323,51 @@ func TestJetStreamClusterConsumerHealthCheckMustNotRecreate(t *testing.T) {
73237323
checkNodeIsClosed(ca)
73247324
}
73257325

7326+
func TestJetStreamClusterRespectConsumerStartSeq(t *testing.T) {
7327+
c := createJetStreamClusterExplicit(t, "R3S", 3)
7328+
defer c.shutdown()
7329+
7330+
nc, js := jsClientConnect(t, c.randomServer())
7331+
defer nc.Close()
7332+
7333+
// Create replicated stream.
7334+
_, err := js.AddStream(&nats.StreamConfig{
7335+
Name: "TEST",
7336+
Subjects: []string{"foo"},
7337+
Replicas: 3,
7338+
})
7339+
require_NoError(t, err)
7340+
7341+
// We could have published messages into the stream that have not yet been applied on the follower.
7342+
// If we create a consumer with a starting sequence in the future, we must respect it.
7343+
ci, err := js.AddConsumer("TEST", &nats.ConsumerConfig{
7344+
DeliverPolicy: nats.DeliverByStartSequencePolicy,
7345+
OptStartSeq: 20,
7346+
})
7347+
require_NoError(t, err)
7348+
require_Equal(t, ci.Delivered.Stream, 19)
7349+
7350+
// Same thing if the first sequence is not 0.
7351+
err = js.PurgeStream("TEST", &nats.StreamPurgeRequest{Sequence: 10})
7352+
require_NoError(t, err)
7353+
7354+
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
7355+
DeliverPolicy: nats.DeliverByStartSequencePolicy,
7356+
OptStartSeq: 20,
7357+
})
7358+
require_NoError(t, err)
7359+
require_Equal(t, ci.Delivered.Stream, 19)
7360+
7361+
// Only if we're requested to start at a sequence that's not available anymore
7362+
// can we safely move it up. That data is gone already, so can't do anything else.
7363+
ci, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
7364+
DeliverPolicy: nats.DeliverByStartSequencePolicy,
7365+
OptStartSeq: 5,
7366+
})
7367+
require_NoError(t, err)
7368+
require_Equal(t, ci.Delivered.Stream, 9)
7369+
}
7370+
73267371
//
73277372
// DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
73287373
// Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.

0 commit comments

Comments
 (0)