Skip to content

Commit

Permalink
internal/jsonrpc2_v2: rework concurrency in idleListener
Browse files Browse the repository at this point in the history
This eliminates a race between a successful Accept call and a
concurrent Close call, which previously could have shut down the
'run' goroutine before Accept sent to the newConns channel, causing
Accept to deadlock.

In fact, it eliminates the long-running 'run' goroutine entirely
(replacing it with a time.Timer), and in the process avoids leaking
O(N) closed connections when only the very first one is long-lived.

It also eliminates a potential double-Close bug: if the run method had
called l.wrapped.Close due to an idle timeout, a subsequent call to
Close would invoke l.wrapped.Close again. The io.Closer interface
explicitly documents doubled Close calls as undefined behavior, and
many Closer implementations (especially test fakes) panic or deadlock
in that case.

It also eliminates a timer leak if the Listener rapidly oscillates
between active and idle: previously the implementation used
time.After, but it now uses an explicit time.Timer which can be
stopped (and garbage-collected) when the listener becomes active.

Idleness is now tracked based on the connection's Close method rather
than Read: we have no guarantee in general that a caller will ever
actually invoke Read (if, for example, they Close the connection as
soon as it is dialed), but we can reasonably expect a caller to at
least try to ensure that Close is always called.

We now also verify, using a finalizer on a best-effort basis, that the
Close method on each connection is called. We use the finalizer to
verify the Close call — rather than to close the connection implicitly
— because closing the connection in a finalizer would delay the start
of the idle timer by an arbitrary and unbounded duration after the
last connection is actually no longer in use.

Fixes golang/go#46047.
Fixes golang/go#51435.
For golang/go#46520.
For golang/go#49387.

Change-Id: If173a3ed7a44aff14734b72c8340122e8d020f26
Reviewed-on: https://go-review.googlesource.com/c/tools/+/388597
Run-TryBot: Bryan Mills <bcmills@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Ian Cottrell <iancottrell@google.com>
Auto-Submit: Bryan Mills <bcmills@google.com>
gopls-CI: kokoro <noreply+kokoro@google.com>
  • Loading branch information
Bryan C. Mills authored and gopherbot committed Oct 17, 2022
1 parent 5935531 commit 371ef16
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 112 deletions.
248 changes: 171 additions & 77 deletions internal/jsonrpc2_v2/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package jsonrpc2
import (
"context"
"errors"
"fmt"
"io"
"runtime"
"strings"
Expand Down Expand Up @@ -171,112 +172,205 @@ func isClosingError(err error) bool {
}

// NewIdleListener wraps a listener with an idle timeout.
// When there are no active connections for at least the timeout duration a
// call to accept will fail with ErrIdleTimeout.
//
// When there are no active connections for at least the timeout duration,
// calls to Accept will fail with ErrIdleTimeout.
//
// A connection is considered inactive as soon as its Close method is called.
func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
l := &idleListener{
timeout: timeout,
wrapped: wrap,
newConns: make(chan *idleCloser),
closed: make(chan struct{}),
wasTimeout: make(chan struct{}),
wrapped: wrap,
timeout: timeout,
active: make(chan int, 1),
timedOut: make(chan struct{}),
idleTimer: make(chan *time.Timer, 1),
}
go l.run()
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
return l
}

type idleListener struct {
wrapped Listener
timeout time.Duration
newConns chan *idleCloser
closed chan struct{}
wasTimeout chan struct{}
closeOnce sync.Once
}
wrapped Listener
timeout time.Duration

type idleCloser struct {
wrapped io.ReadWriteCloser
closed chan struct{}
closeOnce sync.Once
// Only one of these channels is receivable at any given time.
active chan int // count of active connections; closed when Close is called if not timed out
timedOut chan struct{} // closed when the idle timer expires
idleTimer chan *time.Timer // holds the timer only when idle
}

