Skip to content

Commit

Permalink
Merge pull request #26 from raulk/fix/concurrency-and-silence-period
Browse files Browse the repository at this point in the history
Fix concurrency and silence period not being honoured
  • Loading branch information
raulk committed Dec 17, 2018
2 parents 6f4d058 + 97f80a6 commit e597ebd
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 32 deletions.
52 changes: 32 additions & 20 deletions p2p/net/connmgr/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"time"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-interface-connmgr"
ifconnmgr "github.com/libp2p/go-libp2p-interface-connmgr"
inet "github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p-peer"
peer "github.com/libp2p/go-libp2p-peer"
ma "github.com/multiformats/go-multiaddr"
)

const silencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
Expand All @@ -23,17 +25,16 @@ var log = logging.Logger("connmgr")
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
highWater int
lowWater int

lk sync.Mutex
highWater int
lowWater int
connCount int
gracePeriod time.Duration
peers map[peer.ID]*peerInfo

peers map[peer.ID]*peerInfo
connCount int

lk sync.Mutex

lastTrim time.Time
// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
}

var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
Expand All @@ -46,10 +47,11 @@ var _ ifconnmgr.ConnManager = (*BasicConnMgr)(nil)
// subject to pruning.
func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr {
return &BasicConnMgr{
highWater: hi,
lowWater: low,
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
highWater: hi,
lowWater: low,
gracePeriod: grace,
peers: make(map[peer.ID]*peerInfo),
trimRunningCh: make(chan struct{}, 1),
}
}

Expand All @@ -67,13 +69,27 @@ type peerInfo struct {
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// TODO: error return value so we can cleanly signal we are aborting because:
// (a) there's another trim in progress, or (b) the silence period is in effect.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
select {
case cm.trimRunningCh <- struct{}{}:
default:
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < silencePeriod {
// skip this attempt to trim as the last one just took place.
return
}
defer log.EventBegin(ctx, "connCleanup").Done()
for _, c := range cm.getConnsToClose(ctx) {
log.Info("closing conn: ", c.RemotePeer())
log.Event(ctx, "closeConn", c.RemotePeer())
c.Close()
}
cm.lastTrim = time.Now()
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
Expand All @@ -87,8 +103,6 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn {
return nil
}
now := time.Now()
cm.lastTrim = now

if len(cm.peers) < cm.lowWater {
log.Info("open connection count below limit")
return nil
Expand Down Expand Up @@ -263,9 +277,7 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) {
cm.connCount++

if cm.connCount > nn.highWater {
if cm.lastTrim.IsZero() || time.Since(cm.lastTrim) > time.Second*10 {
go cm.TrimOpenConns(context.Background())
}
go cm.TrimOpenConns(context.Background())
}
}

Expand Down
61 changes: 49 additions & 12 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ import (

type tconn struct {
inet.Conn
peer peer.ID
closed bool

peer peer.ID
closed bool
disconnectNotify func(net inet.Network, conn inet.Conn)
}

func (c *tconn) Close() error {
c.closed = true
if c.disconnectNotify != nil {
c.disconnectNotify(nil, c)
}
return nil
}

Expand All @@ -34,9 +39,9 @@ func (c *tconn) RemoteMultiaddr() ma.Multiaddr {
return addr
}

func randConn(t *testing.T) inet.Conn {
func randConn(t *testing.T, discNotify func(inet.Network, inet.Conn)) inet.Conn {
pid := tu.RandPeerIDFatal(t)
return &tconn{peer: pid}
return &tconn{peer: pid, disconnectNotify: discNotify}
}

func TestConnTrimming(t *testing.T) {
Expand All @@ -45,7 +50,7 @@ func TestConnTrimming(t *testing.T) {

var conns []inet.Conn
for i := 0; i < 300; i++ {
rc := randConn(t)
rc := randConn(t, nil)
conns = append(conns, rc)
not.Connected(nil, rc)
}
Expand Down Expand Up @@ -98,7 +103,7 @@ func TestConnsToClose(t *testing.T) {
cm = NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
for i := 0; i < 5; i++ {
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
}
conns = cm.getConnsToClose(context.Background())
Expand All @@ -111,7 +116,7 @@ func TestGetTagInfo(t *testing.T) {
start := time.Now()
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
end := time.Now()

Expand Down Expand Up @@ -192,7 +197,7 @@ func TestTagPeerNonExistant(t *testing.T) {
func TestUntagPeer(t *testing.T) {
cm := NewConnManager(1, 1, time.Duration(10*time.Minute))
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
rp := conn.RemotePeer()
cm.TagPeer(rp, "tag", 5)
Expand Down Expand Up @@ -223,7 +228,7 @@ func TestGetInfo(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TrimOpenConns(context.Background())
end := time.Now()
Expand All @@ -250,7 +255,7 @@ func TestDoubleConnection(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)
not.Connected(nil, conn)
Expand All @@ -266,11 +271,11 @@ func TestDisconnected(t *testing.T) {
gp := time.Duration(10 * time.Minute)
cm := NewConnManager(1, 5, gp)
not := cm.Notifee()
conn := randConn(t)
conn := randConn(t, nil)
not.Connected(nil, conn)
cm.TagPeer(conn.RemotePeer(), "foo", 10)

not.Disconnected(nil, randConn(t))
not.Disconnected(nil, randConn(t, nil))
if cm.connCount != 1 {
t.Fatal("unexpected number of connections")
}
Expand All @@ -294,3 +299,35 @@ func TestDisconnected(t *testing.T) {
t.Fatal("unexpected number of peers")
}
}

// see https://github.com/libp2p/go-libp2p-connmgr/issues/23
func TestQuickBurstRespectsSilencePeriod(t *testing.T) {
cm := NewConnManager(10, 20, 0)
not := cm.Notifee()

var conns []inet.Conn

// quickly produce 30 connections (sending us above the high watermark)
for i := 0; i < 30; i++ {
rc := randConn(t, not.Disconnected)
conns = append(conns, rc)
not.Connected(nil, rc)
}

// wait for a few seconds
time.Sleep(time.Second * 3)

// only the first trim is allowed in; make sure we close at most 20 connections, not all of them.
var closed int
for _, c := range conns {
if c.(*tconn).closed {
closed++
}
}
if closed > 20 {
t.Fatalf("should have closed at most 20 connections, closed: %d", closed)
}
if total := closed + cm.connCount; total != 30 {
t.Fatalf("expected closed connections + open conn count to equal 30, value: %d", total)
}
}

0 comments on commit e597ebd

Please sign in to comment.