Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-picks for 2.10.23-RC.5 #6171

Merged
merged 36 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
785321b
NRG: Ignore AEs from older terms
neilalexander Jul 16, 2024
ddb3ad3
NRG: Ensure proposal and AE response queues drain after stepdown
neilalexander Jul 17, 2024
6f3d87f
NRG: Fix term handling in candidate state and use higher term from vo…
neilalexander Jul 18, 2024
e52d3e1
NRG: Remove stepdown channel, handle inline
neilalexander Jan 24, 2024
937e786
NRG: Don't revert `term` to `pterm` on AE mismatch
neilalexander Jul 22, 2024
4b725df
NRG: Don't reset WAL on append entry response
neilalexander Jul 23, 2024
9e1d1c3
NRG: De-flake `TestNRGSwitchStateClearsQueues`
neilalexander Jul 23, 2024
d6f1f9c
De-flake `TestNRGSwitchStateClearsQueues`
neilalexander Jul 26, 2024
541e5b1
NRG: Send AE response when term is lower than ours
neilalexander Jul 29, 2024
86b0cae
Fix candidate stepdown logic
ReubenMathew Jul 29, 2024
85ad22d
De-flake `TestNRGSwitchStateClearsQueues`
neilalexander Aug 8, 2024
1929e30
NRG: Don't revert `pterm` to beginning of log when installing snapshots
neilalexander Aug 15, 2024
85236e2
Fixed deadlock when removing a peer that happened to be the leader.
derekcollison Sep 22, 2024
947b7c5
Fix drift in WAL, truncate AppendEntry without quorum
MauriceVanVeen Oct 3, 2024
7a6affa
(2.11) Don't send meta snapshot when becoming metaleader (#5700)
neilalexander Oct 3, 2024
71e9e87
NRG: Do not revert term on truncate WAL
neilalexander Oct 8, 2024
f55f34e
Fix data race in `TestNRGTermDoesntRollBackToPtermOnCatchup`
neilalexander Oct 11, 2024
3025778
NRG (2.11): Start catchup from `n.commit` & fix AppendEntry is stored…
MauriceVanVeen Oct 15, 2024
7ec99f3
NRG: Revert implementation from #5987
MauriceVanVeen Oct 22, 2024
a5b7a4f
NRG: Correct pterm if mismatched & don't truncate what was already co…
MauriceVanVeen Oct 22, 2024
2dfb1d4
NRG: Add tests for correcting pterm with committed entries
MauriceVanVeen Oct 22, 2024
6e36c52
[FIXED] Catchup must not extend past requested sequence range
MauriceVanVeen Oct 24, 2024
aa643dd
[FIXED] Don't replace leader's snapshot during shutdown
MauriceVanVeen Oct 29, 2024
2681ce0
NRG: Always write term/vote and peer state synchronously
neilalexander Aug 29, 2024
ff22481
[FIXED] Don't remove snapshot if truncate to applied
MauriceVanVeen Oct 29, 2024
e40112e
NRG: Don't switch to candidate when waiting for pending applies
MauriceVanVeen Oct 30, 2024
8a4f379
NRG: Do not set `pterm`/`pindex` if no snapshots and/or log entries
neilalexander Oct 31, 2024
fa5128d
NRG: Don't delete RAFT state if stream/consumer creation failed durin…
MauriceVanVeen Oct 31, 2024
a3b9178
NRG: Vote request cancels catchup, new leader could have rejected sub…
MauriceVanVeen Nov 3, 2024
a64c5bd
NRG: Wait for goroutines to shutdown when recreating group
neilalexander Aug 26, 2024
cc204b0
NRG: Update group peers if mismatched
neilalexander Oct 1, 2024
a838553
NRG: Refactor shutdown, update `switchState` to CAS
neilalexander Nov 5, 2024
9266be8
NRG: Use correct sequence when truncating to previous pterm/pindex
MauriceVanVeen Nov 4, 2024
9e4c421
De-flake `TestNRGCandidateDontStepdownDueToLeaderOfPreviousTerm`
neilalexander Nov 11, 2024
7c0bec1
De-flake `TestNRGSimpleElection`
neilalexander Nov 14, 2024
4606175
NRG: Don't delete RAFT state if stream/consumer creation failed durin…
MauriceVanVeen Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,16 @@ func (o *consumer) updateDeliveryInterest(localInterest bool) bool {
return false
}

// Default timing for the retry loop in deleteNotActive, which re-proposes
// removal of a consumer assignment that has not been cleaned up.
const (
// Base of the first retry interval; a random jitter smaller than this value is added to it.
defaultConsumerNotActiveStartInterval = 30 * time.Second
// Once the retry interval is no longer below this value, it stops being doubled.
defaultConsumerNotActiveMaxInterval = 5 * time.Minute
)