func (c *idleCloser) Read(p []byte) (int, error) {
n, err := c.wrapped.Read(p)
if err != nil && isClosingError(err) {
c.closeOnce.Do(func() { close(c.closed) })
// Accept accepts an incoming connection.
//
// If an incoming connection is accepted concurrently with the listener being
// closed due to idleness, the new connection is immediately closed.
func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
rwc, err := l.wrapped.Accept(ctx)

if err != nil && !isClosingError(err) {
return nil, err
}
return n, err
}

func (c *idleCloser) Write(p []byte) (int, error) {
// we do not close on write failure, we rely on the wrapped writer to do that
// if it is appropriate, which we will detect in the next read.
return c.wrapped.Write(p)
}
select {
case n, ok := <-l.active:
if err != nil {
if ok {
l.active <- n
}
return nil, err
}
if ok {
l.active <- n + 1
} else {
// l.wrapped.Close has been called, but Accept returned a
// connection. This race can occur with concurrent Accept and Close calls
// with any net.Listener, and it is benign: since the listener was closed
// explicitly, it can't have also timed out.
}
return l.newConn(rwc), nil

func (c *idleCloser) Close() error {
// we rely on closing the wrapped stream to signal to the next read that we
// are closed, rather than triggering the closed signal directly
return c.wrapped.Close()
}
case <-l.timedOut:
if err == nil {
// Keeping the connection open would leave the listener simultaneously
// active and closed due to idleness, which would be contradictory and
// confusing. Close the connection and pretend that it never happened.
rwc.Close()
}
return nil, ErrIdleTimeout

func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
rwc, err := l.wrapped.Accept(ctx)
if err != nil {
if isClosingError(err) {
// underlying listener was closed
l.closeOnce.Do(func() { close(l.closed) })
// was it closed because of the idle timeout?
select {
case <-l.wasTimeout:
err = ErrIdleTimeout
default:
}
case timer := <-l.idleTimer:
if err != nil {
// The idle timer hasn't run yet, so err can't be ErrIdleTimeout.
// Leave the idle timer as it was and return whatever error we got.
l.idleTimer <- timer
return nil, err
}
return nil, err
}
conn := &idleCloser{
wrapped: rwc,
closed: make(chan struct{}),

if !timer.Stop() {
// Failed to stop the timer — the timer goroutine is in the process of
// firing. Send the timer back to the timer goroutine so that it can
// safely close the timedOut channel, and then wait for the listener to
// actually be closed before we return ErrIdleTimeout.
l.idleTimer <- timer
rwc.Close()
<-l.timedOut
return nil, ErrIdleTimeout
}

l.active <- 1
return l.newConn(rwc), nil
}
l.newConns <- conn
return conn, err
}

func (l *idleListener) Close() error {
defer l.closeOnce.Do(func() { close(l.closed) })
select {
case _, ok := <-l.active:
if ok {
close(l.active)
}

case <-l.timedOut:
// Already closed by the timer; take care not to double-close if the caller
// only explicitly invokes this Close method once, since the io.Closer
// interface explicitly leaves doubled Close calls undefined.
return ErrIdleTimeout

case timer := <-l.idleTimer:
if !timer.Stop() {
// Couldn't stop the timer. It shouldn't take long to run, so just wait
// (so that the Listener is guaranteed to be closed before we return)
// and pretend that this call happened afterward.
// That way we won't leak any timers or goroutines when Close returns.
l.idleTimer <- timer
<-l.timedOut
return ErrIdleTimeout
}
close(l.active)
}

return l.wrapped.Close()
}

// Dialer returns the Dialer reported by the wrapped listener; the idle
// tracking performed by idleListener does not alter how peers connect.
func (l *idleListener) Dialer() Dialer {
	inner := l.wrapped
	return inner.Dialer()
}

func (l *idleListener) run() {
var conns []*idleCloser
for {
var firstClosed chan struct{} // left at nil if there are no active conns
var timeout <-chan time.Time // left at nil if there are active conns
if len(conns) > 0 {
firstClosed = conns[0].closed
func (l *idleListener) timerExpired() {
select {
case n, ok := <-l.active:
if ok {
panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n))
} else {
timeout = time.After(l.timeout)
panic("jsonrpc2: Close finished with idle timer still running")
}
select {
case <-l.closed:
// the main listener closed, no need to keep going

case <-l.timedOut:
panic("jsonrpc2: idleListener idle timer fired more than once")

case <-l.idleTimer:
// The timer for this very call!
}

// Close the Listener with all channels still blocked to ensure that this call
// to l.wrapped.Close doesn't race with the one in l.Close.
defer close(l.timedOut)
l.wrapped.Close()
}

func (l *idleListener) connClosed() {
select {
case n, ok := <-l.active:
if !ok {
// l is already closed, so it can't close due to idleness,
// and we don't need to track the number of active connections any more.
return
case conn := <-l.newConns:
// a new conn arrived, add it to the list
conns = append(conns, conn)
case <-timeout:
// we timed out, only happens when there are no active conns
// close the underlying listener, and allow the normal closing process to happen
close(l.wasTimeout)
l.wrapped.Close()
case <-firstClosed:
// a conn closed, remove it from the active list
conns = conns[:copy(conns, conns[1:])]
}
n--
if n == 0 {
l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
} else {
l.active <- n
}

case <-l.timedOut:
panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")

case <-l.idleTimer:
panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
}
}

// idleListenerConn is the connection type handed out by idleListener.
// It forwards Read, Write, and Close to the wrapped connection and reports
// the first Close call back to its listener.
type idleListenerConn struct {
	// wrapped is the underlying connection that Read, Write, and Close delegate to.
	wrapped io.ReadWriteCloser
	// l is the listener whose connClosed method is invoked when this
	// connection is first closed.
	l *idleListener
	// closeOnce ensures the listener notification and finalizer removal in
	// Close happen at most once, even if Close is called repeatedly.
	closeOnce sync.Once
}

// newConn wraps rwc in an idleListenerConn that is associated with l.
//
// The returned connection carries a best-effort finalizer that panics if the
// garbage collector finds the connection unreachable while the finalizer is
// still set. Close clears the finalizer, so the panic only fires for
// connections that were dropped without being closed. The finalizer does not
// close the connection itself; it only verifies that the caller did.
func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn {
	conn := new(idleListenerConn)
	conn.wrapped = rwc
	conn.l = l

	// A connection that is never closed is never reported to connClosed, so
	// the listener's active-connection count would stay too high. Surface
	// that mistake loudly instead of letting it pass silently.
	onUnreachable := func(*idleListenerConn) {
		panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
	}
	runtime.SetFinalizer(conn, onUnreachable)

	return conn
}

// Read reads from the wrapped connection into p and returns its result
// unchanged.
func (c *idleListenerConn) Read(p []byte) (int, error) {
	n, err := c.wrapped.Read(p)
	return n, err
}
// Write writes p to the wrapped connection and returns its result unchanged.
func (c *idleListenerConn) Write(p []byte) (int, error) {
	n, err := c.wrapped.Write(p)
	return n, err
}

// Close closes the wrapped connection and returns its error.
//
// After the wrapped Close returns (or panics), the first call also tells the
// owning listener that this connection is no longer active and then clears
// the finalizer installed when the connection was created. Later calls skip
// that bookkeeping but still forward Close to the wrapped connection.
func (c *idleListenerConn) Close() error {
	release := func() {
		c.l.connClosed()
		runtime.SetFinalizer(c, nil)
	}
	// Deferred so that the bookkeeping runs after the wrapped Close, and
	// still runs if that call panics.
	defer c.closeOnce.Do(release)

	return c.wrapped.Close()
}
Loading

0 comments on commit 371ef16

Please sign in to comment.