Skip to content

Commit

Permalink
[FIXED] Don't InstallSnapshot during shutdown, would race with monitorStream/monitorConsumer
Browse files Browse the repository at this point in the history

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
MauriceVanVeen committed Nov 20, 2024
1 parent 28c581b commit 6938f97
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 9 deletions.
6 changes: 0 additions & 6 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5700,12 +5700,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
if dflag {
n.Delete()
} else {
// Try to install snapshot on clean exit
if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
if snap, err := o.store.EncodedState(); err == nil {
n.InstallSnapshot(snap)
}
}
n.Stop()
}
}
Expand Down
1 change: 0 additions & 1 deletion server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2425,7 +2425,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
// fully recovered from disk.
isRecovering := true

// Should only to be called from leader.
doSnapshot := func() {
if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
return
Expand Down
185 changes: 185 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4585,3 +4585,188 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
}
}
}

func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.WorkQueuePolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	_, err = js.Publish("foo", nil)
	require_NoError(t, err)

	// Block until every server reports the exact same RAFT applied index
	// for the stream, remembering that index for later validation.
	var highestApplied uint64
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		highestApplied = 0
		for _, srv := range c.servers {
			acc, err := srv.lookupAccount(globalAccountName)
			if err != nil {
				return err
			}
			mset, err := acc.lookupStream("TEST")
			if err != nil {
				return err
			}
			_, _, applied := mset.node.Progress()
			switch {
			case highestApplied == 0:
				highestApplied = applied
			case applied < highestApplied:
				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, highestApplied)
			case applied > highestApplied:
				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, highestApplied)
			}
		}
		return nil
	})

	// Install a snapshot on a follower.
	srv := c.randomNonStreamLeader(globalAccountName, "TEST")
	acc, err := srv.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	require_NoError(t, mset.node.InstallSnapshot(mset.stateSnapshotLocked()))

	// Helper: the stored snapshot must match the applied index and the
	// single published message.
	checkSnapshot := func(snap *snapshot) {
		t.Helper()
		require_Equal(t, snap.lastIndex, highestApplied)
		ss, err := DecodeStreamState(snap.data)
		require_NoError(t, err)
		require_Equal(t, ss.FirstSeq, 1)
		require_Equal(t, ss.LastSeq, 1)
	}
	snap, err := mset.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	checkSnapshot(snap)

	// Simulate a message being stored, but not calling Applied yet.
	require_NoError(t, mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil))

	// Simulate the stream being stopped before we're able to call Applied.
	// If we'd install a snapshot during this, which would be a race condition,
	// we'd store a snapshot with state that's ahead of applied.
	require_NoError(t, mset.stop(false, false))

	// The snapshot on disk must be identical to the one taken before stop.
	snap, err = mset.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	checkSnapshot(snap)
}

// TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer verifies that
// stopping a consumer does NOT install a RAFT snapshot: doing so could race
// with monitorConsumer and persist state that is ahead of applied.
func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.WorkQueuePolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "CONSUMER",
		Replicas:  3,
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up.
	_, err = js.Publish("foo", nil)
	require_NoError(t, err)
	sub, err := js.PullSubscribe("foo", "CONSUMER")
	require_NoError(t, err)
	msgs, err := sub.Fetch(1)
	require_NoError(t, err)
	require_Len(t, len(msgs), 1)
	err = msgs[0].AckSync()
	require_NoError(t, err)

	// Wait for all servers to have applied everything.
	var maxApplied uint64
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		maxApplied = 0
		for _, s := range c.servers {
			acc, err := s.lookupAccount(globalAccountName)
			if err != nil {
				return err
			}
			mset, err := acc.lookupStream("TEST")
			if err != nil {
				return err
			}
			o := mset.lookupConsumer("CONSUMER")
			if o == nil {
				return errors.New("consumer not found")
			}
			_, _, applied := o.node.Progress()
			if maxApplied == 0 {
				maxApplied = applied
			} else if applied < maxApplied {
				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
			} else if applied > maxApplied {
				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
			}
		}
		return nil
	})

	// Install a snapshot on a follower.
	s := c.randomNonStreamLeader(globalAccountName, "TEST")
	acc, err := s.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("CONSUMER")
	require_NotNil(t, o)
	snapBytes, err := o.store.EncodedState()
	require_NoError(t, err)
	err = o.node.InstallSnapshot(snapBytes)
	require_NoError(t, err)

	// Validate the snapshot reflects applied.
	// (Renamed from validateStreamState: this decodes CONSUMER state,
	// not stream state — the old name was copied from the sibling test.)
	validateConsumerState := func(snap *snapshot) {
		t.Helper()
		require_Equal(t, snap.lastIndex, maxApplied)
		state, err := decodeConsumerState(snap.data)
		require_NoError(t, err)
		require_Equal(t, state.Delivered.Consumer, 1)
		require_Equal(t, state.Delivered.Stream, 1)
	}
	snap, err := o.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateConsumerState(snap)

	// Simulate a message being delivered, but not calling Applied yet.
	err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano())
	require_NoError(t, err)

	// Simulate the consumer being stopped before we're able to call Applied.
	// If we'd install a snapshot during this, which would be a race condition,
	// we'd store a snapshot with state that's ahead of applied.
	err = o.stop()
	require_NoError(t, err)

	// Validate the snapshot is the same as before.
	snap, err = o.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateConsumerState(snap)
}
2 changes: 0 additions & 2 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -5563,8 +5563,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
n.Delete()
sa = mset.sa
} else {
// Always attempt snapshot on clean exit.
n.InstallSnapshot(mset.stateSnapshotLocked())
n.Stop()
}
}
Expand Down

0 comments on commit 6938f97

Please sign in to comment.