Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling in metaSnapshot #6361

Merged
merged 1 commit into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
return errNotLeader
}

return meta.InstallSnapshot(js.metaSnapshot())
snap, err := js.metaSnapshot()
if err != nil {
return err
}

return meta.InstallSnapshot(snap)
}

func (s *Server) JetStreamStepdownStream(account, stream string) error {
Expand Down Expand Up @@ -1341,7 +1346,10 @@ func (js *jetStream) monitorCluster() {
}
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
snap, err := js.metaSnapshot()
if err != nil {
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
} else if err = n.InstallSnapshot(snap); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
Expand Down Expand Up @@ -1538,7 +1546,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
return StreamConfig{}, false
}

func (js *jetStream) metaSnapshot() []byte {
func (js *jetStream) metaSnapshot() ([]byte, error) {
start := time.Now()
js.mu.RLock()
s := js.srv
Expand Down Expand Up @@ -1578,16 +1586,22 @@ func (js *jetStream) metaSnapshot() []byte {

if len(streams) == 0 {
js.mu.RUnlock()
return nil
return nil, nil
}

// Track how long it took to marshal the JSON
mstart := time.Now()
b, _ := json.Marshal(streams)
b, err := json.Marshal(streams)
mend := time.Since(mstart)

js.mu.RUnlock()

// Must not be possible for a JSON marshaling error to result
// in an empty snapshot.
if err != nil {
return nil, err
}

// Track how long it took to compress the JSON
cstart := time.Now()
snap := s2.Encode(nil, b)
Expand All @@ -1597,7 +1611,7 @@ func (js *jetStream) metaSnapshot() []byte {
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
}
return snap
return snap, nil
}

func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
Expand Down
3 changes: 2 additions & 1 deletion server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4472,7 +4472,8 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
mjs.mu.Unlock()

// Create snapshot, this should not contain pending consumers.
snap := mjs.metaSnapshot()
snap, err := mjs.metaSnapshot()
require_NoError(t, err)

ru := &recoveryUpdates{
removeStreams: make(map[string]*streamAssignment),
Expand Down
3 changes: 2 additions & 1 deletion server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11293,7 +11293,8 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
n := js.getMetaGroup()
// Now let's see how long it takes to create a meta snapshot and how big it is.
start := time.Now()
snap := js.metaSnapshot()
snap, err := js.metaSnapshot()
require_NoError(t, err)
require_NoError(t, n.InstallSnapshot(snap))
t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
}
Loading