Skip to content

Commit

Permalink
Track recovery from slow consumer events in routes (#5101)
Browse files Browse the repository at this point in the history
Adds extra logging for routes to notice when a route that has
temporarily entered the Slow Consumer state has recovered.
Also changes the counter for route slow consumer to only count each time
the server detects that a route became a slow consumer.

Example logs:

```
[20] 2024/02/17 06:18:48.891218 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer Detected: WriteDeadline of 10s exceeded with 160926 chunks of 349984270 total bytes.
[20] 2024/02/17 06:18:58.919430 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 237701 chunks of 515825015 total bytes.
[20] 2024/02/17 06:19:08.979502 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 329433 chunks of 715565941 total bytes.
[20] 2024/02/17 06:19:18.994486 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 533167 chunks of 1158645903 total bytes.
[20] 2024/02/17 06:19:29.025948 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 622193 chunks of 1352896878 total bytes.
[20] 2024/02/17 06:19:39.048491 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 624307 chunks of 1357420714 total bytes.
[20] 2024/02/17 06:19:49.164586 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 595656 chunks of 1294726213 total bytes.
[20] 2024/02/17 06:19:59.177052 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 495507 chunks of 1076217270 total bytes.
[20] 2024/02/17 06:20:09.188345 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer State: WriteDeadline of 10s exceeded with 401938 chunks of 872752366 total bytes.
[20] 2024/02/17 06:20:15.430644 [INF] 10.84.13.21:33226 - rid:3579 - Slow Consumer Recovered: Flush took 6.239s with 218191 chunks of 473998204 total bytes.
```

Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs authored Feb 24, 2024
1 parent 3058698 commit 5f6595b
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 3 deletions.
24 changes: 21 additions & 3 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ const (
connectProcessFinished // Marks if this connection has finished the connect process.
compressionNegotiated // Marks if this connection has negotiated compression level with remote.
didTLSFirst // Marks if this connection requested and was accepted doing the TLS handshake first (prior to INFO).
isSlowConsumer // Marks connection as a slow consumer.
)

// set the flag (would be equivalent to set the boolean to true)
Expand Down Expand Up @@ -1651,9 +1652,11 @@ func (c *client) flushOutbound() bool {
}

// Ignore ErrShortWrite errors, they will be handled as partials.
var gotWriteTimeout bool
if err != nil && err != io.ErrShortWrite {
// Handle timeout error (slow consumer) differently
if ne, ok := err.(net.Error); ok && ne.Timeout() {
gotWriteTimeout = true
if closed := c.handleWriteTimeout(n, attempted, len(orig)); closed {
return true
}
Expand Down Expand Up @@ -1691,6 +1694,11 @@ func (c *client) flushOutbound() bool {
close(c.out.stc)
c.out.stc = nil
}
// Check if the connection is recovering from being a slow consumer.
if !gotWriteTimeout && c.flags.isSet(isSlowConsumer) {
c.Noticef("Slow Consumer Recovered: Flush took %.3fs with %d chunks of %d total bytes.", time.Since(start).Seconds(), len(orig), attempted)
c.flags.clear(isSlowConsumer)
}

return true
}
Expand All @@ -1716,14 +1724,22 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
c.markConnAsClosed(SlowConsumerWriteDeadline)
return true
}
alreadySC := c.flags.isSet(isSlowConsumer)
scState := "Detected"
if alreadySC {
scState = "State"
}

// Aggregate slow consumers.
atomic.AddInt64(&c.srv.slowConsumers, 1)
switch c.kind {
case CLIENT:
c.srv.scStats.clients.Add(1)
case ROUTER:
c.srv.scStats.routes.Add(1)
// Only count each Slow Consumer event once.
if !alreadySC {
c.srv.scStats.routes.Add(1)
}
case GATEWAY:
c.srv.scStats.gateways.Add(1)
case LEAF:
Expand All @@ -1732,13 +1748,15 @@ func (c *client) handleWriteTimeout(written, attempted int64, numChunks int) boo
if c.acc != nil {
atomic.AddInt64(&c.acc.slowConsumers, 1)
}
c.Noticef("Slow Consumer Detected: WriteDeadline of %v exceeded with %d chunks of %d total bytes.",
c.out.wdl, numChunks, attempted)
c.Noticef("Slow Consumer %s: WriteDeadline of %v exceeded with %d chunks of %d total bytes.",
scState, c.out.wdl, numChunks, attempted)

// We always close CLIENT connections, or when nothing was written at all...
if c.kind == CLIENT || written == 0 {
c.markConnAsClosed(SlowConsumerWriteDeadline)
return true
} else {
c.flags.setIfNotSet(isSlowConsumer)
}
return false
}
Expand Down
123 changes: 123 additions & 0 deletions server/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4160,6 +4160,129 @@ func TestRouteNoLeakOnSlowConsumer(t *testing.T) {
}
}

