Skip to content

Commit 5fbe067

Browse files
committed
Error handling in metaSnapshot
Since we use `MarshalJSON()` on some types, we must be careful that a JSON marshalling error cannot be dropped, resulting in an empty snapshot. Signed-off-by: Neil Twigg <neil@nats.io>
1 parent 1dcd873 commit 5fbe067

File tree

3 files changed

+24
-8
lines changed

3 files changed

+24
-8
lines changed

server/jetstream_cluster.go

+20-6
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
268268
return errNotLeader
269269
}
270270

271-
return meta.InstallSnapshot(js.metaSnapshot())
271+
snap, err := js.metaSnapshot()
272+
if err != nil {
273+
return err
274+
}
275+
276+
return meta.InstallSnapshot(snap)
272277
}
273278

274279
func (s *Server) JetStreamStepdownStream(account, stream string) error {
@@ -1341,7 +1346,10 @@ func (js *jetStream) monitorCluster() {
13411346
}
13421347
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
13431348
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
1344-
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
1349+
snap, err := js.metaSnapshot()
1350+
if err != nil {
1351+
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
1352+
} else if err = n.InstallSnapshot(snap); err == nil {
13451353
lastSnapTime = time.Now()
13461354
} else if err != errNoSnapAvailable && err != errNodeClosed {
13471355
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
@@ -1538,7 +1546,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
15381546
return StreamConfig{}, false
15391547
}
15401548

1541-
func (js *jetStream) metaSnapshot() []byte {
1549+
func (js *jetStream) metaSnapshot() ([]byte, error) {
15421550
start := time.Now()
15431551
js.mu.RLock()
15441552
s := js.srv
@@ -1578,16 +1586,22 @@ func (js *jetStream) metaSnapshot() []byte {
15781586

15791587
if len(streams) == 0 {
15801588
js.mu.RUnlock()
1581-
return nil
1589+
return nil, nil
15821590
}
15831591

15841592
// Track how long it took to marshal the JSON
15851593
mstart := time.Now()
1586-
b, _ := json.Marshal(streams)
1594+
b, err := json.Marshal(streams)
15871595
mend := time.Since(mstart)
15881596

15891597
js.mu.RUnlock()
15901598

1599+
// Must not be possible for a JSON marshalling error to result
1600+
// in an empty snapshot.
1601+
if err != nil {
1602+
return nil, err
1603+
}
1604+
15911605
// Track how long it took to compress the JSON
15921606
cstart := time.Now()
15931607
snap := s2.Encode(nil, b)
@@ -1597,7 +1611,7 @@ func (js *jetStream) metaSnapshot() []byte {
15971611
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
15981612
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
15991613
}
1600-
return snap
1614+
return snap, nil
16011615
}
16021616

16031617
func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {

server/jetstream_cluster_4_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -4472,7 +4472,8 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
44724472
mjs.mu.Unlock()
44734473

44744474
// Create snapshot, this should not contain pending consumers.
4475-
snap := mjs.metaSnapshot()
4475+
snap, err := mjs.metaSnapshot()
4476+
require_NoError(t, err)
44764477

44774478
ru := &recoveryUpdates{
44784479
removeStreams: make(map[string]*streamAssignment),

server/norace_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -11293,7 +11293,8 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
1129311293
n := js.getMetaGroup()
1129411294
// Now let's see how long it takes to create a meta snapshot and how big it is.
1129511295
start := time.Now()
11296-
snap := js.metaSnapshot()
11296+
snap, err := js.metaSnapshot()
11297+
require_NoError(t, err)
1129711298
require_NoError(t, n.InstallSnapshot(snap))
1129811299
t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
1129911300
}

0 commit comments

Comments
 (0)