Skip to content

Commit

Permalink
Backport #4592 to 2.9 (#4651)
Browse files Browse the repository at this point in the history
Backport #4592 to 2.9.23

---------

Signed-off-by: Jean-Noël Moyne <jnmoyne@gmail.com>
  • Loading branch information
jnmoyne authored Oct 11, 2023
1 parent 6a73e68 commit 05fe77f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
2 changes: 2 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16773,6 +16773,8 @@ func TestJetStreamWorkQueueSourceRestart(t *testing.T) {
sub, err := js.PullSubscribe("foo", "dur", nats.BindStream("TEST"))
require_NoError(t, err)

time.Sleep(100 * time.Millisecond)

ci, err := js.ConsumerInfo("TEST", "dur")
require_NoError(t, err)
require_True(t, ci.NumPending == uint64(sent))
Expand Down
3 changes: 1 addition & 2 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1873,7 +1873,6 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
msg := fmt.Sprintf("R-MSG-%d", i+1)
for _, sname := range []string{"foo", "bar", "baz"} {
m := nats.NewMsg(sname)
m.Header.Set(nats.MsgIdHdr, sname+"-"+msg)
m.Data = []byte(msg)
if _, err := js.PublishMsg(m); err != nil {
t.Errorf("Unexpected publish error: %v", err)
Expand All @@ -1890,7 +1889,7 @@ func TestNoRaceJetStreamSuperClusterSources(t *testing.T) {
sc.clusterForName("C3").waitOnStreamLeader("$G", "MS2")
<-doneCh

checkFor(t, 15*time.Second, 100*time.Millisecond, func() error {
checkFor(t, 15*time.Second, time.Second, func() error {
si, err := js2.StreamInfo("MS2")
if err != nil {
return err
Expand Down
62 changes: 44 additions & 18 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ type stream struct {
mirror *sourceInfo

// Sources
sources map[string]*sourceInfo
sources map[string]*sourceInfo
sourcesConsumerSetup *time.Timer

// Indicates we have direct consumers.
directs int
Expand Down Expand Up @@ -681,6 +682,11 @@ func (mset *stream) setLeader(isLeader bool) error {
return err
}
} else {
// cancel timer to create the source consumers if not fired yet
if mset.sourcesConsumerSetup != nil {
mset.sourcesConsumerSetup.Stop()
mset.sourcesConsumerSetup = nil
}
// Stop responding to sync requests.
mset.stopClusterSubs()
// Unsubscribe from direct stream.
Expand Down Expand Up @@ -2211,7 +2217,7 @@ func (mset *stream) scheduleSetupMirrorConsumerRetryAsap() {
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
time.AfterFunc(next, func() {
mset.mu.Lock()
mset.setupMirrorConsumer()
Expand Down Expand Up @@ -2530,7 +2536,7 @@ func (mset *stream) scheduleSetSourceConsumerRetryAsap(si *sourceInfo, seq uint6
}
// To make *sure* that the next request will not fail, add a bit of buffer
// and some randomness.
next += time.Duration(rand.Intn(50)) + 10*time.Millisecond
next += time.Duration(rand.Intn(int(10*time.Millisecond))) + 10*time.Millisecond
mset.scheduleSetSourceConsumerRetry(si.iname, seq, next, startTime)
}

Expand Down Expand Up @@ -3025,15 +3031,8 @@ func (mset *stream) setStartingSequenceForSource(sname string) {
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}
// Always reset here.
// Resets the SourceInfo for all the sources
func (mset *stream) resetSourceInfo() {
mset.sources = make(map[string]*sourceInfo)

for _, ssi := range mset.cfg.Sources {
Expand All @@ -3043,6 +3042,20 @@ func (mset *stream) startingSequenceForSources() {
si := &sourceInfo{name: ssi.Name, iname: ssi.iname}
mset.sources[ssi.iname] = si
}
}

// Lock should be held.
// This will do a reverse scan on startup or leader election
// searching for the starting sequence number.
// This can be slow in degenerative cases.
// Lock should be held.
func (mset *stream) startingSequenceForSources() {
if len(mset.cfg.Sources) == 0 {
return
}

// Always reset here.
mset.resetSourceInfo()

var state StreamState
mset.store.FastState(&state)
Expand Down Expand Up @@ -3113,6 +3126,11 @@ func (mset *stream) setupSourceConsumers() error {
}
}

// If we are no longer the leader, give up
if !mset.isLeader() {
return nil
}

mset.startingSequenceForSources()

// Setup our consumers at the proper starting position.
Expand All @@ -3138,13 +3156,21 @@ func (mset *stream) subscribeToStream() error {
}
// Check if we need to setup mirroring.
if mset.cfg.Mirror != nil {
if err := mset.setupMirrorConsumer(); err != nil {
return err
}
// setup the initial mirror sourceInfo
mset.mirror = &sourceInfo{name: mset.cfg.Mirror.Name}

// delay the actual mirror consumer creation for after a delay
mset.scheduleSetupMirrorConsumerRetryAsap()
} else if len(mset.cfg.Sources) > 0 {
if err := mset.setupSourceConsumers(); err != nil {
return err
}
// Setup the initial source infos for the sources
mset.resetSourceInfo()
// Delay the actual source consumer(s) creation(s) for after a delay

mset.sourcesConsumerSetup = time.AfterFunc(time.Duration(rand.Intn(int(10*time.Millisecond)))+10*time.Millisecond, func() {
mset.mu.Lock()
mset.setupSourceConsumers()
mset.mu.Unlock()
})
}
// Check for direct get access.
// We spin up followers for clustered streams in monitorStream().
Expand Down

0 comments on commit 05fe77f

Please sign in to comment.