decaying connmgr tags for message delivery #328

Merged: 33 commits, May 19, 2020

Changes from all commits
4282ef9
track discovery API change
yusefnapora May 7, 2020
e41c6db
temporarily depend on PR commits
yusefnapora May 19, 2020
3ba9e84
add tagTracer to apply connmgr tags
yusefnapora May 19, 2020
0b9c04a
unit tests for tagTracer
yusefnapora May 19, 2020
151b943
bump connmgr dep to latest PR commit
yusefnapora May 19, 2020
27f5fe1
test delivery tags vs sybil storm
yusefnapora May 9, 2020
b18a85e
increase decaying tag interval
yusefnapora May 11, 2020
b492803
add nil check for decayer
yusefnapora May 11, 2020
a6d8c41
lock for reading in bumpDeliveryTag
yusefnapora May 11, 2020
1a7bdee
bump blankhost to 0.1.6
yusefnapora May 19, 2020
6e36142
add delivery tags for "near-first" msg deliveries
yusefnapora May 11, 2020
4b1cfba
free up drec.peers when we're done with it
yusefnapora May 11, 2020
ab1fb5b
use map to track near-first deliveries in tagTracer
yusefnapora May 13, 2020
a59feaa
update dependencies and use preset decay fns
yusefnapora May 19, 2020
9b522db
check reject reason before deleting near-first state
yusefnapora May 14, 2020
e2708e7
sort imports
yusefnapora May 14, 2020
24a6222
rm misleading comment
yusefnapora May 14, 2020
1c49b06
increase test timeout on travis
yusefnapora May 15, 2020
ee72b60
temporarily depend on master for -core and -connmgr
yusefnapora May 19, 2020
79ce9ac
close decaying tags when leaving topic
yusefnapora May 15, 2020
b4dc2d1
inline decayingDeliveryTag method into addDeliveryTag
yusefnapora May 15, 2020
bb9f17c
use fewer peers in connmgr test
yusefnapora May 15, 2020
5ef02ee
even fewer peers in connmgr test
yusefnapora May 18, 2020
ce6f171
add delay in TestTagTracerDeliveryTags
yusefnapora May 18, 2020
dc6ff33
play with test timing on travis
yusefnapora May 19, 2020
b577653
this is getting a bit silly
yusefnapora May 19, 2020
c69df44
loosen test expectations to appease travis
yusefnapora May 19, 2020
53ca716
better range check for expected value in test
yusefnapora May 19, 2020
eed9da6
rm empty slice alloc
yusefnapora May 19, 2020
daafc50
revert travis timeout change
yusefnapora May 19, 2020
a082669
update deps, go mod tidy
yusefnapora May 19, 2020
aa4d9a4
sleep in test after bumping fake clock
yusefnapora May 19, 2020
bdea33d
sleep longer for travis
yusefnapora May 19, 2020
2 changes: 1 addition & 1 deletion discovery.go
@@ -38,7 +38,7 @@ func defaultDiscoverOptions() *discoverOptions {
dialTimeout := time.Minute * 2
discoverOpts := &discoverOptions{
connFactory: func(host host.Host) (*discimpl.BackoffConnector, error) {
- backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rngSrc)
+ backoff := discimpl.NewExponentialBackoff(minBackoff, maxBackoff, discimpl.FullJitter, time.Second, 5.0, 0, rand.New(rngSrc))
return discimpl.NewBackoffConnector(host, cacheSize, dialTimeout, backoff)
},
}
7 changes: 6 additions & 1 deletion go.mod
@@ -1,17 +1,22 @@
module github.com/libp2p/go-libp2p-pubsub

require (
github.com/benbjohnson/clock v1.0.1
github.com/gogo/protobuf v1.3.1
github.com/ipfs/go-log v1.0.4
github.com/libp2p/go-libp2p-blankhost v0.1.6
github.com/libp2p/go-libp2p-connmgr v0.2.3
github.com/libp2p/go-libp2p-core v0.5.6
github.com/libp2p/go-libp2p-discovery v0.4.0
github.com/libp2p/go-libp2p-swarm v0.2.4
github.com/multiformats/go-multiaddr v0.2.2
github.com/multiformats/go-multiaddr-net v0.1.5
github.com/multiformats/go-multibase v0.0.2 // indirect
github.com/multiformats/go-multistream v0.1.1
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect
go.uber.org/zap v1.15.0 // indirect
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37 // indirect
golang.org/x/sys v0.0.0-20200509044756-6aff5f38e54f // indirect
)

go 1.13
115 changes: 33 additions & 82 deletions go.sum

Large diffs are not rendered by default.

39 changes: 13 additions & 26 deletions gossipsub.go
@@ -124,7 +124,12 @@ func NewGossipSub(ctx context.Context, h host.Host, opts ...Option) (*PubSub, er
opportunisticGraftTicks: GossipSubOpportunisticGraftTicks,

fanoutTTL: GossipSubFanoutTTL,

tagTracer: newTagTracer(h.ConnManager()),
}

// use the withInternalTracer option to hook up the tag tracer
opts = append(opts, withInternalTracer(rt.tagTracer))
return NewPubSub(ctx, h, rt, opts...)
}

@@ -224,6 +229,10 @@ func WithDirectPeers(pis []peer.AddrInfo) Option {

gs.direct = direct

if gs.tagTracer != nil {
gs.tagTracer.direct = direct
}

return nil
}
}
@@ -254,6 +263,7 @@ type GossipSubRouter struct {
tracer *pubsubTracer
score *peerScore
gossipTracer *gossipTracer
tagTracer *tagTracer

// whether PX is enabled; this should be enabled in bootstrappers and other well connected/trusted
// nodes.
@@ -318,6 +328,9 @@ func (gs *GossipSubRouter) Attach(p *PubSub) {
// and the gossip tracing
gs.gossipTracer.Start(gs)

// and the tracer for connmgr tags
gs.tagTracer.Start(gs)

// start using the same msg ID function as PubSub for caching messages.
gs.mcache.SetMsgIdFn(p.msgID)

@@ -347,12 +360,6 @@ func (gs *GossipSubRouter) AddPeer(p peer.ID, proto protocol.ID) {
gs.tracer.AddPeer(p, proto)
gs.peers[p] = proto

// tag peer if it is a direct peer
_, direct := gs.direct[p]
if direct {
gs.p.host.ConnManager().TagPeer(p, "pubsub:direct", 1000)
}

// track the connection direction
outbound := false
conns := gs.p.host.Network().ConnsToPeer(p)
@@ -621,7 +628,6 @@ func (gs *GossipSubRouter) handleGraft(p peer.ID, ctl *pb.ControlMessage) []*pb.
log.Debugf("GRAFT: add mesh link from %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
}

if len(prune) == 0 {
@@ -649,7 +655,6 @@ func (gs *GossipSubRouter) handlePrune(p peer.ID, ctl *pb.ControlMessage) {
log.Debugf("PRUNE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
// is there a backoff specified by the peer? if so obey it.
backoff := prune.GetBackoff()
if backoff > 0 {
@@ -889,7 +894,6 @@ func (gs *GossipSubRouter) Join(topic string) {
log.Debugf("JOIN: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
gs.sendGraft(p, topic)
gs.tagPeer(p, topic)
}
}

@@ -908,7 +912,6 @@ func (gs *GossipSubRouter) Leave(topic string) {
log.Debugf("LEAVE: Remove mesh link to %s in %s", p, topic)
gs.tracer.Prune(p, topic)
gs.sendPrune(p, topic)
gs.untagPeer(p, topic)
}
}

@@ -1168,7 +1171,6 @@ func (gs *GossipSubRouter) heartbeat() {
prunePeer := func(p peer.ID) {
gs.tracer.Prune(p, topic)
delete(peers, p)
gs.untagPeer(p, topic)
gs.addBackoff(p, topic)
topics := toprune[p]
toprune[p] = append(topics, topic)
@@ -1178,7 +1180,6 @@
log.Debugf("HEARTBEAT: Add mesh link to %s in %s", p, topic)
gs.tracer.Graft(p, topic)
peers[p] = struct{}{}
gs.tagPeer(p, topic)
topics := tograft[p]
tograft[p] = append(topics, topic)
}
@@ -1672,20 +1673,6 @@ func (gs *GossipSubRouter) getPeers(topic string, count int, filter func(peer.ID
return peers
}

func (gs *GossipSubRouter) tagPeer(p peer.ID, topic string) {
tag := topicTag(topic)
gs.p.host.ConnManager().TagPeer(p, tag, 20)
}

func (gs *GossipSubRouter) untagPeer(p peer.ID, topic string) {
tag := topicTag(topic)
gs.p.host.ConnManager().UntagPeer(p, tag)
}

func topicTag(topic string) string {
return fmt.Sprintf("pubsub:%s", topic)
}

func peerListToMap(peers []peer.ID) map[peer.ID]struct{} {
pmap := make(map[peer.ID]struct{})
for _, p := range peers {
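
The static per-topic "pubsub:<topic>" tags removed above are replaced by the new tagTracer, whose implementation (tag.go) is not included in this diff. The sketch below is a minimal illustration of the decaying-tag mechanism it builds on, written against the connmgr.Decayer, DecayingTag and DecayingValue interfaces from go-libp2p-core; the deliveryTagger type, its method names, and the inline decay/bump functions are illustrative assumptions, not the PR's actual code.

package pubsub

import (
	"fmt"

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"
)

// deliveryTagger is an illustrative stand-in for the tagTracer added in this PR.
type deliveryTagger struct {
	decayer connmgr.Decayer
	tags    map[string]connmgr.DecayingTag // topic -> decaying delivery tag
}

func newDeliveryTagger(cmgr connmgr.ConnManager) *deliveryTagger {
	// decaying tags are an optional capability of the connection manager,
	// hence the nil check before registering tags below
	d, _ := cmgr.(connmgr.Decayer)
	return &deliveryTagger{decayer: d, tags: make(map[string]connmgr.DecayingTag)}
}

// addDeliveryTag registers a decaying tag for a topic when we join it.
func (t *deliveryTagger) addDeliveryTag(topic string) error {
	if t.decayer == nil {
		return nil // connection manager doesn't support decaying tags
	}
	name := fmt.Sprintf("pubsub-deliveries:%s", topic)
	tag, err := t.decayer.RegisterDecayingTag(name, GossipSubConnTagDecayInterval,
		// decay: lose one point per interval, dropping the entry when it reaches zero
		func(v connmgr.DecayingValue) (int, bool) { return v.Value - 1, v.Value <= 1 },
		// bump: accumulate delivery credit, capped so a peer can't pin itself forever
		func(v connmgr.DecayingValue, delta int) int {
			if after := v.Value + delta; after < GossipSubConnTagMessageDeliveryCap {
				return after
			}
			return GossipSubConnTagMessageDeliveryCap
		})
	if err != nil {
		return err
	}
	t.tags[topic] = tag
	return nil
}

// bumpDeliveryTag credits peer p for a (near-)first delivery of a message in topic.
func (t *deliveryTagger) bumpDeliveryTag(p peer.ID, topic string) error {
	if tag, ok := t.tags[topic]; ok {
		return tag.Bump(p, 1)
	}
	return nil
}

The tag name matches the "pubsub-deliveries:test" tag asserted in the new test below, and the cap corresponds to GossipSubConnTagMessageDeliveryCap, which the test deliberately sets above the mesh-peer tag value so that honest, high-delivering peers outrank sybil squatters when the connection manager trims.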
172 changes: 172 additions & 0 deletions gossipsub_connmgr_test.go
@@ -0,0 +1,172 @@
package pubsub

import (
"context"
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/libp2p/go-libp2p-core/host"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"

bhost "github.com/libp2p/go-libp2p-blankhost"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/peer"
)

func TestGossipsubConnTagMessageDeliveries(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

oldGossipSubD := GossipSubD
oldGossipSubDlo := GossipSubDlo
oldGossipSubDHi := GossipSubDhi
oldGossipSubConnTagDecayInterval := GossipSubConnTagDecayInterval
oldGossipSubConnTagMessageDeliveryCap := GossipSubConnTagMessageDeliveryCap

oldSilencePeriod := connmgr.SilencePeriod

// set the gossipsub D parameters low, so that we have some peers outside the mesh
GossipSubDlo = 3
GossipSubD = 3
GossipSubDhi = 3
// also set the tag decay interval so we don't have to wait forever for tests
GossipSubConnTagDecayInterval = time.Second

// set the cap for deliveries above GossipSubConnTagValueMeshPeer, so the sybils
// will be forced out even if they end up in someone's mesh
GossipSubConnTagMessageDeliveryCap = 50

connmgr.SilencePeriod = time.Millisecond
// reset globals after test
defer func() {
GossipSubD = oldGossipSubD
GossipSubDlo = oldGossipSubDlo
GossipSubDhi = oldGossipSubDHi
GossipSubConnTagDecayInterval = oldGossipSubConnTagDecayInterval
GossipSubConnTagMessageDeliveryCap = oldGossipSubConnTagMessageDeliveryCap
connmgr.SilencePeriod = oldSilencePeriod
}()

decayClock := clock.NewMock()
decayCfg := connmgr.DecayerCfg{
Resolution: time.Second,
Clock: decayClock,
}

nHonest := 5
nSquatter := 10
connLimit := 10

connmgrs := make([]*connmgr.BasicConnMgr, nHonest)
honestHosts := make([]host.Host, nHonest)
honestPeers := make(map[peer.ID]struct{})

for i := 0; i < nHonest; i++ {
connmgrs[i] = connmgr.NewConnManager(nHonest, connLimit, 0,
connmgr.DecayerConfig(&decayCfg))

netw := swarmt.GenSwarm(t, ctx)
h := bhost.NewBlankHost(netw, bhost.WithConnectionManager(connmgrs[i]))
honestHosts[i] = h
honestPeers[h.ID()] = struct{}{}
}

// use flood publishing, so non-mesh peers will still be delivering messages
// to everyone
psubs := getGossipsubs(ctx, honestHosts,
WithFloodPublish(true))

// sybil squatters to be connected later
sybilHosts := getNetHosts(t, ctx, nSquatter)
squatters := make([]*sybilSquatter, 0, nSquatter)
for _, h := range sybilHosts {
squatter := &sybilSquatter{h: h}
h.SetStreamHandler(GossipSubID_v10, squatter.handleStream)
squatters = append(squatters, squatter)
}

// connect the honest hosts
connectAll(t, honestHosts)

for _, h := range honestHosts {
if len(h.Network().Conns()) != nHonest-1 {
t.Errorf("expected to have conns to all honest peers, have %d", len(h.Network().Conns()))
}
}

// subscribe everyone to the topic
topic := "test"
var msgs []*Subscription
for _, ps := range psubs {
subch, err := ps.Subscribe(topic)
if err != nil {
t.Fatal(err)
}

msgs = append(msgs, subch)
}

// sleep to allow meshes to form
time.Sleep(2 * time.Second)

// have all the hosts publish enough messages to ensure that they get some delivery credit
nMessages := GossipSubConnTagMessageDeliveryCap * 2
for _, ps := range psubs {
for i := 0; i < nMessages; i++ {
ps.Publish(topic, []byte("hello"))
}
}

// advance the fake time for the tag decay
decayClock.Add(time.Second)

// verify that they've given each other delivery connection tags
tag := "pubsub-deliveries:test"
for _, h := range honestHosts {
for _, h2 := range honestHosts {
if h.ID() == h2.ID() {
continue
}
val := getTagValue(h.ConnManager(), h2.ID(), tag)
if val == 0 {
t.Errorf("Expected non-zero delivery tag value for peer %s", h2.ID())
}
}
}

// now connect the sybils to put pressure on the real hosts' connection managers
allHosts := append(honestHosts, sybilHosts...)
connectAll(t, allHosts)

// verify that we have a bunch of connections
for _, h := range honestHosts {
if len(h.Network().Conns()) != nHonest+nSquatter-1 {
t.Errorf("expected to have conns to all peers, have %d", len(h.Network().Conns()))
}
}

// force the connection managers to trim, so we don't need to muck about with timing as much
for _, cm := range connmgrs {
cm.TrimOpenConns(ctx)
}

// we should still have conns to all the honest peers, but not the sybils
for _, h := range honestHosts {
nHonestConns := 0
nDishonestConns := 0
for _, conn := range h.Network().Conns() {
if _, ok := honestPeers[conn.RemotePeer()]; !ok {
nDishonestConns++
} else {
nHonestConns++
}
}
if nDishonestConns > connLimit-nHonest {
t.Errorf("expected most dishonest conns to be pruned, have %d", nDishonestConns)
}
if nHonestConns != nHonest-1 {
t.Errorf("expected all honest conns to be preserved, have %d", nHonestConns)
}
}
}
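
The getTagValue helper used in the assertions above is defined elsewhere in the test suite and is not part of this diff. Below is a minimal sketch of such a helper, assuming the decaying delivery tags are reported through the connection manager's GetTagInfo like ordinary tags (the repo's actual helper may differ):

import (
	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/peer"
)

// getTagValue reads the current value of a connection-manager tag for a peer,
// e.g. the "pubsub-deliveries:test" tag checked in the test above.
func getTagValue(mgr connmgr.ConnManager, p peer.ID, tag string) int {
	info := mgr.GetTagInfo(p)
	if info == nil {
		return 0
	}
	return info.Tags[tag] // a missing tag reads as zero
}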
2 changes: 1 addition & 1 deletion gossipsub_test.go
@@ -1172,7 +1172,7 @@ func TestGossipsubDirectPeers(t *testing.T) {
c.Close()
}

- time.Sleep(3 * time.Second)
+ time.Sleep(5 * time.Second)

if len(h[1].Network().ConnsToPeer(h[2].ID())) == 0 {
t.Fatal("expected a connection between direct peers")
12 changes: 12 additions & 0 deletions pubsub.go
@@ -383,6 +383,18 @@ func WithEventTracer(tracer EventTracer) Option {
}
}

// withInternalTracer adds an internal event tracer to the pubsub system
func withInternalTracer(tracer internalTracer) Option {
return func(p *PubSub) error {
if p.tracer != nil {
p.tracer.internal = append(p.tracer.internal, tracer)
} else {
p.tracer = &pubsubTracer{internal: []internalTracer{tracer}, pid: p.host.ID(), msgID: p.msgID}
}
return nil
}
}

// WithMaxMessageSize sets the global maximum message size for pubsub wire
// messages. The default value is 1MiB (DefaultMaxMessageSize).
//