diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index fef9958add4..51b9e504da9 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -1141,17 +1141,24 @@ type recoveryUpdates struct {
 // Streams and consumers are recovered from disk, and the meta layer's mappings
 // should clean them up, but under crash scenarios there could be orphans.
 func (js *jetStream) checkForOrphans() {
-	consumerName := func(o *consumer) string {
-		o.mu.RLock()
-		defer o.mu.RUnlock()
-		return o.name
-	}
-
 	// Can not hold jetstream lock while trying to delete streams or consumers.
 	js.mu.Lock()
 	s, cc := js.srv, js.cluster
 	s.Debugf("JetStream cluster checking for orphans")
 
+	// We only want to clean up any orphans if we know we are current with the meta-leader.
+	meta := cc.meta
+	if meta == nil || meta.GroupLeader() == _EMPTY_ {
+		js.mu.Unlock()
+		s.Debugf("JetStream cluster skipping check for orphans, no meta-leader")
+		return
+	}
+	if !meta.Healthy() {
+		js.mu.Unlock()
+		s.Debugf("JetStream cluster skipping check for orphans, not current with the meta-leader")
+		return
+	}
+
 	var streams []*stream
 	var consumers []*consumer
 
@@ -1164,8 +1171,7 @@ func (js *jetStream) checkForOrphans() {
 			} else {
 				// This one is good, check consumers now.
 				for _, o := range mset.getConsumers() {
-					consumer := consumerName(o)
-					if sa.consumers[consumer] == nil {
+					if sa.consumers[o.String()] == nil {
 						consumers = append(consumers, o)
 					}
 				}
@@ -1369,7 +1375,7 @@ func (js *jetStream) monitorCluster() {
 				// Clear.
 				ru = nil
 				s.Debugf("Recovered JetStream cluster metadata")
-				js.checkForOrphans()
+				time.AfterFunc(30*time.Second, js.checkForOrphans)
 				// Do a health check here as well.
 				go checkHealth()
 				continue
diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go
index c97cd64838a..4a0152e312d 100644
--- a/server/jetstream_cluster_4_test.go
+++ b/server/jetstream_cluster_4_test.go
@@ -1373,6 +1373,7 @@ func TestJetStreamClusterConsumerNRGCleanup(t *testing.T) {
 				numStreams++
 			}
 		}
+		f.Close()
 	}
 	require_Equal(t, numConsumers, 0)
 	require_Equal(t, numStreams, 0)
@@ -2493,3 +2494,58 @@ func TestJetStreamClusterWQRoundRobinSubjectRetention(t *testing.T) {
 	require_Equal(t, si.State.NumDeleted, 20)
 	require_Equal(t, si.State.NumSubjects, 4)
 }
+
+func TestJetStreamClusterMetaSyncOrphanCleanup(t *testing.T) {
+	c := createJetStreamClusterWithTemplateAndModHook(t, jsClusterTempl, "R3S", 3,
+		func(serverName, clusterName, storeDir, conf string) string {
+			return fmt.Sprintf("%s\nserver_tags: [server:%s]", conf, serverName)
+		})
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	// Create a bunch of streams on S-1.
+	for i := 0; i < 100; i++ {
+		stream := fmt.Sprintf("TEST-%d", i)
+		subject := fmt.Sprintf("TEST.%d", i)
+		_, err := js.AddStream(&nats.StreamConfig{
+			Name:      stream,
+			Subjects:  []string{subject},
+			Storage:   nats.FileStorage,
+			Placement: &nats.Placement{Tags: []string{"server:S-1"}},
+		})
+		require_NoError(t, err)
+		// Put 10 msgs into each.
+		for j := 0; j < 10; j++ {
+			_, err := js.Publish(subject, nil)
+			require_NoError(t, err)
+		}
+	}
+
+	// Now we will shut down S-1 and remove all of its metadata to trip the condition.
+	s := c.serverByName("S-1")
+	require_True(t, s != nil)
+
+	sd := s.JetStreamConfig().StoreDir
+	nd := filepath.Join(sd, "$SYS", "_js_", "_meta_")
+	s.Shutdown()
+	s.WaitForShutdown()
+	os.RemoveAll(nd)
+	s = c.restartServer(s)
+	c.waitOnServerCurrent(s)
+	jsz, err := s.Jsz(nil)
+	require_NoError(t, err)
+	require_Equal(t, jsz.Streams, 100)
+
+	// These will be recreated by the meta layer, but if the orphan detection deleted them they will be empty,
+	// so check all streams to make sure they still have data.
+	acc := s.GlobalAccount()
+	var state StreamState
+	for i := 0; i < 100; i++ {
+		mset, err := acc.lookupStream(fmt.Sprintf("TEST-%d", i))
+		require_NoError(t, err)
+		mset.store.FastState(&state)
+		require_Equal(t, state.Msgs, 10)
+	}
+}
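
Note on the patch above: checkForOrphans now refuses to run unless the node knows the current meta-leader (meta.GroupLeader()) and considers itself current with it (meta.Healthy()), and the first sweep after metadata recovery is deferred by 30 seconds via time.AfterFunc. What follows is a minimal, self-contained sketch of that guard-plus-deferral pattern; the raftLike type and everything in main are hypothetical stand-ins for illustration, not the nats-server API — only the GroupLeader/Healthy checks and the time.AfterFunc deferral are taken from the diff.

package main

import (
	"fmt"
	"time"
)

// raftLike is a hypothetical stand-in for the meta group node. GroupLeader
// and Healthy mirror the two checks the patch adds; the fields and logic
// here are illustrative only.
type raftLike struct {
	leader  string
	current bool
}

func (n *raftLike) GroupLeader() string { return n.leader }
func (n *raftLike) Healthy() bool       { return n.current }

// checkForOrphans sketches the guarded cleanup: bail out unless a
// meta-leader is known and this node believes it is caught up, so a server
// that lost its metadata cannot delete assets it simply has not re-learned
// about yet.
func checkForOrphans(meta *raftLike) {
	if meta == nil || meta.GroupLeader() == "" {
		fmt.Println("skipping check for orphans, no meta-leader")
		return
	}
	if !meta.Healthy() {
		fmt.Println("skipping check for orphans, not current with the meta-leader")
		return
	}
	fmt.Println("current with the meta-leader, safe to delete true orphans")
}

func main() {
	// A node that has not yet caught up: the deferred sweep below is skipped.
	lagging := &raftLike{leader: "S-2", current: false}

	// Mirror the patch's deferral: schedule the first sweep instead of
	// running it inline, giving the meta layer time to catch up.
	timer := time.AfterFunc(50*time.Millisecond, func() { checkForOrphans(lagging) })
	defer timer.Stop()
	time.Sleep(100 * time.Millisecond) // deferred sweep fires and is skipped

	// A node that is current with the meta-leader: the sweep proceeds.
	checkForOrphans(&raftLike{leader: "S-2", current: true})
}

The 30-second delay is a deliberate trade-off: orphan cleanup is not urgent, and giving catch-up replay time to complete after recovery avoids deleting stream or consumer state that the meta layer has not yet re-learned, which is exactly the data-loss scenario the new test exercises by wiping the _meta_ directory.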