Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consume identify events to evaluate routing table addition. #365

Merged
merged 6 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
Expand Down Expand Up @@ -82,6 +83,10 @@ type IpfsDHT struct {
modeLk sync.Mutex

bucketSize int

subscriptions struct {
evtPeerIdentification event.Subscription
}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -103,17 +108,24 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)

subnot := (*subscriberNotifee)(dht)

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
dht.host.Network().Notify(subnot)

go dht.handleProtocolChanges(ctx)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
dht.host.Network().StopNotify((*subscriberNotifee)(dht))

if dht.subscriptions.evtPeerIdentification != nil {
_ = dht.subscriptions.evtPeerIdentification.Close()
}
return nil
})

dht.proc.AddChild(subnot.Process(ctx))
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.mode = ModeClient
Expand Down Expand Up @@ -174,14 +186,20 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
bucketSize: bucketSize,
}

var err error
evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}}
dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256))
if err != nil {
logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err)
}

dht.ctx = dht.newContextWithLocalTags(ctx)

return dht
}

// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {

pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
Expand All @@ -205,7 +223,6 @@ var errInvalidRecord = errors.New("received invalid record")
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {

pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -307,10 +324,11 @@ func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
}

func (dht *IpfsDHT) UpdateConn(ctx context.Context, c network.Conn) {
if dht.shouldAddPeerToRoutingTable(c) {
logger.Event(ctx, "updatePeer", c.RemotePeer())
dht.routingTable.Update(c.RemotePeer())
if !dht.shouldAddPeerToRoutingTable(c) {
return
}
logger.Event(ctx, "updatePeer", c.RemotePeer())
dht.routingTable.Update(c.RemotePeer())
}

func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool {
Expand All @@ -323,7 +341,7 @@ func (dht *IpfsDHT) shouldAddPeerToRoutingTable(c network.Conn) bool {
}

ai := dht.host.Peerstore().PeerInfo(c.RemotePeer())
if dht.peerIsOnSameSubnet(c) {
if dht.isPeerLocallyConnected(c) {
// TODO: for now, we can't easily tell if the peer on our subnet
// is dialable or not, so don't discriminate.

Expand Down Expand Up @@ -369,8 +387,9 @@ func isRelayAddr(a ma.Multiaddr) bool {
return isRelay
}

func (dht *IpfsDHT) peerIsOnSameSubnet(c network.Conn) bool {
return manet.IsPrivateAddr(c.RemoteMultiaddr())
func (dht *IpfsDHT) isPeerLocallyConnected(c network.Conn) bool {
addr := c.RemoteMultiaddr()
return manet.IsPrivateAddr(addr) || manet.IsIPLoopback(addr)
}

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
Expand Down
1 change: 0 additions & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
}

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
32 changes: 19 additions & 13 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (

"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -18,32 +21,34 @@ import (
)

func TestGetFailures(t *testing.T) {
t.SkipNow()
if testing.Short() {
t.SkipNow()
}

ctx := context.Background()
mn, err := mocknet.FullMeshConnected(ctx, 2)
if err != nil {
t.Fatal(err)
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

d, err := New(ctx, host1)
if err != nil {
t.Fatal(err)
}

// TODO: replace with identify event bus event
time.Sleep(time.Millisecond * 100)
d.Update(ctx, hosts[1].ID())

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
time.Sleep(400 * time.Millisecond)
s.Close()
})

host1.Peerstore().AddAddrs(host2.ID(), host2.Addrs(), peerstore.ConnectedAddrTTL)
_, err = host1.Network().DialPeer(ctx, host2.ID())
if err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)

// This one should time out
ctx1, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()
Expand All @@ -62,7 +67,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.")

// Reply with failures to every message
hosts[1].SetStreamHandler(d.protocols[0], func(s network.Stream) {
host2.SetStreamHandler(d.protocols[0], func(s network.Stream) {
defer s.Close()

pbr := ggio.NewDelimitedReader(s, network.MessageSizeMax)
Expand Down Expand Up @@ -114,7 +119,7 @@ func TestGetFailures(t *testing.T) {
Record: rec,
}

s, err := hosts[1].NewStream(context.Background(), hosts[0].ID(), d.protocols[0])
s, err := host2.NewStream(context.Background(), host1.ID(), d.protocols[0])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -290,6 +295,7 @@ func TestLessThanKResponses(t *testing.T) {

// Test multiple queries against a node that closes its stream after every query.
func TestMultipleQueries(t *testing.T) {
t.SkipNow()
if testing.Short() {
t.SkipNow()
}
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ require (
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-todocounter v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.2.0
github.com/libp2p/go-eventbus v0.0.2
github.com/libp2p/go-libp2p v0.1.3-0.20190626170235-f299d252e778
github.com/libp2p/go-libp2p-circuit v0.1.0
github.com/libp2p/go-libp2p-core v0.0.6
github.com/libp2p/go-libp2p-core v0.0.7-0.20190626134135-aca080dccfc2
github.com/libp2p/go-libp2p-kbucket v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.1
github.com/libp2p/go-libp2p-peerstore v0.1.2-0.20190621130618-cfa9bb890c1a
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
Expand Down
Loading