[FIXED] (2.11) Replicated consumer skipped redeliveries #6566

Merged: 4 commits, Feb 21, 2025

Conversation

MauriceVanVeen
Member

Replicated consumers could skip redeliveries of non-acked messages when:

  • A message was delivered to the client, but there was no quorum on updating the delivered state across replicas. A replicated ack then came in and raised the starting sequence, skipping redelivery of the messages below it. The following code caused that issue:
// Match leader logic on checking if ack is ahead of delivered.
// This could happen on a cooperative takeover with high speed deliveries.
if sseq > o.state.Delivered.Stream {
	o.state.Delivered.Stream = sseq + 1
}
  • A message was delivered to the client, but there was no quorum on updating the delivered state across replicas. The consumer leader then stepped down and became leader again, but did not reset o.sseq back down to the agreed state, skipping redelivery of the messages below it. The following code caused that issue:
// If o.sseq is greater don't update. Don't go backwards on o.sseq if leader.
if !o.isLeader() || o.sseq <= state.Delivered.Stream {
	o.sseq = state.Delivered.Stream + 1
}
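
The two cases above can be reproduced with a toy model. This is only a sketch, not the nats-server code: agreedState, ackOld, ackNew, startOld and startNew are hypothetical names, and the "new" variants assume the fix simply drops the bump on ack and always resets the starting sequence to the agreed state.

```go
package main

import "fmt"

// agreedState is a hypothetical stand-in for the consumer state that
// replicas have agreed on through quorum.
type agreedState struct {
	DeliveredStream uint64 // highest stream sequence agreed as delivered
}

// ackOld reproduces the removed follower-side logic quoted above:
// an ack ahead of the agreed delivered sequence moves it past the ack.
func ackOld(s *agreedState, sseq uint64) {
	if sseq > s.DeliveredStream {
		s.DeliveredStream = sseq + 1
	}
}

// ackNew is an assumed fix: an ack no longer moves the delivered sequence.
func ackNew(s *agreedState, sseq uint64) {
	_ = sseq // intentionally leaves s untouched
}

// startOld reproduces the removed guard quoted above: a leader with a
// higher in-memory sseq keeps it instead of going back to agreed state.
func startOld(isLeader bool, sseq uint64, s agreedState) uint64 {
	if !isLeader || sseq <= s.DeliveredStream {
		sseq = s.DeliveredStream + 1
	}
	return sseq
}

// startNew is an assumed fix: always restart from the agreed state.
func startNew(s agreedState) uint64 {
	return s.DeliveredStream + 1
}

func main() {
	// Case 1: messages 1..5 were delivered without quorum on the delivered
	// state, then a replicated ack for sequence 5 arrives.
	old, fixed := agreedState{}, agreedState{}
	ackOld(&old, 5)
	ackNew(&fixed, 5)
	fmt.Println(startNew(old), startNew(fixed)) // 7 1

	// Case 2: the leader had o.sseq == 6 in memory, agreed state is still 0,
	// and it becomes leader again.
	fmt.Println(startOld(true, 6, agreedState{}), startNew(agreedState{})) // 6 1
}
```

In both cases the old logic starts above sequence 1, so the unacked messages below that point are never redelivered, while the assumed fixed logic restarts from the agreed state.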

The other included commits fix code and tests that depended on the lines above:

  • TestJetStreamSuperClusterConsumerDeliverNewBug started flaking. The test never guaranteed that all replicas agreed on the same consumer state.
    • The issue lay in o.store.SetStarting(o.sseq - 1) always being called without being based on replicated state, which meant that when the storage directory was purged, this state would not reliably come back. Now o.updateSkipped(o.sseq) is called the very first time the consumer becomes leader, ensuring all replicas agree on the initial starting sequence, skipped ahead or not. It has also been changed to not only skip o.sseq ahead, but also reflect this in the underlying stored state.
    • The test has also been made stricter: it checks the state not only on the consumer leader but on all replicas, and checks that both the in-memory state and the replicated state are exactly what they are supposed to be.
  • TestJetStreamBasicDeliverSubject started failing due to a misplaced return in o.selectStartingSeqNo(). The return is now removed.
  • TestJetStreamClusterConsumerDeliveredSyncReporting had a small correctness issue, as skipping o.sseq ahead was not reflected in the underlying store. Now we expect stream/consumer sequence 0 before the first fetch, stream/consumer sequence 1 after that fetch, and consumer sequence 1 with a skipped-ahead stream sequence 11 after the last fetch.
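
The stricter "all replicas agree" check described above could look roughly like the sketch below. It does not use the real test helpers: seqPair and checkReplicas are hypothetical, and the expected values are the consumer sequence 1 and stream sequence 11 mentioned for the last fetch.

```go
package main

import "fmt"

// seqPair is a hypothetical, simplified consumer/stream sequence pair.
type seqPair struct {
	Consumer uint64
	Stream   uint64
}

// checkReplicas returns an error for the first replica (by index) whose
// delivered state differs from want, or nil if all replicas agree.
func checkReplicas(replicas []seqPair, want seqPair) error {
	for i, got := range replicas {
		if got != want {
			return fmt.Errorf("replica %d: got %+v, want %+v", i, got, want)
		}
	}
	return nil
}

func main() {
	// Expected state after the last fetch: consumer sequence 1,
	// stream sequence skipped ahead to 11.
	want := seqPair{Consumer: 1, Stream: 11}

	agreed := []seqPair{want, want, want}
	fmt.Println(checkReplicas(agreed, want)) // <nil>

	// One replica did not see the skipped-ahead stream sequence.
	lagging := []seqPair{want, {Consumer: 1, Stream: 1}, want}
	fmt.Println(checkReplicas(lagging, want))
}
```

Checking only the leader would pass in the second case, which is the kind of gap the stricter test is meant to close.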

Signed-off-by: Maurice van Veen github@mauricevanveen.com

@MauriceVanVeen MauriceVanVeen requested a review from a team as a code owner February 21, 2025 15:31
@@ -1384,11 +1390,6 @@ func (o *consumer) setLeader(isLeader bool) {
o.lss = nil
}

// Update the group on the our starting sequence if we are starting but we skipped some in the stream.
if o.dseq == 1 && o.sseq > 1 {
Member Author

This is replaced by the change above ^.

@@ -4821,16 +4823,16 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)

// Send message.
o.outq.send(pmsg)
Member Author

It might be possible for the message to reach the client before o.trackPending captures the sequence below, but that likely doesn't happen in practice. Moving the send below o.trackPending doesn't hurt.
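
The ordering concern in this comment can be sketched with a toy consumer. This is an illustration under assumptions, not the actual deliverMsg code: toyConsumer, newToyConsumer, deliver and ack are hypothetical names, and a buffered channel stands in for the outbound queue.

```go
package main

import "fmt"

// toyConsumer is a hypothetical, heavily simplified consumer.
type toyConsumer struct {
	pending map[uint64]bool // sequences awaiting an ack
	outq    chan uint64     // stand-in for the outbound queue
}

func newToyConsumer() *toyConsumer {
	return &toyConsumer{pending: make(map[uint64]bool), outq: make(chan uint64, 1)}
}

// deliver records the sequence as pending before handing the message to
// the outbound queue, so an ack can never arrive for an untracked sequence.
func (c *toyConsumer) deliver(seq uint64) {
	c.pending[seq] = true // track pending first
	c.outq <- seq         // then send
}

// ack reports whether seq was pending, and clears it if so.
func (c *toyConsumer) ack(seq uint64) bool {
	if !c.pending[seq] {
		return false
	}
	delete(c.pending, seq)
	return true
}

func main() {
	c := newToyConsumer()
	c.deliver(1)
	seq := <-c.outq
	fmt.Println(c.ack(seq)) // true
}
```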

Member

@derekcollison derekcollison left a comment

LGTM

@derekcollison derekcollison merged commit 3234b33 into main Feb 21, 2025
5 checks passed
@derekcollison derekcollison deleted the maurice/skipped-redeliveries branch February 21, 2025 16:39