Skip to content

Commit

Permalink
Merge branch 'master' into faster-bcast-queue
Browse files Browse the repository at this point in the history
  • Loading branch information
rboyer committed Jan 3, 2019
2 parents f8abb95 + a902764 commit 2690c35
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 2 deletions.
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,7 @@ branches:
only:
- master

install:
- make deps
install: true

env:
- GO111MODULE=on # Enable Go modules in 1.11
23 changes: 23 additions & 0 deletions net_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,39 @@ func (t *NetTransport) Shutdown() error {
// and hands them off to the stream channel.
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
defer t.wg.Done()

// baseDelay is the initial delay after an AcceptTCP() error before attempting again
const baseDelay = 5 * time.Millisecond

// maxDelay is the maximum delay after an AcceptTCP() error before attempting again.
// In the case that tcpListen() is error-looping, it will delay the shutdown check.
// Therefore, changes to maxDelay may have an effect on the latency of shutdown.
const maxDelay = 1 * time.Second

var loopDelay time.Duration
for {
conn, err := tcpLn.AcceptTCP()
if err != nil {
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
break
}

if loopDelay == 0 {
loopDelay = baseDelay
} else {
loopDelay *= 2
}

if loopDelay > maxDelay {
loopDelay = maxDelay
}

t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
time.Sleep(loopDelay)
continue
}
// No error, reset loop delay
loopDelay = 0

t.streamCh <- conn
}
Expand Down
75 changes: 75 additions & 0 deletions transport_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package memberlist

import (
"log"
"net"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -133,3 +137,74 @@ func TestTransport_Send(t *testing.T) {
// assert send ordering. Sort both slices to be tolerant of re-ordering.
require.ElementsMatch(t, expected, received)
}

type testCountingWriter struct {
t *testing.T
numCalls *int32
}

func (tw testCountingWriter) Write(p []byte) (n int, err error) {
atomic.AddInt32(tw.numCalls, 1)
if !strings.Contains(string(p), "memberlist: Error accepting TCP connection") {
tw.t.Error("did not receive expected log message")
}
tw.t.Log("countingWriter:", string(p))
return len(p), nil
}

// TestTransport_TcpListenBackoff tests that AcceptTCP() errors in NetTransport#tcpListen()
// do not result in a tight loop and spam the log. We verify this here by counting the number
// of entries logged in a given time period.
func TestTransport_TcpListenBackoff(t *testing.T) {

// testTime is the amount of time we will allow NetTransport#tcpListen() to run
// This needs to be long enough that to verify that maxDelay is in force,
// but not so long as to be obnoxious when running the test suite.
const testTime = 4 * time.Second

var numCalls int32
countingWriter := testCountingWriter{t, &numCalls}
countingLogger := log.New(countingWriter, "test", log.LstdFlags)
transport := NetTransport{
streamCh: make(chan net.Conn),
logger: countingLogger,
}
transport.wg.Add(1)

// create a listener that will cause AcceptTCP calls to fail
listener, _ := net.ListenTCP("tcp", nil)
listener.Close()
go transport.tcpListen(listener)

// sleep (+yield) for testTime seconds before asking the accept loop to shut down
time.Sleep(testTime)
atomic.StoreInt32(&transport.shutdown, 1)

// Verify that the wg was completed on exit (but without blocking this test)
// maxDelay == 1s, so we will give the routine 1.25s to loop around and shut down.
c := make(chan struct{})
go func() {
defer close(c)
transport.wg.Wait()
}()
select {
case <- c:
case <-time.After(1250 * time.Millisecond):
t.Error("timed out waiting for transport waitgroup to be done after flagging shutdown")
}

// In testTime==4s, we expect to loop approximately 12 times (and log approximately 11 errors),
// with the following delays (in ms):
// 0+5+10+20+40+80+160+320+640+1000+1000+1000 == 4275 ms
// Too few calls suggests that the minDelay is not in force; too many calls suggests that the
// maxDelay is not in force or that the back-off isn't working at all.
// We'll leave a little flex; the important thing here is the asymptotic behavior.
// If the minDelay or maxDelay in NetTransport#tcpListen() are modified, this test may fail
// and need to be adjusted.
require.True(t, numCalls > 8)
require.True(t, numCalls < 14)

// no connections should have been accepted and sent to the channel
require.Equal(t, len(transport.streamCh), 0)
}

0 comments on commit 2690c35

Please sign in to comment.