Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Make o.Update(state) consistent for file and mem & fixed int underflow #6147

Merged
merged 3 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4270,7 +4270,7 @@ func (o *consumer) checkAckFloor() {
// We will set it explicitly to 1 behind our current lowest in pending, or if
// pending is empty, to our current delivered -1.
const minOffThreshold = 50
if o.asflr < ss.FirstSeq-minOffThreshold {
if ss.FirstSeq >= minOffThreshold && o.asflr < ss.FirstSeq-minOffThreshold {
var psseq, pdseq uint64
for seq, p := range o.pending {
if psseq == 0 || seq < psseq {
Expand Down
17 changes: 9 additions & 8 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -9433,14 +9433,6 @@ func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
}

func (o *consumerFileStore) Update(state *ConsumerState) error {
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return nil
}

// Sanity checks.
if state.AckFloor.Consumer > state.Delivered.Consumer {
return fmt.Errorf("bad ack floor for consumer")
Expand Down Expand Up @@ -9468,6 +9460,15 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
}
}

// Replace our state.
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return fmt.Errorf("old update ignored")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What effect upstream does this have by changing this to an error where as before it was ignored silently?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few cases where this has an effect upstream, most importantly all places already handle/bubble up any error. This is now also more consistent, since it would be done for memstore already but not for filestore before this change.

Also, setStoreState improves in that it doesn't call o.applyState(state) when the o.store.Update(state) was actually ignored/errored. Which was a bug before that would have the states drift.

	err := o.store.Update(state)
	if err == nil {
		o.applyState(state)
	}

}

o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
Expand Down
13 changes: 7 additions & 6 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3552,7 +3552,7 @@ func TestJetStreamClusterNoR1AssetsDuringLameDuck(t *testing.T) {
s.WaitForShutdown()
}

// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// If a consumer has not been registered (possible in heavily loaded systems with lots of assets)
// it could miss the signal of a message going away. If that message was pending and expires the
// ack floor could fall below the stream first sequence. This test will force that condition and
// make sure the system resolves itself.
Expand All @@ -3575,7 +3575,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
sub, err := js.PullSubscribe("foo", "C")
require_NoError(t, err)

for i := 0; i < 10; i++ {
// Publish as many messages as the ack floor check threshold +5.
totalMessages := 55
for i := 0; i < totalMessages; i++ {
sendStreamMsg(t, nc, "foo", "HELLO")
}

Expand Down Expand Up @@ -3619,10 +3621,9 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
o := mset.lookupConsumer("C")
require_NotNil(t, o)
o.mu.Lock()
err = o.setStoreState(state)
o.applyState(state)
cfs := o.store.(*consumerFileStore)
o.mu.Unlock()
require_NoError(t, err)
// The lower layer will ignore, so set more directly.
cfs.mu.Lock()
cfs.state = *state
Expand All @@ -3640,10 +3641,10 @@ func TestJetStreamClusterConsumerAckFloorDrift(t *testing.T) {
ci, err := js.ConsumerInfo("TEST", "C")
require_NoError(t, err)
// Make sure we catch this and adjust.
if ci.AckFloor.Stream == 10 && ci.AckFloor.Consumer == 10 {
if ci.AckFloor.Stream == uint64(totalMessages) && ci.AckFloor.Consumer == 10 {
return nil
}
return fmt.Errorf("AckFloor not correct, expected 10, got %+v", ci.AckFloor)
return fmt.Errorf("AckFloor not correct, expected %d, got %+v", totalMessages, ci.AckFloor)
})
}

Expand Down
7 changes: 2 additions & 5 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1689,8 +1689,6 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
}
for seq := range pending {
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
Expand All @@ -1705,18 +1703,17 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {

// Replace our state.
o.mu.Lock()
defer o.mu.Unlock()

// Check to see if this is an outdated update.
if state.Delivered.Consumer < o.state.Delivered.Consumer {
o.mu.Unlock()
if state.Delivered.Consumer < o.state.Delivered.Consumer || state.AckFloor.Stream < o.state.AckFloor.Stream {
return fmt.Errorf("old update ignored")
}

o.state.Delivered = state.Delivered
o.state.AckFloor = state.AckFloor
o.state.Pending = pending
o.state.Redelivered = redelivered
o.mu.Unlock()

return nil
}
Expand Down