-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FIXED] (2.11) Replicated consumer skipped redeliveries #6566
Conversation
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
@@ -1384,11 +1390,6 @@ func (o *consumer) setLeader(isLeader bool) { | |||
o.lss = nil | |||
} | |||
|
|||
// Update the group on the our starting sequence if we are starting but we skipped some in the stream. | |||
if o.dseq == 1 && o.sseq > 1 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is replaced by the change above ^.
@@ -4821,16 +4823,16 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64, | |||
// Update delivered first. | |||
o.updateDelivered(dseq, seq, dc, ts) | |||
|
|||
// Send message. | |||
o.outq.send(pmsg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we could send a message to the client before o.trackPending
captures the sequence below, but it likely doesn't happen in practice. Moving it to be below o.trackPending
doesn't hurt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Replicated consumers could skip redeliveries of non-acked messages when:
o.sseq
back down to agreed state, skipping redelivery of messages below. The following code caused that issue:Other included commits fix various code/tests that depended on above lines of code:
TestJetStreamSuperClusterConsumerDeliverNewBug
started flaking. It would never guarantee that all replicas agreed on the same consumer state.o.store.SetStarting(o.sseq - 1)
always being called, without being based on replicated state. Which meant that when the storage directory was purged, this state would not reliably come back. Nowo.updateSkipped(o.sseq)
is called for the very first time of becoming leader. Ensuring all replicas agree on the initial starting sequence, skipped ahead or not. It has also been changed to not only skip aheado.sseq
, but also reflect this in the underlying stored state.TestJetStreamBasicDeliverSubject
started failing due to a misplacedreturn
ino.selectStartingSeqNo()
. The return is now removed.TestJetStreamClusterConsumerDeliveredSyncReporting
had a small correctness issue, as skipping aheado.sseq
would not be reflected in the underlying store. Now before the first fetch we expect stream/consumer sequence 0, after that fetch we expect stream/consumer sequence 1, and after the last fetch we expect a consumer sequence 1, and a skipped ahead stream sequence 11.Signed-off-by: Maurice van Veen github@mauricevanveen.com