Cherry-picks for 2.10.25-RC.2 #6371

Merged · 5 commits · Jan 13, 2025

go.mod (2 changes: 1 addition, 1 deletion)
@@ -12,7 +12,7 @@ require (
github.com/nats-io/nkeys v0.4.9
github.com/nats-io/nuid v1.0.1
go.uber.org/automaxprocs v1.6.0
golang.org/x/crypto v0.31.0
golang.org/x/crypto v0.32.0
golang.org/x/sys v0.29.0
golang.org/x/time v0.9.0
)
go.sum (4 changes: 2 additions, 2 deletions)
@@ -20,8 +20,8 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
server/jetstream_cluster.go (135 changes: 26 additions, 109 deletions)
@@ -267,7 +267,12 @@ func (s *Server) JetStreamSnapshotMeta() error {
return errNotLeader
}

return meta.InstallSnapshot(js.metaSnapshot())
snap, err := js.metaSnapshot()
if err != nil {
return err
}

return meta.InstallSnapshot(snap)
}

func (s *Server) JetStreamStepdownStream(account, stream string) error {
@@ -437,73 +442,6 @@ func (cc *jetStreamCluster) isStreamCurrent(account, stream string) bool {
return false
}

// Restart the stream in question.
// Should only be called when the stream is known to be in a bad state.
func (js *jetStream) restartStream(acc *Account, csa *streamAssignment) {
js.mu.Lock()
s, cc := js.srv, js.cluster
if cc == nil {
js.mu.Unlock()
return
}
// Need to lookup the one directly from the meta layer, what we get handed is a copy if coming from isStreamHealthy.
asa := cc.streams[acc.Name]
if asa == nil {
js.mu.Unlock()
return
}
sa := asa[csa.Config.Name]
if sa == nil {
js.mu.Unlock()
return
}
// Make sure to clear out the raft node if still present in the meta layer.
if rg := sa.Group; rg != nil && rg.node != nil {
if rg.node.State() != Closed {
rg.node.Stop()
}
rg.node = nil
}
sinceCreation := time.Since(sa.Created)
js.mu.Unlock()

// Process stream assignment to recreate.
// Check that we have given system enough time to start us up.
// This will be longer than obvious, and matches consumer logic in case system very busy.
if sinceCreation < 10*time.Second {
s.Debugf("Not restarting missing stream '%s > %s', too soon since creation %v",
acc, csa.Config.Name, sinceCreation)
return
}

js.processStreamAssignment(sa)

// If we had consumers assigned to this server they will be present in the copy, csa.
// They also need to be processed. The csa consumers is a copy of only our consumers,
// those assigned to us, but the consumer assignment's there are direct from the meta
// layer to make this part much easier and avoid excessive lookups.
for _, cca := range csa.consumers {
if cca.deleted {
continue
}
// Need to look up original as well here to make sure node is nil.
js.mu.Lock()
ca := sa.consumers[cca.Name]
if ca != nil && ca.Group != nil {
// Make sure the node is stopped if still running.
if node := ca.Group.node; node != nil && node.State() != Closed {
node.Stop()
}
// Make sure node is wiped.
ca.Group.node = nil
}
js.mu.Unlock()
if ca != nil {
js.processConsumerAssignment(ca)
}
}
}

// isStreamHealthy will determine if the stream is up to date or very close.
// For R1 it will make sure the stream is present on this server.
func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
@@ -529,7 +467,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
// First lookup stream and make sure its there.
mset, err := acc.lookupStream(streamName)
if err != nil {
js.restartStream(acc, sa)
return false
}

@@ -554,8 +491,6 @@ func (js *jetStream) isStreamHealthy(acc *Account, sa *streamAssignment) bool {
s.Warnf("Detected stream cluster node skew '%s > %s'", acc.GetName(), streamName)
node.Delete()
mset.resetClusteredState(nil)
} else if node.State() == Closed {
js.restartStream(acc, sa)
}
}
return false
@@ -585,37 +520,9 @@ func (js *jetStream) isConsumerHealthy(mset *stream, consumer string, ca *consum
node := ca.Group.node
js.mu.RUnlock()

// When we try to restart we nil out the node if applicable
// and reprocess the consumer assignment.
restartConsumer := func() {
mset.mu.RLock()
accName, streamName := mset.acc.GetName(), mset.cfg.Name
mset.mu.RUnlock()

js.mu.Lock()
deleted := ca.deleted
// Check that we have not just been created.
if !deleted && time.Since(ca.Created) < 10*time.Second {
s.Debugf("Not restarting missing consumer '%s > %s > %s', too soon since creation %v",
accName, streamName, consumer, time.Since(ca.Created))
js.mu.Unlock()
return
}
// Make sure the node is stopped if still running.
if node != nil && node.State() != Closed {
node.Stop()
}
ca.Group.node = nil
js.mu.Unlock()
if !deleted {
js.processConsumerAssignment(ca)
}
}

// Check if not running at all.
o := mset.lookupConsumer(consumer)
if o == nil {
restartConsumer()
return false
}

@@ -630,11 +537,12 @@
s.Warnf("Detected consumer cluster node skew '%s > %s > %s'", accName, streamName, consumer)
node.Delete()
o.deleteWithoutAdvisory()
restartConsumer()
} else if node.State() == Closed {
// We have a consumer, and it should have a running node but it is closed.
o.stop()
restartConsumer()

// When we try to restart we nil out the node and reprocess the consumer assignment.
js.mu.Lock()
ca.Group.node = nil
js.mu.Unlock()
js.processConsumerAssignment(ca)
}
}
return false
@@ -1340,7 +1248,10 @@ func (js *jetStream) monitorCluster() {
}
// For the meta layer we want to snapshot when asked if we need one or have any entries that we can compact.
if ne, _ := n.Size(); ne > 0 || n.NeedSnapshot() {
if err := n.InstallSnapshot(js.metaSnapshot()); err == nil {
snap, err := js.metaSnapshot()
if err != nil {
s.Warnf("Error generating JetStream cluster snapshot: %v", err)
} else if err = n.InstallSnapshot(snap); err == nil {
lastSnapTime = time.Now()
} else if err != errNoSnapAvailable && err != errNodeClosed {
s.Warnf("Error snapshotting JetStream cluster state: %v", err)
@@ -1534,7 +1445,7 @@ func (js *jetStream) clusterStreamConfig(accName, streamName string) (StreamConf
return StreamConfig{}, false
}

func (js *jetStream) metaSnapshot() []byte {
func (js *jetStream) metaSnapshot() ([]byte, error) {
start := time.Now()
js.mu.RLock()
s := js.srv
@@ -1574,16 +1485,22 @@ func (js *jetStream) metaSnapshot() []byte {

if len(streams) == 0 {
js.mu.RUnlock()
return nil
return nil, nil
}

// Track how long it took to marshal the JSON
mstart := time.Now()
b, _ := json.Marshal(streams)
b, err := json.Marshal(streams)
mend := time.Since(mstart)

js.mu.RUnlock()

// Must not be possible for a JSON marshaling error to result
// in an empty snapshot.
if err != nil {
return nil, err
}

// Track how long it took to compress the JSON
cstart := time.Now()
snap := s2.Encode(nil, b)
@@ -1593,7 +1510,7 @@
s.rateLimitFormatWarnf("Metalayer snapshot took %.3fs (streams: %d, consumers: %d, marshal: %.3fs, s2: %.3fs, uncompressed: %d, compressed: %d)",
took.Seconds(), nsa, nca, mend.Seconds(), cend.Seconds(), len(b), len(snap))
}
return snap
return snap, nil
}

func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
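
Note (not part of the diff): the central change in server/jetstream_cluster.go is that metaSnapshot now returns ([]byte, error), so a JSON marshaling failure can no longer end up installed as an empty snapshot. Below is a minimal, self-contained sketch of that calling pattern; buildSnapshot and installSnapshot are hypothetical stand-ins for illustration, not nats-server APIs.

package main

import (
	"encoding/json"
	"fmt"
)

// buildSnapshot marshals in-memory state and propagates any marshaling
// error to the caller instead of silently returning an empty snapshot.
// It mirrors the new metaSnapshot() ([]byte, error) shape; the state type
// here is a hypothetical stand-in.
func buildSnapshot(state map[string][]string) ([]byte, error) {
	if len(state) == 0 {
		// Nothing to snapshot; callers treat nil as "skip".
		return nil, nil
	}
	b, err := json.Marshal(state)
	if err != nil {
		// Surface the failure; never install a bogus snapshot.
		return nil, err
	}
	return b, nil
}

// installSnapshot is a hypothetical stand-in for the Raft node's
// InstallSnapshot used by the real call sites.
func installSnapshot(snap []byte) error {
	fmt.Printf("installing %d-byte snapshot\n", len(snap))
	return nil
}

func main() {
	state := map[string][]string{"ACC": {"orders", "events"}}

	// Both call sites in the diff (JetStreamSnapshotMeta and monitorCluster)
	// follow this shape: check the snapshot error first, then install.
	snap, err := buildSnapshot(state)
	if err != nil {
		fmt.Println("snapshot error:", err)
		return
	}
	if snap != nil {
		if err := installSnapshot(snap); err != nil {
			fmt.Println("install error:", err)
		}
	}
}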