[FIXED] Bad meta state on restart could cause deletion of assets. (#5767)

When checking for orphaned assets in a cluster, make sure we are
properly synced with the leader before proceeding.
When R>1 the server would rebuild the assets after getting the proper
meta state, but in the R1 case this could lead to data loss.

Signed-off-by: Derek Collison <derek@nats.io>
derekcollison authored Aug 8, 2024
2 parents e9ee2a0 + 4ea376b commit f8be157
Showing 2 changed files with 71 additions and 9 deletions.
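In outline, the fix gates the destructive orphan cleanup behind two checks before anything is deleted: a meta-leader must exist, and this node must be current with it. A minimal sketch of that guard pattern in Go (the `metaGroup` interface and `cleanupOrphans` helper are hypothetical stand-ins for illustration, not the server's actual types):

```go
package main

import (
	"errors"
	"fmt"
)

// metaGroup is a hypothetical stand-in for the cluster's meta Raft group.
type metaGroup interface {
	GroupLeader() string // empty means no elected leader yet
	Healthy() bool       // true once we are current with the leader
}

// cleanupOrphans runs the destructive cleanup only when our view of the
// meta state is known to be current; otherwise it skips and reports why.
func cleanupOrphans(meta metaGroup, deleteOrphans func()) error {
	if meta == nil || meta.GroupLeader() == "" {
		return errors.New("skipping orphan check: no meta-leader")
	}
	if !meta.Healthy() {
		return errors.New("skipping orphan check: not current with the meta-leader")
	}
	deleteOrphans()
	return nil
}

func main() {
	// With no meta group at all, the cleanup must be skipped.
	if err := cleanupOrphans(nil, func() { fmt.Println("deleting orphans") }); err != nil {
		fmt.Println(err)
	}
}
```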
24 changes: 15 additions & 9 deletions server/jetstream_cluster.go
@@ -1141,17 +1141,24 @@ type recoveryUpdates struct {
 // Streams and consumers are recovered from disk, and the meta layer's mappings
 // should clean them up, but under crash scenarios there could be orphans.
 func (js *jetStream) checkForOrphans() {
-	consumerName := func(o *consumer) string {
-		o.mu.RLock()
-		defer o.mu.RUnlock()
-		return o.name
-	}
-
 	// Can not hold jetstream lock while trying to delete streams or consumers.
 	js.mu.Lock()
 	s, cc := js.srv, js.cluster
 	s.Debugf("JetStream cluster checking for orphans")
 
+	// We only want to cleanup any orphans if we know we are current with the meta-leader.
+	meta := cc.meta
+	if meta == nil || meta.GroupLeader() == _EMPTY_ {
+		js.mu.Unlock()
+		s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
+		return
+	}
+	if !meta.Healthy() {
+		js.mu.Unlock()
+		s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
+		return
+	}
+
 	var streams []*stream
 	var consumers []*consumer
 
@@ -1164,8 +1171,7 @@ func (js *jetStream) checkForOrphans() {
 		} else {
 			// This one is good, check consumers now.
 			for _, o := range mset.getConsumers() {
-				consumer := consumerName(o)
-				if sa.consumers[consumer] == nil {
+				if sa.consumers[o.String()] == nil {
 					consumers = append(consumers, o)
 				}
 			}
@@ -1369,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
 			// Clear.
 			ru = nil
 			s.Debugf("Recovered JetStream cluster metadata")
-			js.checkForOrphans()
+			time.AfterFunc(30*time.Second, js.checkForOrphans)
 			// Do a health check here as well.
 			go checkHealth()
 			continue
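The second change in this file defers the check instead of invoking it inline: `time.AfterFunc` runs the callback on its own goroutine once the timer fires, giving the meta layer time to settle after recovery. A small standalone illustration of that scheduling (the 100ms delay is shortened for the example; the server uses 30 seconds):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	done := make(chan struct{})
	// Defer the potentially destructive check instead of running it inline.
	timer := time.AfterFunc(100*time.Millisecond, func() {
		fmt.Println("running deferred orphan check")
		close(done)
	})
	defer timer.Stop()
	<-done // wait for the deferred callback to fire
}
```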
56 changes: 56 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -1373,6 +1373,7 @@ func TestJetStreamClusterConsumerNRGCleanup(t *testing.T) {
 				numStreams++
 			}
 		}
+		f.Close()
 	}
 	require_Equal(t, numConsumers, 0)
 	require_Equal(t, numStreams, 0)
@@ -2493,3 +2494,58 @@ func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
 	require_Equal(t, si.State.NumDeleted, 20)
 	require_Equal(t, si.State.NumSubjects, 4)
 }
+
+func TestJetStreamClusterMetaSyncOrphanCleanup(t *testing.T) {
+	c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3,
+		func(serverName, clusterName, storeDir, conf string) string {
+			return fmt.Sprintf("%s\nserver_tags: [server:%s]", conf, serverName)
+		})
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Create a bunch of streams on S1
+	for i := 0; i < 100; i++ {
+		stream := fmt.Sprintf("TEST-%d", i)
+		subject := fmt.Sprintf("TEST.%d", i)
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:      stream,
+			Subjects:  []string{subject},
+			Storage:   nats.FileStorage,
+			Placement: &nats.Placement{Tags: []string{"server:S-1"}},
+		})
+		require_NoError(t, err)
+		// Put in 10 msgs to each
+		for j := 0; j < 10; j++ {
+			_, err := js.Publish(subject, nil)
+			require_NoError(t, err)
+		}
+	}
+
+	// Now we will shutdown S1 and remove all of its meta-data to trip the condition.
+	s := c.serverByName("S-1")
+	require_True(t, s != nil)
+
+	sd := s.JetStreamConfig().StoreDir
+	nd := filepath.Join(sd, "$SYS", "_js_", "_meta_")
+	s.Shutdown()
+	s.WaitForShutdown()
+	os.RemoveAll(nd)
+	s = c.restartServer(s)
+	c.waitOnServerCurrent(s)
+	jsz, err := s.Jsz(nil)
+	require_NoError(t, err)
+	require_Equal(t, jsz.Streams, 100)
+
+	// These will be recreated by the meta layer, but if the orphan detection deleted them they will be empty,
+	// so check all streams to make sure they still have data.
+	acc := s.GlobalAccount()
+	var state StreamState
+	for i := 0; i < 100; i++ {
+		mset, err := acc.lookupStream(fmt.Sprintf("TEST-%d", i))
+		require_NoError(t, err)
+		mset.store.FastState(&state)
+		require_Equal(t, state.Msgs, 10)
+	}
+}
