From 0dcbc3aa141fd756e00177fbabc9648a9c103e49 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 7 Feb 2020 13:06:37 +0530 Subject: [PATCH] simple refactor & tests for Dht mode changes & protocol changes --- dht.go | 22 +++------------ subscriber_notifee.go | 66 +++++++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 33 deletions(-) diff --git a/dht.go b/dht.go index f7bf19705..edde31e3b 100644 --- a/dht.go +++ b/dht.go @@ -9,7 +9,6 @@ import ( "sync" "time" - "github.com/libp2p/go-eventbus" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" @@ -125,17 +124,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er // register for network notifs. dht.host.Network().Notify(subnot) - dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error { - // remove ourselves from network notifs. - dht.host.Network().StopNotify((*subscriberNotifee)(dht)) + dht.proc = goprocessctx.WithContext(ctx) - if dht.subscriptions.evtPeerIdentification != nil { - _ = dht.subscriptions.evtPeerIdentification.Close() - } - return nil - }) - - dht.proc.AddChild(subnot.Process(ctx)) + // register for network notifs. + dht.proc.Go((*subscriberNotifee)(dht).subscribe) + // handle providers dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator @@ -203,13 +196,6 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT { triggerRtRefresh: make(chan chan<- error), } - var err error - evts := []interface{}{&event.EvtPeerIdentificationCompleted{}, &event.EvtPeerIdentificationFailed{}} - dht.subscriptions.evtPeerIdentification, err = h.EventBus().Subscribe(evts, eventbus.BufSize(256)) - if err != nil { - logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) - } - dht.ctx = dht.newContextWithLocalTags(ctx) return dht diff --git a/subscriber_notifee.go b/subscriber_notifee.go index d2bf136ed..5f373cc5f 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -1,12 +1,12 @@ package dht import ( - "context" - "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/network" + + "github.com/libp2p/go-eventbus" + ma "github.com/multiformats/go-multiaddr" ) @@ -19,27 +19,52 @@ func (nn *subscriberNotifee) DHT() *IpfsDHT { return (*IpfsDHT)(nn) } -func (nn *subscriberNotifee) Process(ctx context.Context) goprocess.Process { - proc := goprocessctx.WithContext(ctx) - proc.Go(nn.subscribe) - return proc -} - func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { dht := nn.DHT() + + dht.host.Network().Notify(nn) + defer dht.host.Network().StopNotify(nn) + + var err error + evts := []interface{}{ + &event.EvtPeerIdentificationCompleted{}, + } + + // subscribe to the EvtPeerIdentificationCompleted event which notifies us every time a peer successfully completes identification + sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256)) + if err != nil { + logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err) + } + defer sub.Close() + for { select { - case evt, more := <-dht.subscriptions.evtPeerIdentification.Out(): + case evt, more := <-sub.Out(): + // we will not be getting any more events if !more { return } - switch ev := evt.(type) { - case event.EvtPeerIdentificationCompleted: - protos, err := dht.peerstore.SupportsProtocols(ev.Peer, dht.protocolStrs()...) - if err == nil && len(protos) != 0 { - dht.Update(dht.ctx, ev.Peer) + + // something has gone really wrong if we get an event for another type + ev, ok := evt.(event.EvtPeerIdentificationCompleted) + if !ok { + logger.Errorf("got wrong type from subscription: %T", ev) + return + } + + // if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed + protos, err := dht.peerstore.SupportsProtocols(ev.Peer, dht.protocolStrs()...) + if err == nil && len(protos) != 0 { + refresh := dht.routingTable.Size() <= minRTRefreshThreshold + dht.Update(dht.ctx, ev.Peer) + if refresh && dht.autoRefresh { + select { + case dht.triggerRtRefresh <- nil: + default: + } } } + case <-proc.Closing(): return } @@ -67,6 +92,17 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) { dht.routingTable.Remove(p) + if dht.routingTable.Size() < minRTRefreshThreshold { + // TODO: Actively bootstrap. For now, just try to add the currently connected peers. + for _, p := range dht.host.Network().Peers() { + // Don't bother probing, we do that on connect. + protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...) + if err == nil && len(protos) != 0 { + dht.Update(dht.Context(), p) + } + } + } + dht.smlk.Lock() defer dht.smlk.Unlock() ms, ok := dht.strmap[p]