Skip to content

Commit 74f005d

Browse files
Fix JS API in-flight metric (#6373)
After a drain this would have been misreporting, as we did not remove drained entries from the `apiInflight` count. Signed-off-by: Neil Twigg <neil@nats.io>
2 parents 587d910 + 296edb7 commit 74f005d

File tree

3 files changed

+9
-5
lines changed

3 files changed

+9
-5
lines changed

server/ipqueue.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -246,14 +246,16 @@ func (q *ipQueue[T]) size() uint64 {
246246
}
247247

248248
// Empty the queue and consumes the notification signal if present.
249+
// Returns the number of items that were drained from the queue.
249250
// Note that this could cause a reader go routine that has been
250251
// notified that there is something in the queue (reading from queue's `ch`)
251252
// may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`.
252-
func (q *ipQueue[T]) drain() {
253+
func (q *ipQueue[T]) drain() int {
253254
if q == nil {
254-
return
255+
return 0
255256
}
256257
q.Lock()
258+
olen := len(q.elts) - q.pos
257259
q.elts, q.pos, q.sz = nil, 0, 0
258260
// Consume the signal if it was present to reduce the chance of a reader
259261
// routine to be think that there is something in the queue...
@@ -262,6 +264,7 @@ func (q *ipQueue[T]) drain() {
262264
default:
263265
}
264266
q.Unlock()
267+
return olen
265268
}
266269

267270
// Since the length of the queue goes to 0 after a pop(), it is good to

server/jetstream_api.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,8 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
894894
limit := atomic.LoadInt64(&js.queueLimit)
895895
if pending >= int(limit) {
896896
s.rateLimitFormatWarnf("JetStream API queue limit reached, dropping %d requests", pending)
897-
s.jsAPIRoutedReqs.drain()
897+
drained := int64(s.jsAPIRoutedReqs.drain())
898+
atomic.AddInt64(&js.apiInflight, -drained)
898899

899900
s.publishAdvisory(nil, JSAdvisoryAPILimitReached, JSAPILimitReachedAdvisory{
900901
TypedEvent: TypedEvent{
@@ -904,7 +905,7 @@ func (js *jetStream) apiDispatch(sub *subscription, c *client, acc *Account, sub
904905
},
905906
Server: s.Name(),
906907
Domain: js.config.Domain,
907-
Dropped: int64(pending),
908+
Dropped: drained,
908909
})
909910
}
910911
}

server/raft.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -1962,7 +1962,7 @@ runner:
19621962
// just will remove them from the central monitoring map
19631963
queues := []interface {
19641964
unregister()
1965-
drain()
1965+
drain() int
19661966
}{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply}
19671967
for _, q := range queues {
19681968
q.drain()

0 commit comments

Comments
 (0)