Skip to content

Commit

Permalink
De-flake tests (#5765)
Browse files Browse the repository at this point in the history
This PR tries to deflake some unit tests.

Signed-off-by: Neil Twigg <neil@nats.io>
  • Loading branch information
derekcollison authored Aug 8, 2024
2 parents d471c10 + 6651c2e commit e9ee2a0
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 40 deletions.
2 changes: 2 additions & 0 deletions server/jetstream_cluster_1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6080,6 +6080,7 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {

// Now ack the skipped message
skipped.AckSync()
c.waitOnAllCurrent()
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
Expand All @@ -6090,6 +6091,7 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {

// Now ack the last message
last.AckSync()
c.waitOnAllCurrent()
for _, s := range c.servers {
jsz, err := s.Jsz(opts)
require_NoError(t, err)
Expand Down
15 changes: 4 additions & 11 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2660,22 +2660,15 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
nc.Close()
c.stopAll()
// Remove all state by truncating for the non-leader.
for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
fname := filepath.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
if err != nil {
continue
}
fd.Truncate(0)
fd.Close()
}
// For both make sure we have no raft snapshots.
snapDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
os.RemoveAll(snapDir)
require_NoError(t, os.RemoveAll(snapDir))
msgsDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "msgs")
require_NoError(t, os.RemoveAll(msgsDir))
// Remove all our raft state, we do not want to hold onto our term and index which
// results in a coin toss for who becomes the leader.
raftDir := filepath.Join(config.StoreDir, "$SYS", "_js_", gname)
os.RemoveAll(raftDir)
require_NoError(t, os.RemoveAll(raftDir))

// Now restart.
c.restartAll()
Expand Down
1 change: 1 addition & 0 deletions server/jetstream_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,7 @@ func (c *cluster) waitOnAccount(account string) {
func (c *cluster) waitOnClusterReady() {
c.t.Helper()
c.waitOnClusterReadyWithNumPeers(len(c.servers))
c.waitOnLeader()
}

func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) {
Expand Down
18 changes: 4 additions & 14 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10620,13 +10620,8 @@ func TestNoRaceJetStreamClusterMemoryStreamLastSequenceResetAfterRestart(t *test
s.Shutdown()
s.WaitForShutdown()
s = c.restartServer(s)
checkFor(t, 30*time.Second, time.Second, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
c.waitOnServerHealthz(s)
c.waitOnAllCurrent()
// Make sure all streams are current after healthz returns ok.
for i := 1; i <= numStreams; i++ {
stream := fmt.Sprintf("TEST:%d", i)
Expand Down Expand Up @@ -10693,13 +10688,8 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t
s.Shutdown()
s.WaitForShutdown()
s = c.restartServer(s)
checkFor(t, 30*time.Second, time.Second, func() error {
hs := s.healthz(nil)
if hs.Error != _EMPTY_ {
return errors.New(hs.Error)
}
return nil
})
c.waitOnServerHealthz(s)
c.waitOnAllCurrent()
// Make sure all streams are current after healthz returns ok.
for i := 1; i <= numStreams; i++ {
stream := fmt.Sprintf("TEST:%d", i)
Expand Down
21 changes: 6 additions & 15 deletions server/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,23 +360,14 @@ func TestNRGSimpleElection(t *testing.T) {
func TestNRGSwitchStateClearsQueues(t *testing.T) {
c := createJetStreamClusterExplicit(t, "R3S", 3)
defer c.shutdown()
s := c.servers[0] // RunBasicJetStreamServer not available

rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
rg.waitOnLeader()

// Ensure there are no other nodes running that could
// send something into our IP queues or it may break the
// below assertions.
for _, n := range rg {
if !n.node().Leader() {
n.stop()
}
n := &raft{
prop: newIPQueue[*Entry](s, "prop"),
resp: newIPQueue[*appendEntryResponse](s, "resp"),
leadc: make(chan bool, 1), // for switchState
}

rg.lockAll()
defer rg.unlockAll()

n := rg.leader().node().(*raft)
n.state.Store(int32(Leader))
require_Equal(t, n.prop.len(), 0)
require_Equal(t, n.resp.len(), 0)

Expand Down

0 comments on commit e9ee2a0

Please sign in to comment.