From f1e52bd8974b54e80c1965fa4c1a1ac0c98ec215 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Thu, 9 Jan 2025 16:00:04 +0000 Subject: [PATCH] Add `TestJetStreamClusterRoutedAPIRecoverPerformance` Signed-off-by: Neil Twigg --- server/jetstream_cluster_4_test.go | 66 ++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 2e33ccfa1a..ce97f7cd5a 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -5265,3 +5265,69 @@ func TestJetStreamClusterOnlyPublishAdvisoriesWhenInterest(t *testing.T) { // it should succeed. require_True(t, s1.publishAdvisory(s1.GlobalAccount(), subj, "test")) } + +func TestJetStreamClusterRoutedAPIRecoverPerformance(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, _ := jsClientConnect(t, c.randomNonLeader()) + defer nc.Close() + + // We only run 16 JetStream API workers. + mp := runtime.GOMAXPROCS(0) + if mp > 16 { + mp = 16 + } + + leader := c.leader() + ljs := leader.js.Load() + + // Take the JS lock, which allows the JS API queue to build up. + ljs.mu.Lock() + defer ljs.mu.Unlock() + + count := JSDefaultRequestQueueLimit - 1 + ch := make(chan *nats.Msg, count) + + inbox := nc.NewRespInbox() + _, err := nc.ChanSubscribe(inbox, ch) + require_NoError(t, err) + + // To ensure a fair starting line, we need to submit as many tasks as + // there are JS workers whilst holding the JS lock. This will ensure that + // each JS API worker is properly wedged. + msg := &nats.Msg{ + Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"), + Reply: "no_one_here", + } + for i := 0; i < mp; i++ { + require_NoError(t, nc.PublishMsg(msg)) + } + + // Then we want to submit a fixed number of tasks, big enough to fill + // the queue, so that we can measure them. + msg = &nats.Msg{ + Subject: fmt.Sprintf(JSApiConsumerInfoT, "Doesnt", "Exist"), + Reply: inbox, + } + for i := 0; i < count; i++ { + require_NoError(t, nc.PublishMsg(msg)) + } + checkFor(t, 5*time.Second, 25*time.Millisecond, func() error { + if queued := leader.jsAPIRoutedReqs.len(); queued != count { + return fmt.Errorf("expected %d queued requests, got %d", count, queued) + } + return nil + }) + + // Now we're going to release the lock and start timing. The workers + // will now race to clear the queues and we'll wait to see how long + // it takes for them all to respond. + start := time.Now() + ljs.mu.Unlock() + for i := 0; i < count; i++ { + <-ch + } + ljs.mu.Lock() + t.Logf("Took %s to clear %d items", time.Since(start), count) +}