From 8815072e34c4e0f28317142911c4caaa88be23df Mon Sep 17 00:00:00 2001 From: Derek Collison Date: Sun, 30 Jan 2022 14:54:24 -0800 Subject: [PATCH] Fix flapping test Signed-off-by: Derek Collison --- server/filestore.go | 5 ++--- server/norace_test.go | 11 +++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/server/filestore.go b/server/filestore.go index 5a22ccfaee5..0bd8f688f18 100644 --- a/server/filestore.go +++ b/server/filestore.go @@ -4900,9 +4900,8 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err if p = o.state.Pending[sseq]; p != nil { p.Sequence, p.Timestamp = dseq, ts } - } - // Add to pending if needed. - if p == nil { + } else { + // Add to pending. o.state.Pending[sseq] = &Pending{dseq, ts} } // Update delivered as needed. diff --git a/server/norace_test.go b/server/norace_test.go index 39552affc07..ad59afb555d 100644 --- a/server/norace_test.go +++ b/server/norace_test.go @@ -3332,7 +3332,8 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) { } } // Make sure acks processed. - nc.Flush() + time.Sleep(200 * time.Millisecond) + nc.Close() // Check consumer consistency. checkConsumerWith := func(delivered, ackFloor uint64, ackPending int) { @@ -3383,6 +3384,7 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) { node := o.raftNode().(*raft) fs := node.wal.(*fileStore) fcfg, cfg := fs.fcfg, fs.cfg.StreamConfig + // Stop all the servers. c.stopAll() // Manipulate directly with cluster down. @@ -3451,8 +3453,13 @@ func TestNoRaceJetStreamClusterCorruptWAL(t *testing.T) { state = fs.State() fs.Truncate(state.FirstSeq) + sub, err = js.PullSubscribe("foo", "dlc") + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + // This will cause us to stepdown and truncate our WAL. - fetchMsgs(t, sub, 100, 50*time.Millisecond) + fetchMsgs(t, sub, 100, 5*time.Second) checkFor(t, 20*time.Second, 500*time.Millisecond, func() error { // Make sure we changed leaders.