diff --git a/server/consumer.go b/server/consumer.go
index 5755451710..f8c0307c92 100644
--- a/server/consumer.go
+++ b/server/consumer.go
@@ -5245,12 +5245,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
 		if dflag {
 			n.Delete()
 		} else {
-			// Try to install snapshot on clean exit
-			if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
-				if snap, err := o.store.EncodedState(); err == nil {
-					n.InstallSnapshot(snap)
-				}
-			}
 			n.Stop()
 		}
 	}
diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index 349e05a43b..f8b8ad1969 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -2405,7 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 	// fully recovered from disk.
 	isRecovering := true
 
-	// Should only to be called from leader.
 	doSnapshot := func() {
 		if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
 			return
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index e3b29acb1b..92c9159d33 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -4118,3 +4118,188 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
 		}
 	}
 }
+
+func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+
+	// Wait for all servers to have applied everything.
+	var maxApplied uint64
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		maxApplied = 0
+		for _, s := range c.servers {
+			acc, err := s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err := acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			_, _, applied := mset.node.Progress()
+			if maxApplied == 0 {
+				maxApplied = applied
+			} else if applied < maxApplied {
+				return fmt.Errorf("applied not high enough, expected %d, got %d", maxApplied, applied)
+			} else if applied > maxApplied {
+				return fmt.Errorf("applied higher on one server, expected %d, got %d", maxApplied, applied)
+			}
+		}
+		return nil
+	})
+
+	// Install a snapshot on a follower.
+	s := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	err = mset.node.InstallSnapshot(mset.stateSnapshotLocked())
+	require_NoError(t, err)
+
+	// Validate the snapshot reflects applied.
+	validateStreamState := func(snap *snapshot) {
+		t.Helper()
+		require_Equal(t, snap.lastIndex, maxApplied)
+		ss, err := DecodeStreamState(snap.data)
+		require_NoError(t, err)
+		require_Equal(t, ss.FirstSeq, 1)
+		require_Equal(t, ss.LastSeq, 1)
+	}
+	snap, err := mset.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+
+	// Simulate a message being stored, but not calling Applied yet.
+	err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil)
+	require_NoError(t, err)
+
+	// Simulate the stream being stopped before we're able to call Applied.
+	// If we'd install a snapshot during this, which would be a race condition,
+	// we'd store a snapshot with state that's ahead of applied.
+	err = mset.stop(false, false)
+	require_NoError(t, err)
+
+	// Validate the snapshot is the same as before.
+	snap, err = mset.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+}
+
+func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:      "TEST",
+		Subjects:  []string{"foo"},
+		Retention: nats.WorkQueuePolicy,
+		Replicas:  3,
+	})
+	require_NoError(t, err)
+
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
+		Durable:   "CONSUMER",
+		Replicas:  3,
+		AckPolicy: nats.AckExplicitPolicy,
+	})
+	require_NoError(t, err)
+
+	// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up.
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	sub, err := js.PullSubscribe("foo", "CONSUMER")
+	require_NoError(t, err)
+	msgs, err := sub.Fetch(1)
+	require_NoError(t, err)
+	require_Len(t, len(msgs), 1)
+	err = msgs[0].AckSync()
+	require_NoError(t, err)
+
+	// Wait for all servers to have applied everything.
+	var maxApplied uint64
+	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
+		maxApplied = 0
+		for _, s := range c.servers {
+			acc, err := s.lookupAccount(globalAccountName)
+			if err != nil {
+				return err
+			}
+			mset, err := acc.lookupStream("TEST")
+			if err != nil {
+				return err
+			}
+			o := mset.lookupConsumer("CONSUMER")
+			if o == nil {
+				return errors.New("consumer not found")
+			}
+			_, _, applied := o.node.Progress()
+			if maxApplied == 0 {
+				maxApplied = applied
+			} else if applied < maxApplied {
+				return fmt.Errorf("applied not high enough, expected %d, got %d", maxApplied, applied)
+			} else if applied > maxApplied {
+				return fmt.Errorf("applied higher on one server, expected %d, got %d", maxApplied, applied)
+			}
+		}
+		return nil
+	})
+
+	// Install a snapshot on a follower.
+	s := c.randomNonStreamLeader(globalAccountName, "TEST")
+	acc, err := s.lookupAccount(globalAccountName)
+	require_NoError(t, err)
+	mset, err := acc.lookupStream("TEST")
+	require_NoError(t, err)
+	o := mset.lookupConsumer("CONSUMER")
+	require_NotNil(t, o)
+	snapBytes, err := o.store.EncodedState()
+	require_NoError(t, err)
+	err = o.node.InstallSnapshot(snapBytes)
+	require_NoError(t, err)
+
+	// Validate the snapshot reflects applied.
+	validateStreamState := func(snap *snapshot) {
+		t.Helper()
+		require_Equal(t, snap.lastIndex, maxApplied)
+		state, err := decodeConsumerState(snap.data)
+		require_NoError(t, err)
+		require_Equal(t, state.Delivered.Consumer, 1)
+		require_Equal(t, state.Delivered.Stream, 1)
+	}
+	snap, err := o.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+
+	// Simulate a message being delivered, but not calling Applied yet.
+	err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano())
+	require_NoError(t, err)
+
+	// Simulate the consumer being stopped before we're able to call Applied.
+	// If we'd install a snapshot during this, which would be a race condition,
+	// we'd store a snapshot with state that's ahead of applied.
+	err = o.stop()
+	require_NoError(t, err)
+
+	// Validate the snapshot is the same as before.
+	snap, err = o.node.(*raft).loadLastSnapshot()
+	require_NoError(t, err)
+	validateStreamState(snap)
+}
diff --git a/server/stream.go b/server/stream.go
index 57a1fa015a..1c0824cdb4 100644
--- a/server/stream.go
+++ b/server/stream.go
@@ -5183,8 +5183,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
 			n.Delete()
 			sa = mset.sa
 		} else {
-			// Always attempt snapshot on clean exit.
-			n.InstallSnapshot(mset.stateSnapshotLocked())
 			n.Stop()
 		}
 	}
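
The test comments above describe the ordering problem the removed stop-path InstallSnapshot calls could hit: while a stream or consumer is being stopped, its store may already hold an update whose raft entry has not yet been marked applied, so a snapshot taken at that moment pairs "future" store state with a stale applied index. The sketch below is not part of the change and uses stand-in fields rather than the server's real types; it is only a minimal illustration, under those assumptions, of why that pairing is unsafe to persist.

// Hypothetical, self-contained sketch (stand-in fields, no NATS APIs) of the
// race the tests above guard against. A snapshot pairs store state with the
// raft applied index; if the store has advanced but Applied has not yet been
// called, a snapshot installed while stopping records state ahead of applied.
package main

import "fmt"

type state struct {
	lastSeq uint64 // what the message store has persisted
	applied uint64 // highest log index confirmed as applied
}

// snapshot captures the store state together with the current applied index.
func snapshot(s state) string {
	return fmt.Sprintf("store seq %d @ applied index %d", s.lastSeq, s.applied)
}

func main() {
	var s state

	// Entry 1: stored and then marked applied; a snapshot here is consistent.
	s.lastSeq, s.applied = 1, 1
	fmt.Println("clean stop:", snapshot(s)) // store seq 1 @ applied index 1

	// Entry 2: stored, but the node is stopped before Applied runs.
	s.lastSeq = 2
	// A snapshot installed now would claim seq 2 at applied index 1; after a
	// restart the log entry for seq 2 could be replayed on top of that state.
	fmt.Println("racy stop: ", snapshot(s)) // store seq 2 @ applied index 1
}

With the stop paths no longer installing snapshots, snapshots are left to the paths that run against already-applied state, which is what both tests assert by checking that the on-disk snapshot is unchanged after stop.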