Skip to content

Commit

Permalink
Utilize identify events to add peers to the routing table (#472)
Browse files Browse the repository at this point in the history
* feat: consume identify events to evaluate routing table addition
* fix: routing table no longer gets an update just because new messages have arrived or been sent
* fix: add already connected peers into the routing table before listening to events

Co-authored-by: Raúl Kripalani <raul.kripalani@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
  • Loading branch information
3 people authored and Stebalien committed Apr 3, 2020
1 parent 41f47f7 commit 509c0bc
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 179 deletions.
12 changes: 4 additions & 8 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.enableValues = cfg.EnableValues

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

dht.proc.Go((*subscriberNotifee)(dht).subscribe)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())
dht.Validator = cfg.Validator

Expand Down Expand Up @@ -182,12 +182,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
triggerRtRefresh: make(chan chan<- error),
}

// create a DHT proc with the given teardown
dht.proc = goprocess.WithTeardown(func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
return nil
})
// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)

// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
Expand Down
14 changes: 0 additions & 14 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

dht.updateFromMessage(ctx, mPeer, &req)

if resp == nil {
continue
}
Expand Down Expand Up @@ -187,9 +185,6 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
return nil, err
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)

stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
Expand Down Expand Up @@ -230,15 +225,6 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
return nil
}

func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Message) error {
// Make sure that this node is actually a DHT server, not just a client.
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) > 0 {
dht.Update(ctx, p)
}
return nil
}

func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
dht.smlk.Lock()
ms, ok := dht.strmap[p]
Expand Down
28 changes: 17 additions & 11 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand Down Expand Up @@ -67,25 +70,28 @@ func TestGetFailures(t *testing.T) {
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

d, err := New(ctx, host1, opts.DisableAutoRefresh())
if err != nil {
t.Fatal(err)
}
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})

host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
_, err = host1.Network().DialPeer(ctx, host2.ID())
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)

// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
Expand All @@ -104,7 +110,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.")

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
Expand Down Expand Up @@ -156,7 +162,7 @@ func TestGetFailures(t *testing.T) {
Record: rec,
}

s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v1.0.2
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-eventbus v0.1.0
github.com/libp2p/go-libp2p v0.6.1
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-kbucket v0.2.3
Expand Down
144 changes: 0 additions & 144 deletions notif.go

This file was deleted.

4 changes: 2 additions & 2 deletions notify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
d1 := setupDHT(ctx, t, false)
d2 := setupDHT(ctx, t, false)

nn1 := (*netNotifiee)(d1)
nn2 := (*netNotifiee)(d2)
nn1 := (*subscriberNotifee)(d1)
nn2 := (*subscriberNotifee)(d2)

connect(t, ctx, d1, d2)
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
Expand Down
Loading

0 comments on commit 509c0bc

Please sign in to comment.