Commit 3234b33

[FIXED] (2.11) Replicated consumer skipped redeliveries (#6566)
Replicated consumers could skip redeliveries of non-acked messages when:

- A message was delivered to the client, but there was no quorum on updating delivered state across replicas. Then a replicated ack came in, which would bump the starting sequence, skipping redelivery of the messages below it (a toy sketch of this failure mode follows the file summary below). The following code caused that issue:

  ```go
  // Match leader logic on checking if ack is ahead of delivered.
  // This could happen on a cooperative takeover with high speed deliveries.
  if sseq > o.state.Delivered.Stream {
      o.state.Delivered.Stream = sseq + 1
  }
  ```

- A message was delivered to the client, but there was no quorum on updating delivered state across replicas. Then the consumer leader steps down and becomes leader again. It would not reset `o.sseq` back down to the agreed state, skipping redelivery of the messages below it. The following code caused that issue:

  ```go
  // If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
  if !o.isLeader() || o.sseq <= state.Delivered.Stream {
      o.sseq = state.Delivered.Stream + 1
  }
  ```

Other included commits fix various code/tests that depended on the above lines of code:

- `TestJetStreamSuperClusterConsumerDeliverNewBug` started flaking. It would never guarantee that all replicas agreed on the same consumer state.
- The issue lay in `o.store.SetStarting(o.sseq - 1)` always being called, without being based on replicated state, which meant that when the storage directory was purged this state would not reliably come back. Now `o.updateSkipped(o.sseq)` is called the very first time a replica becomes leader, ensuring all replicas agree on the initial starting sequence, skipped ahead or not. It has also been changed to not only skip ahead `o.sseq`, but to reflect this in the underlying stored state as well.
- The test has also been made stricter: it now checks the state not only on the consumer leader but on all replicas, and verifies that both the in-memory state and the replicated state are exactly what they are supposed to be.
- `TestJetStreamBasicDeliverSubject` started failing due to a misplaced `return` in `o.selectStartingSeqNo()`. The return is now removed.
- `TestJetStreamClusterConsumerDeliveredSyncReporting` had a small correctness issue, as skipping ahead `o.sseq` would not be reflected in the underlying store. Now before the first fetch we expect stream/consumer sequence 0, after that fetch we expect stream/consumer sequence 1, and after the last fetch we expect a consumer sequence of 1 and a skipped-ahead stream sequence of 11.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents: 89c66fb + 0971e09

9 files changed: +346, -106 lines
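To make the first failure mode described above concrete, here is a minimal, self-contained Go sketch. The types and field names are toy stand-ins, not the server's actual state; only the `sseq + 1` bump mirrors the removed code:

```go
package main

import "fmt"

// Toy model of a replica's consumer state. Illustrative only.
type toyConsumer struct {
	deliveredStream uint64          // replicated "delivered up to" stream seq
	pending         map[uint64]bool // delivered but not yet acked stream seqs
}

// ackOld mirrors the removed logic: an ack ahead of the replicated
// delivered state bumps deliveredStream past it.
func (c *toyConsumer) ackOld(sseq uint64) {
	delete(c.pending, sseq)
	if sseq > c.deliveredStream {
		c.deliveredStream = sseq + 1
	}
}

func main() {
	// Seqs 1-3 were delivered to the client, but the delivered-state
	// update never reached quorum, so deliveredStream is still 0.
	c := &toyConsumer{pending: map[uint64]bool{1: true, 2: true, 3: true}}

	// A replicated ack for seq 3 arrives and applies the old logic.
	c.ackOld(3)

	// Redelivery would restart from deliveredStream, so the still-pending
	// seqs 1 and 2 below it are skipped.
	fmt.Println(c.deliveredStream, c.pending) // 4 map[1:true 2:true]
}
```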

server/consumer.go (+27, -24)
```diff
@@ -1373,8 +1373,14 @@ func (o *consumer) setLeader(isLeader bool) {
 		o.rdq = nil
 		o.rdqi.Empty()

-		// Restore our saved state. During non-leader status we just update our underlying store.
-		o.readStoredState(lseq)
+		// Restore our saved state.
+		// During non-leader status we just update our underlying store when not clustered.
+		// If clustered we need to propose our initial (possibly skipped ahead) o.sseq to the group.
+		if o.node == nil || o.dseq > 1 || (o.store != nil && o.store.HasState()) {
+			o.readStoredState(lseq)
+		} else if o.node != nil && o.sseq >= 1 {
+			o.updateSkipped(o.sseq)
+		}

 		// Setup initial num pending.
 		o.streamNumPending()
@@ -1384,11 +1390,6 @@ func (o *consumer) setLeader(isLeader bool) {
 			o.lss = nil
 		}

-		// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
-		if o.dseq == 1 && o.sseq > 1 {
-			o.updateSkipped(o.sseq)
-		}
-
 		// Do info sub.
 		if o.infoSub == nil && jsa != nil {
 			isubj := fmt.Sprintf(clusterConsumerInfoT, jsa.acc(), stream, o.name)
@@ -2811,10 +2812,7 @@ func (o *consumer) applyState(state *ConsumerState) {
 		return
 	}

-	// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
-	if !o.isLeader() || o.sseq <= state.Delivered.Stream {
-		o.sseq = state.Delivered.Stream + 1
-	}
+	o.sseq = state.Delivered.Stream + 1
 	o.dseq = state.Delivered.Consumer + 1
 	o.adflr = state.AckFloor.Consumer
 	o.asflr = state.AckFloor.Stream
@@ -2972,9 +2970,13 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
 	}
 	// If we are the leader we could have o.sseq that is skipped ahead.
 	// To maintain consistency in reporting (e.g. jsz) we always take the state for our delivered/ackfloor stream sequence.
-	info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
+	// Only use skipped ahead o.sseq if we're a new consumer and have not yet replicated this state yet.
+	leader := o.isLeader()
+	if !leader || o.store.HasState() {
+		info.Delivered.Consumer, info.Delivered.Stream = state.Delivered.Consumer, state.Delivered.Stream
+	}
 	info.AckFloor.Consumer, info.AckFloor.Stream = state.AckFloor.Consumer, state.AckFloor.Stream
-	if !o.isLeader() {
+	if !leader {
 		info.NumAckPending = len(state.Pending)
 		info.NumRedelivered = len(state.Redelivered)
 	}
@@ -4821,16 +4823,16 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
 	// Update delivered first.
 	o.updateDelivered(dseq, seq, dc, ts)

-	// Send message.
-	o.outq.send(pmsg)
-
 	if ap == AckExplicit || ap == AckAll {
 		o.trackPending(seq, dseq)
 	} else if ap == AckNone {
 		o.adflr = dseq
 		o.asflr = seq
 	}

+	// Send message.
+	o.outq.send(pmsg)
+
 	// Flow control.
 	if o.maxpb > 0 && o.needFlowControl(psz) {
 		o.sendFlowControl()
@@ -5291,13 +5293,13 @@ func (o *consumer) selectStartingSeqNo() {
 	} else if o.cfg.DeliverPolicy == DeliverLast {
 		if o.subjf == nil {
 			o.sseq = state.LastSeq
-			return
-		}
-		// If we are partitioned here this will be properly set when we become leader.
-		for _, filter := range o.subjf {
-			ss := o.mset.store.FilteredState(1, filter.subject)
-			if ss.Last > o.sseq {
-				o.sseq = ss.Last
+		} else {
+			// If we are partitioned here this will be properly set when we become leader.
+			for _, filter := range o.subjf {
+				ss := o.mset.store.FilteredState(1, filter.subject)
+				if ss.Last > o.sseq {
+					o.sseq = ss.Last
+				}
 			}
 		}
 	} else if o.cfg.DeliverPolicy == DeliverLastPerSubject {
@@ -5397,7 +5399,8 @@ func (o *consumer) selectStartingSeqNo() {
 	// Set ack store floor to store-1
 	o.asflr = o.sseq - 1
 	// Set our starting sequence state.
-	if o.store != nil && o.sseq > 0 {
+	// But only if we're not clustered, if clustered we propose upon becoming leader.
+	if o.store != nil && o.sseq > 0 && o.cfg.replicas(&o.mset.cfg) == 1 {
 		o.store.SetStarting(o.sseq - 1)
 	}
 }
```
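As a side note on the `selectStartingSeqNo` hunk: with the misplaced `return` removed, the filtered `DeliverLast` branch is reachable again and always considers every subject filter. A minimal runnable sketch of that selection logic, with `filteredLast` as a hypothetical stand-in for the per-filter `o.mset.store.FilteredState(1, subject).Last` lookups:

```go
package main

import "fmt"

// startingSeq picks the starting stream sequence for DeliverLast:
// the stream's last sequence when unfiltered, otherwise the highest
// "last" sequence across all subject filters.
func startingSeq(lastSeq uint64, filteredLast map[string]uint64, filters []string) uint64 {
	if len(filters) == 0 {
		return lastSeq
	}
	var sseq uint64
	for _, f := range filters {
		if last := filteredLast[f]; last > sseq {
			sseq = last
		}
	}
	return sseq
}

func main() {
	last := map[string]uint64{"orders.eu": 7, "orders.us": 12}
	fmt.Println(startingSeq(20, last, []string{"orders.eu", "orders.us"})) // 12
	fmt.Println(startingSeq(20, last, nil))                                // 20
}
```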

server/filestore.go (+22, -7)
```diff
@@ -9884,9 +9884,30 @@ func (o *consumerFileStore) SetStarting(sseq uint64) error {
 	return o.writeState(buf)
 }

+// UpdateStarting updates our starting stream sequence.
+func (o *consumerFileStore) UpdateStarting(sseq uint64) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+
+	if sseq > o.state.Delivered.Stream {
+		o.state.Delivered.Stream = sseq
+		// For AckNone just update delivered and ackfloor at the same time.
+		if o.cfg.AckPolicy == AckNone {
+			o.state.AckFloor.Stream = sseq
+		}
+	}
+	// Make sure we flush to disk.
+	o.kickFlusher()
+}
+
 // HasState returns if this store has a recorded state.
 func (o *consumerFileStore) HasState() bool {
 	o.mu.Lock()
+	// We have a running state, or stored on disk but not yet initialized.
+	if o.state.Delivered.Consumer != 0 || o.state.Delivered.Stream != 0 {
+		o.mu.Unlock()
+		return true
+	}
 	_, err := os.Stat(o.ifn)
 	o.mu.Unlock()
 	return err == nil
@@ -9939,7 +9960,7 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
 	if o.state.Redelivered == nil {
 		o.state.Redelivered = make(map[uint64]uint64)
 	}
-	// Only update if greater then what we already have.
+	// Only update if greater than what we already have.
 	if o.state.Redelivered[sseq] < dc-1 {
 		o.state.Redelivered[sseq] = dc - 1
 	}
@@ -9975,12 +9996,6 @@ func (o *consumerFileStore) UpdateAcks(dseq, sseq uint64) error {
 		return nil
 	}

-	// Match leader logic on checking if ack is ahead of delivered.
-	// This could happen on a cooperative takeover with high speed deliveries.
-	if sseq > o.state.Delivered.Stream {
-		o.state.Delivered.Stream = sseq + 1
-	}
-
 	if len(o.state.Pending) == 0 || o.state.Pending[sseq] == nil {
 		delete(o.state.Redelivered, sseq)
 		return ErrStoreMsgNotFound
```
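The new `UpdateStarting` only ever moves the stored delivered stream sequence forward, which is what lets every replica apply the same skip proposal safely even if entries are replayed. A minimal sketch of that monotonic-max pattern (simplified types; the real store also mirrors the ack floor for `AckNone` consumers and kicks a flusher, both omitted here):

```go
package main

import (
	"fmt"
	"sync"
)

// storedState is a simplified model of the consumer store's delivered state.
type storedState struct {
	mu              sync.Mutex
	deliveredStream uint64
}

// updateStarting advances the delivered stream sequence, never regressing
// it, so stale or replayed skip proposals are harmless no-ops.
func (s *storedState) updateStarting(sseq uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if sseq > s.deliveredStream {
		s.deliveredStream = sseq
	}
}

func main() {
	var s storedState
	s.updateStarting(10)
	s.updateStarting(5)            // stale proposal, ignored
	fmt.Println(s.deliveredStream) // 10
}
```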

server/jetstream_cluster.go (+7, -5)
```diff
@@ -5056,11 +5056,13 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
 		}
 	case updateSkipOp:
 		o.mu.Lock()
-		if !o.isLeader() {
-			var le = binary.LittleEndian
-			if sseq := le.Uint64(buf[1:]); sseq > o.sseq {
-				o.sseq = sseq
-			}
+		var le = binary.LittleEndian
+		sseq := le.Uint64(buf[1:])
+		if !o.isLeader() && sseq > o.sseq {
+			o.sseq = sseq
+		}
+		if o.store != nil {
+			o.store.UpdateStarting(sseq - 1)
 		}
 		o.mu.Unlock()
 	case addPendingRequest:
```
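For reference, the apply path above reads the skipped sequence as a little-endian uint64 following a one-byte op code; only the `buf[1:]` decode is taken from the diff, so the encode side and op value below are illustrative assumptions:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodeSkip packs an updateSkipOp-style entry: a 1-byte op code
// followed by the skipped stream sequence as a little-endian uint64.
func encodeSkip(op byte, sseq uint64) []byte {
	buf := make([]byte, 9)
	buf[0] = op
	binary.LittleEndian.PutUint64(buf[1:], sseq)
	return buf
}

func main() {
	buf := encodeSkip(0x0A, 11) // op value is a placeholder
	// Mirrors the apply path: skip the op byte, decode the sequence.
	sseq := binary.LittleEndian.Uint64(buf[1:])
	fmt.Println(sseq) // 11
}
```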

server/jetstream_cluster_1_test.go (+27, -3)
```diff
@@ -6093,24 +6093,48 @@ func TestJetStreamClusterConsumerDeliveredSyncReporting(t *testing.T) {
 		require_NoError(t, err)
 	}

+	opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
+	for _, s := range c.servers {
+		jsz, err := s.Jsz(opts)
+		require_NoError(t, err)
+		ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
+		require_Equal(t, ci.Delivered.Consumer, 0)
+		require_Equal(t, ci.Delivered.Stream, 0)
+	}
+
 	msgs, err := sub.Fetch(1)
 	require_NoError(t, err)
 	require_Equal(t, len(msgs), 1)
+	meta, err := msgs[0].Metadata()
+	require_NoError(t, err)
+	require_Equal(t, meta.Sequence.Consumer, 1)
+	require_Equal(t, meta.Sequence.Stream, 1)
+
+	// Allow some time for the state to propagate.
+	maxWait := 200 * time.Millisecond
+	time.Sleep(maxWait)
+
+	for _, s := range c.servers {
+		jsz, err := s.Jsz(opts)
+		require_NoError(t, err)
+		ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
+		require_Equal(t, ci.Delivered.Consumer, 1)
+		require_Equal(t, ci.Delivered.Stream, 1)
+	}

 	// Now we want to make sure that jsz reporting will show the same
 	// state, including delivered, which will have skipped to the end.
 	// The skip can happen on several factors, but for here we just send
 	// another pull request which we will let fail.
-	_, err = sub.Fetch(1, nats.MaxWait(200*time.Millisecond))
+	_, err = sub.Fetch(1, nats.MaxWait(maxWait))
 	require_Error(t, err)

-	opts := &JSzOptions{Accounts: true, Streams: true, Consumer: true}
 	for _, s := range c.servers {
 		jsz, err := s.Jsz(opts)
 		require_NoError(t, err)
 		ci := jsz.AccountDetails[0].Streams[0].Consumer[0]
 		require_Equal(t, ci.Delivered.Consumer, 1)
-		require_Equal(t, ci.Delivered.Stream, 1)
+		require_Equal(t, ci.Delivered.Stream, 11)
 	}
 }
```
61166140