// Effective intervals read by deleteNotActive. They are initialized to the
// defaults above and are variables rather than constants so that they can be
// reassigned, for example by a test that wants a shorter retry interval.
var (
consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
consumerNotActiveMaxInterval = defaultConsumerNotActiveMaxInterval
)

func (o *consumer) deleteNotActive() {
o.mu.Lock()
if o.mset == nil {
Expand Down Expand Up @@ -1628,12 +1638,8 @@ func (o *consumer) deleteNotActive() {
// Check to make sure we went away.
// Don't think this needs to be a monitored go routine.
go func() {
const (
startInterval = 30 * time.Second
maxInterval = 5 * time.Minute
)
jitter := time.Duration(rand.Int63n(int64(startInterval)))
interval := startInterval + jitter
jitter := time.Duration(rand.Int63n(int64(consumerNotActiveStartInterval)))
interval := consumerNotActiveStartInterval + jitter
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
Expand All @@ -1648,7 +1654,7 @@ func (o *consumer) deleteNotActive() {
if nca != nil && nca == ca {
s.Warnf("Consumer assignment for '%s > %s > %s' not cleaned up, retrying", acc, stream, name)
meta.ForwardProposal(removeEntry)
if interval < maxInterval {
if interval < consumerNotActiveMaxInterval {
interval *= 2
ticker.Reset(interval)
}
Expand Down
16 changes: 12 additions & 4 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10023,14 +10023,22 @@ func (alg StoreCompression) Decompress(buf []byte) ([]byte, error) {
// sets O_SYNC on the open file if SyncAlways is set. The dios semaphore is
// handled automatically by this function, so don't wrap calls to it in dios.
func (fs *fileStore) writeFileWithOptionalSync(name string, data []byte, perm fs.FileMode) error {
if fs.fcfg.SyncAlways {
return writeFileWithSync(name, data, perm)
}
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if fs.fcfg.SyncAlways {
flags |= os.O_SYNC
}
return os.WriteFile(name, data, perm)
}

func writeFileWithSync(name string, data []byte, perm fs.FileMode) error {
<-dios
defer func() {
dios <- struct{}{}
}()
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC | os.O_SYNC
f, err := os.OpenFile(name, flags, perm)
if err != nil {
return err
Expand Down
62 changes: 28 additions & 34 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,10 +1431,6 @@ func (js *jetStream) monitorCluster() {
aq.recycle(&ces)

case isLeader = <-lch:
// For meta layer synchronize everyone to our state on becoming leader.
if isLeader && n.ApplyQ().len() == 0 {
n.SendSnapshot(js.metaSnapshot())
}
// Process the change.
js.processLeaderChange(isLeader)
if isLeader {
Expand Down Expand Up @@ -2129,8 +2125,32 @@ func (js *jetStream) createRaftGroup(accName string, rg *raftGroup, storage Stor
}

// Check if we already have this assigned.
retry:
if node := s.lookupRaftNode(rg.Name); node != nil {
if node.State() == Closed {
// We're waiting for this node to finish shutting down before we replace it.
js.mu.Unlock()
node.WaitForStop()
js.mu.Lock()
goto retry
}
s.Debugf("JetStream cluster already has raft group %q assigned", rg.Name)
// Check and see if the group has the same peers. If not then we
// will update the known peers, which will send a peerstate if leader.
groupPeerIDs := append([]string{}, rg.Peers...)
var samePeers bool
if nodePeers := node.Peers(); len(rg.Peers) == len(nodePeers) {
nodePeerIDs := make([]string, 0, len(nodePeers))
for _, n := range nodePeers {
nodePeerIDs = append(nodePeerIDs, n.ID)
}
slices.Sort(groupPeerIDs)
slices.Sort(nodePeerIDs)
samePeers = slices.Equal(groupPeerIDs, nodePeerIDs)
}
if !samePeers {
node.UpdateKnownPeers(groupPeerIDs)
}
rg.node = node
js.mu.Unlock()
return nil
Expand Down Expand Up @@ -8959,17 +8979,6 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
// mset.store never changes after being set, don't need lock.
mset.store.FastState(&state)

// Reset notion of first if this request wants sequences before our starting sequence
// and we would have nothing to send. If we have partial messages still need to send skips for those.
// We will keep sreq's first sequence to not create sequence mismatches on the follower, but we extend the last to our current state.
if sreq.FirstSeq < state.FirstSeq && state.FirstSeq > sreq.LastSeq {
s.Debugf("Catchup for stream '%s > %s' resetting request first sequence from %d to %d",
mset.account(), mset.name(), sreq.FirstSeq, state.FirstSeq)
if state.LastSeq > sreq.LastSeq {
sreq.LastSeq = state.LastSeq
}
}

// Setup sequences to walk through.
seq, last := sreq.FirstSeq, sreq.LastSeq
mset.setCatchupPeer(sreq.Peer, last-seq)
Expand Down Expand Up @@ -9133,25 +9142,10 @@ func (mset *stream) runCatchup(sendSubject string, sreq *streamSyncRequest) {
if drOk && dr.First > 0 {
sendDR()
}
// Check for a condition where our state's first is now past the last that we could have sent.
// If so reset last and continue sending.
var state StreamState
mset.mu.RLock()
mset.store.FastState(&state)
mset.mu.RUnlock()
if last < state.FirstSeq {
last = state.LastSeq
}
// Recheck our exit condition.
if seq == last {
if drOk && dr.First > 0 {
sendDR()
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
s.Noticef("Catchup for stream '%s > %s' complete", mset.account(), mset.name())
// EOF
s.sendInternalMsgLocked(sendSubject, _EMPTY_, nil, nil)
return false
}
select {
case <-remoteQuitCh:
Expand Down
20 changes: 16 additions & 4 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6647,12 +6647,24 @@ func TestJetStreamClusterSnapshotBeforePurgeAndCatchup(t *testing.T) {
return nil
})

// Make sure we only sent 1002 sync catchup msgs.
// This is for the new messages, the delete range, and the EOF.
// Make sure we only sent 2 sync catchup msgs.
// This is for the delete range, and the EOF.
nmsgs, _, _ := sub.Pending()
if nmsgs != 1002 {
t.Fatalf("Expected only 1002 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
if nmsgs != 2 {
t.Fatalf("Expected only 2 sync catchup msgs to be sent signaling eof, but got %d", nmsgs)
}

msg, err := sub.NextMsg(0)
require_NoError(t, err)
mbuf := msg.Data[1:]
dr, err := decodeDeleteRange(mbuf)
require_NoError(t, err)
require_Equal(t, dr.First, 1001)
require_Equal(t, dr.Num, 1000)

msg, err = sub.NextMsg(0)
require_NoError(t, err)
require_Equal(t, len(msg.Data), 0)
}

func TestJetStreamClusterStreamResetWithLargeFirstSeq(t *testing.T) {
Expand Down
14 changes: 12 additions & 2 deletions server/jetstream_cluster_3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,11 @@ func TestJetStreamClusterParallelConsumerCreation(t *testing.T) {
}

func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
consumerNotActiveStartInterval = time.Second * 5
defer func() {
consumerNotActiveStartInterval = defaultConsumerNotActiveStartInterval
}()

c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()

Expand Down Expand Up @@ -1632,6 +1637,7 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
time.Sleep(2 * time.Second)

// Restart first and wait so that we know it will try cleanup without a metaleader.
// It will fail as there's no metaleader at that time, but it should keep retrying on an interval.
c.restartServer(rs)
time.Sleep(time.Second)

Expand All @@ -1643,8 +1649,9 @@ func TestJetStreamClusterGhostEphemeralsAfterRestart(t *testing.T) {
defer nc.Close()

subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
m, err := nc.Request(subj, nil, time.Second)
checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
// Request will take at most 4 seconds if some consumers can't be found.
m, err := nc.Request(subj, nil, 5*time.Second)
if err != nil {
return err
}
Expand Down Expand Up @@ -3910,6 +3917,7 @@ func TestJetStreamClusterStreamNodeShutdownBugOnStop(t *testing.T) {
node.InstallSnapshot(mset.stateSnapshot())
// Stop the stream
mset.stop(false, false)
node.WaitForStop()

if numNodes := s.numRaftNodes(); numNodes != numNodesStart-1 {
t.Fatalf("RAFT nodes after stream stop incorrect: %d vs %d", numNodesStart, numNodes)
Expand Down Expand Up @@ -5801,6 +5809,8 @@ func TestJetStreamClusterDetectOrphanNRGs(t *testing.T) {

// Should only be meta NRG left.
require_True(t, s.numRaftNodes() == 1)
s.rnMu.RLock()
defer s.rnMu.RUnlock()
require_True(t, s.lookupRaftNode(sgn) == nil)
require_True(t, s.lookupRaftNode(ogn) == nil)
}
Expand Down
Loading