@@ -267,7 +267,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
267
267
return errNotLeader
268
268
}
269
269
270
- return meta .InstallSnapshot (js .metaSnapshot ())
270
+ snap , err := js .metaSnapshot ()
271
+ if err != nil {
272
+ return err
273
+ }
274
+
275
+ return meta .InstallSnapshot (snap )
271
276
}
272
277
273
278
func (s * Server ) JetStreamStepdownStream (account , stream string ) error {
@@ -437,73 +442,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
437
442
return false
438
443
}
439
444
440
- // Restart the stream in question.
441
- // Should only be called when the stream is known to be in a bad state.
442
- func (js * jetStream ) restartStream (acc * Account , csa * streamAssignment ) {
443
- js .mu .Lock ()
444
- s , cc := js .srv , js .cluster
445
- if cc == nil {
446
- js .mu .Unlock ()
447
- return
448
- }
449
- // Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
450
- asa := cc .streams [acc .Name ]
451
- if asa == nil {
452
- js .mu .Unlock ()
453
- return
454
- }
455
- sa := asa [csa .Config .Name ]
456
- if sa == nil {
457
- js .mu .Unlock ()
458
- return
459
- }
460
- // Make sure to clear out the raft node if still present in the meta layer.
461
- if rg := sa .Group ; rg != nil && rg .node != nil {
462
- if rg .node .State () != Closed {
463
- rg .node .Stop ()
464
- }
465
- rg .node = nil
466
- }
467
- sinceCreation := time .Since (sa .Created )
468
- js .mu .Unlock ()
469
-
470
- // Process stream assignment to recreate.
471
- // Check that we have given system enough time to start us up.
472
- // This will be longer than obvious, and matches consumer logic in case system very busy.
473
- if sinceCreation < 10 * time .Second {
474
- s .Debugf ("Not restarting missing stream '%s > %s', too soon since creation %v" ,
475
- acc , csa .Config .Name , sinceCreation )
476
- return
477
- }
478
-
479
- js .processStreamAssignment (sa )
480
-
481
- // If we had consumers assigned to this server they will be present in the copy, csa.
482
- // They also need to be processed. The csa consumers is a copy of only our consumers,
483
- // those assigned to us, but the consumer assignment's there are direct from the meta
484
- // layer to make this part much easier and avoid excessive lookups.
485
- for _ , cca := range csa .consumers {
486
- if cca .deleted {
487
- continue
488
- }
489
- // Need to look up original as well here to make sure node is nil.
490
- js .mu .Lock ()
491
- ca := sa .consumers [cca .Name ]
492
- if ca != nil && ca .Group != nil {
493
- // Make sure the node is stopped if still running.
494
- if node := ca .Group .node ; node != nil && node .State () != Closed {
495
- node .Stop ()
496
- }
497
- // Make sure node is wiped.
498
- ca .Group .node = nil
499
- }
500
- js .mu .Unlock ()
501
- if ca != nil {
502
- js .processConsumerAssignment (ca )
503
- }
504
- }
505
- }
506
-
507
445
// isStreamHealthy will determine if the stream is up to date or very close.
508
446
// For R1 it will make sure the stream is present on this server.
509
447
func (js * jetStream ) isStreamHealthy (acc * Account , sa * streamAssignment ) bool {
@@ -529,7 +467,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
529
467
// First lookup stream and make sure its there.
530
468
mset , err := acc .lookupStream (streamName )
531
469
if err != nil {
532
- js .restartStream (acc , sa )
533
470
return false
534
471
}
535
472
@@ -554,8 +491,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
554
491
s .Warnf ("Detected stream cluster node skew '%s > %s'" , acc .GetName (), streamName )
555
492
node .Delete ()
556
493
mset .resetClusteredState (nil )
557
- } else if node .State () == Closed {
558
- js .restartStream (acc , sa )
559
494
}
560
495
}
561
496
return false
@@ -585,37 +520,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
585
520
node := ca .Group .node
586
521
js .mu .RUnlock ()
587
522
588
- // When we try to restart we nil out the node if applicable
589
- // and reprocess the consumer assignment.
590
- restartConsumer := func () {
591
- mset .mu .RLock ()
592
- accName , streamName := mset .acc .GetName (), mset .cfg .Name
593
- mset .mu .RUnlock ()
594
-
595
- js .mu .Lock ()
596
- deleted := ca .deleted
597
- // Check that we have not just been created.
598
- if ! deleted && time .Since (ca .Created ) < 10 * time .Second {
599
- s .Debugf ("Not restarting missing consumer '%s > %s > %s', too soon since creation %v" ,
600
- accName , streamName , consumer , time .Since (ca .Created ))
601
- js .mu .Unlock ()
602
- return
603
- }
604
- // Make sure the node is stopped if still running.
605
- if node != nil && node .State () != Closed {
606
- node .Stop ()
607
- }
608
- ca .Group .node = nil
609
- js .mu .Unlock ()
610
- if ! deleted {
611
- js .processConsumerAssignment (ca )
612
- }
613
- }
614
-
615
523
// Check if not running at all.
616
524
o := mset .lookupConsumer (consumer )
617
525
if o == nil {
618
- restartConsumer ()
619
526
return false
620
527
}
621
528
@@ -630,11 +537,12 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
630
537
s .Warnf ("Detected consumer cluster node skew '%s > %s > %s'" , accName , streamName , consumer )
631
538
node .Delete ()
632
539
o .deleteWithoutAdvisory ()
633
- restartConsumer ()
634
- } else if node .State () == Closed {
635
- // We have a consumer, and it should have a running node but it is closed.
636
- o .stop ()
637
- restartConsumer ()
540
+
541
+ // When we try to restart we nil out the node and reprocess the consumer assignment.
542
+ js .mu .Lock ()
543
+ ca .Group .node = nil
544
+ js .mu .Unlock ()
545
+ js .processConsumerAssignment (ca )
638
546
}
639
547
}
640
548
return false
@@ -1340,7 +1248,10 @@ func (js *jetStream) monitorCluster() {
1340
1248
}
1341
1249
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
1342
1250
if ne , _ := n .Size (); ne > 0 || n .NeedSnapshot () {
1343
- if err := n .InstallSnapshot (js .metaSnapshot ()); err == nil {
1251
+ snap , err := js .metaSnapshot ()
1252
+ if err != nil {
1253
+ s .Warnf ("Error generating JetStream cluster snapshot: %v" , err )
1254
+ } else if err = n .InstallSnapshot (snap ); err == nil {
1344
1255
lastSnapTime = time .Now ()
1345
1256
} else if err != errNoSnapAvailable && err != errNodeClosed {
1346
1257
s .Warnf ("Error snapshotting JetStream cluster state: %v" , err )
@@ -1534,7 +1445,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
1534
1445
return StreamConfig {}, false
1535
1446
}
1536
1447
1537
- func (js * jetStream ) metaSnapshot () []byte {
1448
+ func (js * jetStream ) metaSnapshot () ( []byte , error ) {
1538
1449
start := time .Now ()
1539
1450
js .mu .RLock ()
1540
1451
s := js .srv
@@ -1574,16 +1485,22 @@ func (js *jetStream) metaSnapshot() []byte {
1574
1485
1575
1486
if len (streams ) == 0 {
1576
1487
js .mu .RUnlock ()
1577
- return nil
1488
+ return nil , nil
1578
1489
}
1579
1490
1580
1491
// Track how long it took to marshal the JSON
1581
1492
mstart := time .Now ()
1582
- b , _ := json .Marshal (streams )
1493
+ b , err := json .Marshal (streams )
1583
1494
mend := time .Since (mstart )
1584
1495
1585
1496
js .mu .RUnlock ()
1586
1497
1498
+ // Must not be possible for a JSON marshaling error to result
1499
+ // in an empty snapshot.
1500
+ if err != nil {
1501
+ return nil , err
1502
+ }
1503
+
1587
1504
// Track how long it took to compress the JSON
1588
1505
cstart := time .Now ()
1589
1506
snap := s2 .Encode (nil , b )
@@ -1593,7 +1510,7 @@ func (js *jetStream) metaSnapshot() []byte {
1593
1510
s .rateLimitFormatWarnf ("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)" ,
1594
1511
took .Seconds (), nsa , nca , mend .Seconds (), cend .Seconds (), len (b ), len (snap ))
1595
1512
}
1596
- return snap
1513
+ return snap , nil
1597
1514
}
1598
1515
1599
1516
func (js * jetStream ) applyMetaSnapshot (buf []byte , ru * recoveryUpdates , isRecovering bool ) error {
0 commit comments