From 50ce2af915e98c904c532e5be4f119811ae7cae7 Mon Sep 17 00:00:00 2001 From: Jarno Rajahalme Date: Thu, 16 Nov 2023 12:15:14 +0200 Subject: [PATCH] shared client: Do not busy loop on errors Change the receive loop to wait for a trigger from the send loop to restart receiving after errors have been received. On any error all the current requests are canceled, so there is no point receiving before a new request is sent. This prevents receiver busy-looping on sticky error conditions, such as i/o timeout. Also consider io.EOF as a connection close, as the TLS/TCP connections are read through the io package, which returns io.EOF on graceful connection close. Explicitly wait for the write timeout when sending a new request to the handler, as the handler may be quitting at the same time and never actually handle the new request. Signed-off-by: Jarno Rajahalme --- shared_client.go | 58 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 47 insertions(+), 11 deletions(-) diff --git a/shared_client.go b/shared_client.go index b15345122..e63f1bc78 100644 --- a/shared_client.go +++ b/shared_client.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "io" "net" "sync" "time" @@ -143,21 +144,44 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque responses := make(chan sharedClientResponse) + receiverTrigger := make(chan struct{}) + triggerReceiver := func() { + select { + case receiverTrigger <- struct{}{}: + default: + } + } + // Receive loop wg.Add(1) go func() { defer wg.Done() defer close(responses) for { + // This will block but eventually return an i/o timeout, as we always set + // the timeouts before sending anything r, err := conn.ReadMsg() - if err != nil { - // handler is not reading on the channel after closing - if errors.Is(err, net.ErrClosed) { - return - } - responses <- sharedClientResponse{nil, 0, err} - } else { + if err == nil { responses <- sharedClientResponse{r, 0, nil} + continue // receive immediately again 
+ } + + // handler is not reading on the channel after closing. + // UDP connections return net.ErrClosed, while TCP/TLS connections are read + // via the io package, which returns io.EOF. + if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + return + } + + // send error response to cancel all current requests. + responses <- sharedClientResponse{nil, 0, err} + + // wait for a trigger from the handler after any errors. Re-reading in + // this condition could busy loop, e.g., if a read timeout occurred. + _, ok := <-receiverTrigger + // exit immediately if the trigger channel is closed + if !ok { + return } } }() @@ -169,6 +193,7 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque waitingResponses := make(map[uint16]waiter) defer func() { conn.Close() + close(receiverTrigger) // Drain responses send by receive loop to allow it to exit. // It may be repeatedly reading after an i/o timeout, for example. @@ -194,6 +219,9 @@ func handler(wg *sync.WaitGroup, client *Client, conn *Conn, requests chan reque close(req.ch) } else { waitingResponses[req.msg.Id] = waiter{req.ch, start} + + // Wake up the receiver that may be waiting for a trigger after an error + triggerReceiver() } case resp, ok := <-responses: @@ -234,12 +262,20 @@ func (c *SharedClient) ExchangeSharedContext(ctx context.Context, m *Msg) (r *Ms } c.Unlock() + // As this request keeps c.requests open, sending a request to it may hang indefinitely if + // the handler happens to be quitting at the same time as we are sending a message here, so + // explicitly wait for a write timeout. 
+ timeout := c.Client.writeTimeout() + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() respCh := make(chan sharedClientResponse) - c.requests <- request{ - ctx: ctx, - msg: m, - ch: respCh, + select { + case c.requests <- request{ctx: ctx, msg: m, ch: respCh}: + case <-ctx.Done(): + return nil, 0, ctx.Err() } + + // Since c.requests is unbuffered, the handler is guaranteed to eventually close 'respCh' resp := <-respCh return resp.msg, resp.rtt, resp.err }