Skip to content

Commit

Permalink
De-flake setup of mem WQ restart test (#6278)
Browse files Browse the repository at this point in the history
`TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart`
sometimes fails with a `nats: timeout`, but that's only in the setup
code. That setup code is made to run in parallel to have the test run a
lot faster, but it can sometimes result in timeouts on CI.

Allow for some retries in that section since it's only for setup. And if
the setup fails the asserts after will not be able to work anyway.

Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
  • Loading branch information
derekcollison authored Dec 18, 2024
2 parents dc8b344 + f70721d commit e5ccb2d
Showing 1 changed file with 31 additions and 16 deletions.
47 changes: 31 additions & 16 deletions server/norace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10729,27 +10729,42 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t
nc, js := jsClientConnect(t, c.randomServer())
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(30*time.Second))
require_NoError(t, err)
checkFor(t, 5*time.Second, time.Second, func() error {
_, err := js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("TEST:%d", n),
Storage: nats.MemoryStorage,
Retention: nats.WorkQueuePolicy,
Subjects: []string{fmt.Sprintf("foo.%d.*", n)},
Replicas: 3,
}, nats.MaxWait(time.Second))
return err
})

subj := fmt.Sprintf("foo.%d.bar", n)
for i := 0; i < 22; i++ {
js.Publish(subj, nil)
checkFor(t, 5*time.Second, time.Second, func() error {
_, err := js.Publish(subj, nil)
return err
})
}
// Now consumer them all as well.
sub, err := js.PullSubscribe(subj, "wq")
require_NoError(t, err)
msgs, err := sub.Fetch(22, nats.MaxWait(20*time.Second))
require_NoError(t, err)
// Now consume them all as well.
var err error
var sub *nats.Subscription
checkFor(t, 5*time.Second, time.Second, func() error {
sub, err = js.PullSubscribe(subj, "wq")
return err
})

var msgs []*nats.Msg
checkFor(t, 5*time.Second, time.Second, func() error {
msgs, err = sub.Fetch(22, nats.MaxWait(time.Second))
return err
})
require_Equal(t, len(msgs), 22)
for _, m := range msgs {
err := m.AckSync()
require_NoError(t, err)
checkFor(t, 5*time.Second, time.Second, func() error {
return m.AckSync()
})
}
}(i)
}
Expand Down

0 comments on commit e5ccb2d

Please sign in to comment.