shared client: Do not busy loop on errors
Change the receive loop to wait for a trigger from the send loop before it
resumes receiving after an error. On any error all current requests are
canceled, so there is no point in receiving again before a new request is
sent. This prevents the receiver from busy-looping on sticky error
conditions, such as an i/o timeout.
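
The wake-up itself is a non-blocking send on an unbuffered channel, so the
send loop never blocks when the receiver is not parked; a dropped trigger is
harmless because it means the receiver is still reading. A minimal standalone
sketch of that pattern (channel names match the diff below; the printing
receiver is a stand-in for the real receive loop after conn.ReadMsg() failed):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		receiverTrigger := make(chan struct{})
		triggerReceiver := func() {
			select {
			case receiverTrigger <- struct{}{}: // wake the parked receiver
			default: // receiver not parked: drop the trigger, never block
			}
		}

		// Receiver: parks on the trigger instead of retrying immediately,
		// standing in for the receive loop after a read error.
		go func() {
			for {
				if _, ok := <-receiverTrigger; !ok {
					return // trigger channel closed: handler is quitting
				}
				fmt.Println("new request queued, safe to receive again")
			}
		}()

		time.Sleep(time.Millisecond) // let the receiver park first
		triggerReceiver()            // send loop queued a new request
		time.Sleep(time.Millisecond) // let the receiver run
		close(receiverTrigger)       // handler shutdown path
	}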

Also treat io.EOF as a connection close: TCP/TLS connections are read
through the io package, which returns io.EOF on graceful connection close.
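
In practice both net.ErrClosed (a locally closed socket, the UDP case) and
io.EOF (a graceful peer close on a TCP/TLS stream) now end the receive loop.
A sketch of that check as a standalone helper; the name isConnClosed is
illustrative only, the diff below inlines the condition:

	import (
		"errors"
		"io"
		"net"
	)

	// isConnClosed reports whether a read error means the connection is
	// gone for good: net.ErrClosed for a socket closed locally, io.EOF for
	// a TCP/TLS stream closed gracefully by the peer.
	func isConnClosed(err error) bool {
		return errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF)
	}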

Explicitly wait for the write timeout when sending a new request to the
handler, as the handler may be quitting at the same time and might never
actually handle the new request.
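
The bounded send is the usual select-with-context pattern; a simplified
standalone sketch under assumed names (trySend and the bare request struct
are stand-ins, not the package's types):

	package main

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	type request struct{ payload string }

	// trySend gives up once the timeout expires instead of blocking forever
	// on an unbuffered channel whose reader may already have quit.
	func trySend(ctx context.Context, requests chan<- request, req request, timeout time.Duration) error {
		ctx, cancel := context.WithTimeout(ctx, timeout)
		defer cancel()
		select {
		case requests <- req: // handler picked the request up
			return nil
		case <-ctx.Done(): // handler gone (or caller canceled): do not hang
			return ctx.Err()
		}
	}

	func main() {
		requests := make(chan request) // unbuffered, and nobody is reading
		err := trySend(context.Background(), requests, request{payload: "query"}, 10*time.Millisecond)
		fmt.Println(errors.Is(err, context.DeadlineExceeded)) // prints: true
	}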

Signed-off-by: Jarno Rajahalme <jarno@isovalent.com>
jrajahalme committed Nov 16, 2023
1 parent eaf71f6 commit 50ce2af
Showing 1 changed file with 47 additions and 11 deletions.
shared_client.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"sync"
 	"time"
@@ -143,21 +144,44 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan request) {
 
 	responses := make(chan sharedClientResponse)
 
+	receiverTrigger := make(chan struct{})
+	triggerReceiver := func() {
+		select {
+		case receiverTrigger <- struct{}{}:
+		default:
+		}
+	}
+
 	// Receive loop
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
 		defer close(responses)
 		for {
+			// This will block but eventually return an i/o timeout, as we always set
+			// the timeouts before sending anything
 			r, err := conn.ReadMsg()
-			if err != nil {
-				// handler is not reading on the channel after closing
-				if errors.Is(err, net.ErrClosed) {
-					return
-				}
-				responses <- sharedClientResponse{nil, 0, err}
-			} else {
+			if err == nil {
 				responses <- sharedClientResponse{r, 0, nil}
+				continue // receive immediately again
 			}
+
+			// handler is not reading on the channel after closing.
+			// UDP connections return net.ErrClosed, while TCP/TLS connections are read
+			// via the io package, which returns io.EOF.
+			if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
+				return
+			}
+
+			// send an error response to cancel all current requests.
+			responses <- sharedClientResponse{nil, 0, err}
+
+			// wait for a trigger from the handler after any error. Re-reading in
+			// this condition could busy-loop, e.g., if a read timeout occurred.
+			_, ok := <-receiverTrigger
+			// exit immediately if the trigger channel is closed
+			if !ok {
+				return
+			}
 		}
 	}()
@@ -169,6 +193,7 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan request) {
 	waitingResponses := make(map[uint16]waiter)
 	defer func() {
 		conn.Close()
+		close(receiverTrigger)
 
 		// Drain responses sent by the receive loop to allow it to exit.
 		// It may be repeatedly reading after an i/o timeout, for example.
@@ -194,6 +219,9 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan request) {
 				close(req.ch)
 			} else {
 				waitingResponses[req.msg.Id] = waiter{req.ch, start}
+
+				// Wake up the receiver that may be waiting for a trigger after an error
+				triggerReceiver()
 			}
 
 		case resp, ok := <-responses:
@@ -234,12 +262,20 @@ func (c *SharedClient) ExchangeSharedContext(ctx context.Context, m *Msg) (r *Msg, rtt time.Duration, err error) {
 	}
 	c.Unlock()
 
+	// As this request keeps c.requests open, sending a request to it may hang indefinitely if
+	// the handler happens to be quitting at the same time as we are sending a message here, so
+	// explicitly wait for the write timeout.
+	timeout := c.Client.writeTimeout()
+	ctx, cancel := context.WithTimeout(ctx, timeout)
+	defer cancel()
 	respCh := make(chan sharedClientResponse)
-	c.requests <- request{
-		ctx: ctx,
-		msg: m,
-		ch:  respCh,
+	select {
+	case c.requests <- request{ctx: ctx, msg: m, ch: respCh}:
+	case <-ctx.Done():
+		return nil, 0, ctx.Err()
 	}
+
+	// Since c.requests is unbuffered, the handler is guaranteed to eventually close 'respCh'
 	resp := <-respCh
 	return resp.msg, resp.rtt, resp.err
 }