func TestRouteSlowConsumerRecover(t *testing.T) {
o1 := DefaultOptions()
o1.Cluster.PoolSize = -1
s1 := RunServer(o1)
defer s1.Shutdown()

rtt := 1500 * time.Nanosecond
upRate := 1024 * 1024
downRate := 128 * 1024
np := createNetProxy(rtt, upRate, downRate, fmt.Sprintf("nats://127.0.0.1:%d", o1.Cluster.Port), true)
defer np.stop()

o2 := DefaultOptions()
o2.Cluster.PoolSize = -1
o2.Routes = RoutesFromStr(np.routeURL())
s2 := RunServer(o2)
defer s2.Shutdown()

checkClusterFormed(t, s1, s2)

changeWriteDeadline := func(s *Server, duration time.Duration) {
s.mu.Lock()
for _, cl := range s.routes {
for _, c := range cl {
c.mu.Lock()
c.out.wdl = duration
c.mu.Unlock()
}
}
s.mu.Unlock()
}
hasSlowConsumerRoutes := func(s *Server) bool {
var sc bool
s.mu.Lock()
Loop:
for _, cl := range s.routes {
for _, c := range cl {
c.mu.Lock()
sc = c.flags.isSet(isSlowConsumer)
c.mu.Unlock()
if sc {
break Loop
}
}
}
s.mu.Unlock()
return sc
}

// Start with a shorter write deadline to cause errors
// then bump it again later to let it recover.
changeWriteDeadline(s1, 1*time.Second)

ncA, err := nats.Connect(s1.Addr().String())
require_NoError(t, err)

ncB, err := nats.Connect(s2.Addr().String())
require_NoError(t, err)

var wg sync.WaitGroup
ncB.Subscribe("test", func(*nats.Msg) {
ncB.Close()
})
ncB.Flush()

ctx, cancel := context.WithTimeout(context.Background(), 800*time.Millisecond)
defer cancel()

go func() {
var total int
payload := fmt.Appendf(nil, strings.Repeat("A", 132*1024))
for range time.NewTicker(30 * time.Millisecond).C {
select {
case <-ctx.Done():
wg.Done()
return
default:
}
ncA.Publish("test", payload)
ncA.Flush()
total++
}
}()
wg.Add(1)

checkFor(t, 20*time.Second, 2*time.Millisecond, func() error {
if s1.NumRoutes() < 1 {
return fmt.Errorf("No routes connected")
}
if !hasSlowConsumerRoutes(s1) {
if s1.NumSlowConsumersRoutes() > 0 {
// In case it has recovered already.
return nil
}
return fmt.Errorf("Expected Slow Consumer routes")
}
return nil
})
cancel()
changeWriteDeadline(s1, 5*time.Second)
np.updateRTT(0)
checkFor(t, 20*time.Second, 10*time.Millisecond, func() error {
if s1.NumRoutes() < 1 {
return fmt.Errorf("No routes connected")
}
if hasSlowConsumerRoutes(s1) {
return fmt.Errorf("Expected Slow Consumer routes to recover")
}
return nil
})

checkFor(t, 20*time.Second, 100*time.Millisecond, func() error {
var got, expected int64
got = int64(s1.NumSlowConsumersRoutes())
expected = 1
if got != expected {
return fmt.Errorf("got: %d, expected: %d", got, expected)
}
return nil
})
wg.Wait()
}

func TestRouteNoLeakOnAuthTimeout(t *testing.T) {
opts := DefaultOptions()
opts.Cluster.Username = "foo"
Expand Down

0 comments on commit 5f6595b

Please sign in to comment.