[FIXED] Don't InstallSnapshot during shutdown, would race with `monitorStream`/`monitorConsumer` (#6153)

When stopping a stream or consumer, we would attempt to install a
snapshot. However, this would race with what's happening in
`monitorStream`/`monitorConsumer` at that time.

For example:
1. In `applyStreamEntries` we call into `mset.processJetStreamMsg` to
persist one or multiple messages.
2. We call `mset.stop(..)` either before or during the above.
3. In `mset.stop(..)` we'd wait for `mset.processJetStreamMsg` to
release the lock so we can enter `mset.stateSnapshotLocked()`. **We
create a snapshot with new state here!**
4. Now we call into `InstallSnapshot` to persist the above snapshot, but
`n.applied` does not yet contain the right value; it will be lower.
5. Then `applyStreamEntries` finishes and we end by calling
`n.Applied(..)`.

This is a race condition: whether the snapshot ends up ahead of
`n.applied` depends on whether 4 happens before or after 5.
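
To make the window concrete, here is a minimal, self-contained toy in Go. It is not the actual server code: `toyNode`, `applyEntry` and `snapshotOnStop` are invented stand-ins for `applyStreamEntries`, `stop(..)` and `InstallSnapshot`. It only shows how state can be captured together with a stale applied value whenever the snapshot runs between steps 1 and 5.

```go
// Toy illustration only, not NATS code.
package main

import (
	"fmt"
	"sync"
)

type toyNode struct {
	mu      sync.Mutex
	state   uint64 // what the "stream" has actually stored
	applied uint64 // what Raft has recorded as applied
}

// applyEntry mimics applyStreamEntries: persist first (step 1), record applied later (step 5).
func (n *toyNode) applyEntry(i uint64) {
	n.mu.Lock()
	n.state = i
	n.mu.Unlock()
	// A concurrent stop() + InstallSnapshot in this window sees state == i, applied == i-1.
	n.mu.Lock()
	n.applied = i
	n.mu.Unlock()
}

// snapshotOnStop mimics snapshotting from stop(): it pairs the current state
// with whatever the applied index happens to be at that moment.
func (n *toyNode) snapshotOnStop() (state, applied uint64) {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.state, n.applied
}

func main() {
	n := &toyNode{}
	done := make(chan struct{})
	go func() { n.applyEntry(1); close(done) }()
	state, applied := n.snapshotOnStop() // may observe state=1, applied=0
	<-done
	fmt.Printf("snapshot captured state=%d with applied=%d\n", state, applied)
}
```

Depending on scheduling, the toy prints either an aligned pair or `state=1 applied=0`; the latter is exactly the shape of snapshot that forces the replay and `mset.clfs` adjustment described below.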

It's essential that the snapshot we make is aligned with the `n.applied`
value. If it isn't, we'll replay entries and need to increase
`mset.clfs`, which snowballs into stream desync due to this shift.

The only place where we can guarantee that the snapshot and the applied
index are aligned is in `doSnapshot` of `monitorStream` and `monitorConsumer`
(and `monitorCluster`), so we must not attempt to install snapshots outside
of those.
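
For contrast, a rough sketch of the safe shape (an assumption about the structure, not the verbatim `monitorStream` code; `raftLike`, `monitorLoopSketch` and `apply` are invented for illustration): the goroutine that advances the applied index is the only one allowed to install snapshots, so alignment holds by construction.

```go
// Sketch only; the guard conditions, recovery handling and timing checks of
// the real doSnapshot are omitted.
package sketch

type raftLike interface {
	Applied(index uint64)
	InstallSnapshot(snap []byte) error
}

// monitorLoopSketch stands in for monitorStream/monitorConsumer.
func monitorLoopSketch(n raftLike, apply func(index uint64) ([]byte, error), indexes []uint64) {
	var lastState []byte
	for _, i := range indexes {
		state, err := apply(i) // apply the committed entry to stream/consumer state
		if err != nil {
			continue
		}
		n.Applied(i)      // the applied index advances on this goroutine first...
		lastState = state // ...so the state captured here matches n.applied
	}
	if lastState != nil {
		_ = n.InstallSnapshot(lastState) // the doSnapshot equivalent: never ahead of applied
	}
}
```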


Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
derekcollison authored and MauriceVanVeen committed Nov 21, 2024
1 parent f6afc3e commit ad6877e
Showing 4 changed files with 185 additions and 9 deletions.
6 changes: 0 additions & 6 deletions server/consumer.go
@@ -5245,12 +5245,6 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
		if dflag {
			n.Delete()
		} else {
			// Try to install snapshot on clean exit
			if o.store != nil && (o.retention != LimitsPolicy || n.NeedSnapshot()) {
				if snap, err := o.store.EncodedState(); err == nil {
					n.InstallSnapshot(snap)
				}
			}
			n.Stop()
		}
	}
1 change: 0 additions & 1 deletion server/jetstream_cluster.go
@@ -2405,7 +2405,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnapshot bool) {
	// fully recovered from disk.
	isRecovering := true

	// Should only to be called from leader.
	doSnapshot := func() {
		if mset == nil || isRecovering || isRestore || time.Since(lastSnapTime) < minSnapDelta {
			return
185 changes: 185 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -4118,3 +4118,188 @@ func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
		}
	}
}

func TestJetStreamClusterDontInstallSnapshotWhenStoppingStream(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.WorkQueuePolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	_, err = js.Publish("foo", nil)
	require_NoError(t, err)

	// Wait for all servers to have applied everything.
	var maxApplied uint64
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		maxApplied = 0
		for _, s := range c.servers {
			acc, err := s.lookupAccount(globalAccountName)
			if err != nil {
				return err
			}
			mset, err := acc.lookupStream("TEST")
			if err != nil {
				return err
			}
			_, _, applied := mset.node.Progress()
			if maxApplied == 0 {
				maxApplied = applied
			} else if applied < maxApplied {
				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
			} else if applied > maxApplied {
				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
			}
		}
		return nil
	})

	// Install a snapshot on a follower.
	s := c.randomNonStreamLeader(globalAccountName, "TEST")
	acc, err := s.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	err = mset.node.InstallSnapshot(mset.stateSnapshotLocked())
	require_NoError(t, err)

	// Validate the snapshot reflects applied.
	validateStreamState := func(snap *snapshot) {
		t.Helper()
		require_Equal(t, snap.lastIndex, maxApplied)
		ss, err := DecodeStreamState(snap.data)
		require_NoError(t, err)
		require_Equal(t, ss.FirstSeq, 1)
		require_Equal(t, ss.LastSeq, 1)
	}
	snap, err := mset.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateStreamState(snap)

	// Simulate a message being stored, but not calling Applied yet.
	err = mset.processJetStreamMsg("foo", _EMPTY_, nil, nil, 1, time.Now().UnixNano(), nil)
	require_NoError(t, err)

	// Simulate the stream being stopped before we're able to call Applied.
	// If we'd install a snapshot during this, which would be a race condition,
	// we'd store a snapshot with state that's ahead of applied.
	err = mset.stop(false, false)
	require_NoError(t, err)

	// Validate the snapshot is the same as before.
	snap, err = mset.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateStreamState(snap)
}

func TestJetStreamClusterDontInstallSnapshotWhenStoppingConsumer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:      "TEST",
		Subjects:  []string{"foo"},
		Retention: nats.WorkQueuePolicy,
		Replicas:  3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "CONSUMER",
		Replicas:  3,
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up.
	_, err = js.Publish("foo", nil)
	require_NoError(t, err)
	sub, err := js.PullSubscribe("foo", "CONSUMER")
	require_NoError(t, err)
	msgs, err := sub.Fetch(1)
	require_NoError(t, err)
	require_Len(t, len(msgs), 1)
	err = msgs[0].AckSync()
	require_NoError(t, err)

	// Wait for all servers to have applied everything.
	var maxApplied uint64
	checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
		maxApplied = 0
		for _, s := range c.servers {
			acc, err := s.lookupAccount(globalAccountName)
			if err != nil {
				return err
			}
			mset, err := acc.lookupStream("TEST")
			if err != nil {
				return err
			}
			o := mset.lookupConsumer("CONSUMER")
			if o == nil {
				return errors.New("consumer not found")
			}
			_, _, applied := o.node.Progress()
			if maxApplied == 0 {
				maxApplied = applied
			} else if applied < maxApplied {
				return fmt.Errorf("applied not high enough, expected %d, got %d", applied, maxApplied)
			} else if applied > maxApplied {
				return fmt.Errorf("applied higher on one server, expected %d, got %d", applied, maxApplied)
			}
		}
		return nil
	})

	// Install a snapshot on a follower.
	s := c.randomNonStreamLeader(globalAccountName, "TEST")
	acc, err := s.lookupAccount(globalAccountName)
	require_NoError(t, err)
	mset, err := acc.lookupStream("TEST")
	require_NoError(t, err)
	o := mset.lookupConsumer("CONSUMER")
	require_NotNil(t, o)
	snapBytes, err := o.store.EncodedState()
	require_NoError(t, err)
	err = o.node.InstallSnapshot(snapBytes)
	require_NoError(t, err)

	// Validate the snapshot reflects applied.
	validateStreamState := func(snap *snapshot) {
		t.Helper()
		require_Equal(t, snap.lastIndex, maxApplied)
		state, err := decodeConsumerState(snap.data)
		require_NoError(t, err)
		require_Equal(t, state.Delivered.Consumer, 1)
		require_Equal(t, state.Delivered.Stream, 1)
	}
	snap, err := o.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateStreamState(snap)

	// Simulate a message being delivered, but not calling Applied yet.
	err = o.store.UpdateDelivered(2, 2, 1, time.Now().UnixNano())
	require_NoError(t, err)

	// Simulate the consumer being stopped before we're able to call Applied.
	// If we'd install a snapshot during this, which would be a race condition,
	// we'd store a snapshot with state that's ahead of applied.
	err = o.stop()
	require_NoError(t, err)

	// Validate the snapshot is the same as before.
	snap, err = o.node.(*raft).loadLastSnapshot()
	require_NoError(t, err)
	validateStreamState(snap)
}
2 changes: 0 additions & 2 deletions server/stream.go
@@ -5183,8 +5183,6 @@ func (mset *stream) stop(deleteFlag, advisory bool) error {
			n.Delete()
			sa = mset.sa
		} else {
			// Always attempt snapshot on clean exit.
			n.InstallSnapshot(mset.stateSnapshotLocked())
			n.Stop()
		}
	}
