From 7ac1f6b435b2f39ba1c97ea1c19d4643854aca49 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 7 Apr 2020 15:15:06 -0700 Subject: [PATCH 01/10] Dual DHT scaffold --- dual/dual.go | 176 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 dual/dual.go diff --git a/dual/dual.go b/dual/dual.go new file mode 100644 index 000000000..0480401c4 --- /dev/null +++ b/dual/dual.go @@ -0,0 +1,176 @@ +// Package dual provides an implementaiton of a split or "dual" dht, where two parallel instances +// are maintained for the global internet and the local LAN respectively. +package dual + +import ( + "context" + "fmt" + "sync" + + "github.com/ipfs/go-cid" + ci "github.com/libp2p/go-libp2p-core/crypto" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/routing" + dht "github.com/libp2p/go-libp2p-kad-dht" +) + +// DHT implements the routing interface to provide two concrete DHT implementationts for use +// in IPFS that are used to support both global network users and disjoint LAN usecases. +type DHT struct { + WAN *dht.IpfsDHT + LAN *dht.IpfsDHT +} + +// Assert that IPFS assumptions about interfaces aren't broken. These aren't a +// guarantee, but we can use them to aid refactoring. +var ( + _ routing.ContentRouting = (*DHT)(nil) + _ routing.Routing = (*DHT)(nil) + _ routing.PeerRouting = (*DHT)(nil) + _ routing.PubKeyFetcher = (*DHT)(nil) + _ routing.ValueStore = (*DHT)(nil) +) + +// NewDHT creates a new DualDHT instance. Options provided are forwarded on to the two concrete +// IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce +// the LAN-vs-WAN distinction. +func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) { + wanOpts := append(options, + dht.ProtocolPrefix(dht.DefaultPrefix), + dht.QueryFilter(dht.PublicQueryFilter), + dht.RoutingTableFilter(dht.PublicRoutingTableFilter), + ) + wan, err := dht.New(ctx, h, wanOpts...) + if err != nil { + return nil, err + } + + lanOpts := append(options, + dht.ProtocolPrefix(dht.DefaultPrefix+"/lan"), + dht.QueryFilter(dht.PrivateQueryFilter), + dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), + ) + lan, err := dht.New(ctx, h, lanOpts...) + if err != nil { + return nil, err + } + + impl := DHT{wan, lan} + return &impl, nil +} + +func (dht *DHT) activeWAN() bool { + wanPeers := dht.WAN.RoutingTable().ListPeers() + return len(wanPeers) > 0 +} + +// Provide adds the given cid to the content routing system. +func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { + if dht.activeWAN() { + return dht.WAN.Provide(ctx, key, announce) + } + return dht.LAN.Provide(ctx, key, announce) +} + +// FindProvidersAsync searches for peers who are able to provide a given key +func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { + if dht.activeWAN() { + return dht.WAN.FindProvidersAsync(ctx, key, count) + } + return dht.LAN.FindProvidersAsync(ctx, key, count) +} + +// FindPeer searches for a peer with given ID +func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { + // TODO: should these be run in parallel? + infoa, erra := dht.WAN.FindPeer(ctx, pid) + infob, errb := dht.LAN.FindPeer(ctx, pid) + return peer.AddrInfo{ + ID: pid, + Addrs: append(infoa.Addrs, infob.Addrs...), + }, mergeErrors(erra, errb) +} + +func mergeErrors(a, b error) error { + if a == nil && b == nil { + return nil + } else if a != nil && b != nil { + return fmt.Errorf("%v, %v", a, b) + } else if a != nil { + return a + } + return b +} + +// Bootstrap allows callers to hint to the routing system to get into a +// Boostrapped state and remain there. +func (dht *DHT) Bootstrap(ctx context.Context) error { + erra := dht.WAN.Bootstrap(ctx) + errb := dht.LAN.Bootstrap(ctx) + return mergeErrors(erra, errb) +} + +// PutValue adds value corresponding to given Key. +func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...routing.Option) error { + if dht.activeWAN() { + return dht.WAN.PutValue(ctx, key, val, opts...) + } + return dht.LAN.PutValue(ctx, key, val, opts...) +} + +// GetValue searches for the value corresponding to given Key. +func (dht *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + vala, erra := dht.WAN.GetValue(ctx, key, opts...) + if vala != nil { + return vala, erra + } + valb, errb := dht.LAN.GetValue(ctx, key, opts...) + return valb, mergeErrors(erra, errb) +} + +// SearchValue searches for better values from this value +func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { + streama, erra := dht.WAN.SearchValue(ctx, key, opts...) + streamb, errb := dht.WAN.SearchValue(ctx, key, opts...) + if erra == nil && errb == nil { + combinedStream := make(chan []byte) + var combinedWg sync.WaitGroup + combinedWg.Add(2) + go func(out chan []byte) { + for itm := range streama { + out <- itm + } + combinedWg.Done() + }(combinedStream) + go func(out chan []byte) { + for itm := range streamb { + out <- itm + } + combinedWg.Done() + }(combinedStream) + go func() { + combinedWg.Wait() + close(combinedStream) + }() + return combinedStream, nil + } else if erra == nil { + return streama, nil + } else if errb == nil { + return streamb, nil + } + return nil, mergeErrors(erra, errb) +} + +// GetPublicKey returns the public key for the given peer. +func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) { + pka, erra := dht.WAN.GetPublicKey(ctx, pid) + if erra == nil { + return pka, nil + } + pkb, errb := dht.LAN.GetPublicKey(ctx, pid) + if errb == nil { + return pkb, nil + } + return nil, mergeErrors(erra, errb) +} From 5042d4b7393f4562ef4f50dc357a5745635727b8 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 7 Apr 2020 20:41:45 -0700 Subject: [PATCH 02/10] address comments, scaffold parallelism --- dht.go | 12 +-- dht_options.go | 14 +++- dual/dual.go | 183 +++++++++++++++++++++++++++++++++++------- subscriber_notifee.go | 12 ++- 4 files changed, 179 insertions(+), 42 deletions(-) diff --git a/dht.go b/dht.go index c5676d39b..2385aa1e9 100644 --- a/dht.go +++ b/dht.go @@ -84,7 +84,7 @@ type IpfsDHT struct { // DHT protocols we can respond to. serverProtocols []protocol.ID - auto bool + auto ModeOpt mode mode modeLk sync.Mutex @@ -157,15 +157,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) dht.Validator = cfg.validator + dht.auto = cfg.mode switch cfg.mode { - case ModeAuto: - dht.auto = true + case ModeAuto, ModeClient: dht.mode = modeClient - case ModeClient: - dht.auto = false - dht.mode = modeClient - case ModeServer: - dht.auto = false + case ModeAutoServer, ModeServer: dht.mode = modeServer default: return nil, fmt.Errorf("invalid dht mode %d", cfg.mode) diff --git a/dht_options.go b/dht_options.go index 1cdde18fb..185d466f4 100644 --- a/dht_options.go +++ b/dht_options.go @@ -2,9 +2,10 @@ package dht import ( "fmt" - "github.com/ipfs/go-ipns" "time" + "github.com/ipfs/go-ipns" + ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p-core/host" @@ -25,6 +26,8 @@ const ( ModeClient // ModeServer operates the DHT as a server, it can both send and respond to queries ModeServer + // ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown + ModeAutoServer ) const DefaultPrefix protocol.ID = "/ipfs" @@ -255,6 +258,15 @@ func ProtocolPrefix(prefix protocol.ID) Option { } } +// ProtocolExtension adds an application specific protocol to the DHT protocol. For example, +// /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0. extension should be of the form /lan. +func ProtocolExtension(ext protocol.ID) Option { + return func(c *config) error { + c.protocolPrefix += ext + return nil + } +} + // BucketSize configures the bucket size (k in the Kademlia paper) of the routing table. // // The default value is 20. diff --git a/dual/dual.go b/dual/dual.go index 0480401c4..51b5fc98b 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -11,6 +11,7 @@ import ( ci "github.com/libp2p/go-libp2p-core/crypto" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" ) @@ -22,6 +23,9 @@ type DHT struct { LAN *dht.IpfsDHT } +// DefaultLanExtension is used to differentiate local protocol requests from those on the WAN DHT. +const DefaultLanExtension protocol.ID = "/lan" + // Assert that IPFS assumptions about interfaces aren't broken. These aren't a // guarantee, but we can use them to aid refactoring. var ( @@ -32,12 +36,13 @@ var ( _ routing.ValueStore = (*DHT)(nil) ) -// NewDHT creates a new DualDHT instance. Options provided are forwarded on to the two concrete +// New creates a new DualDHT instance. Options provided are forwarded on to the two concrete // IpfsDHT internal constructions, modulo additional options used by the Dual DHT to enforce // the LAN-vs-WAN distinction. -func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) { +// Note: query or routing table functional options provided as arguments to this function +// will be overriden by this constructor. +func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) { wanOpts := append(options, - dht.ProtocolPrefix(dht.DefaultPrefix), dht.QueryFilter(dht.PublicQueryFilter), dht.RoutingTableFilter(dht.PublicRoutingTableFilter), ) @@ -46,8 +51,11 @@ func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, erro return nil, err } - lanOpts := append(options, - dht.ProtocolPrefix(dht.DefaultPrefix+"/lan"), + // Unless overridden by user supplied options, the LAN DHT should default + // to 'AutoServer' mode. + lanOpts := append([]dht.Option{dht.Mode(dht.ModeAutoServer)}, options...) + lanOpts = append(lanOpts, + dht.ProtocolExtension(DefaultLanExtension), dht.QueryFilter(dht.PrivateQueryFilter), dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), ) @@ -61,8 +69,7 @@ func NewDHT(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, erro } func (dht *DHT) activeWAN() bool { - wanPeers := dht.WAN.RoutingTable().ListPeers() - return len(wanPeers) > 0 + return dht.WAN.RoutingTable().Size() > 0 } // Provide adds the given cid to the content routing system. @@ -75,21 +82,72 @@ func (dht *DHT) Provide(ctx context.Context, key cid.Cid, announce bool) error { // FindProvidersAsync searches for peers who are able to provide a given key func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { - if dht.activeWAN() { - return dht.WAN.FindProvidersAsync(ctx, key, count) - } - return dht.LAN.FindProvidersAsync(ctx, key, count) + reqCtx, cancel := context.WithCancel(ctx) + outCh := make(chan peer.AddrInfo) + wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count) + lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count) + go func() { + defer cancel() + defer close(outCh) + + found := make(map[peer.ID]struct{}, count) + nch := 2 + var pi peer.AddrInfo + for nch > 0 && count > 0 { + var ok bool + select { + case pi, ok = <-wanCh: + if !ok { + wanCh = nil + nch-- + continue + } + case pi, ok = <-lanCh: + if !ok { + lanCh = nil + nch-- + continue + } + } + // already found + if _, ok = found[pi.ID]; ok { + continue + } + + select { + case outCh <- pi: + found[pi.ID] = struct{}{} + count-- + case <-ctx.Done(): + return + } + } + }() + return outCh } // FindPeer searches for a peer with given ID +// Note: with signed peer records, we can change this to short circuit once either DHT returns. func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error) { - // TODO: should these be run in parallel? - infoa, erra := dht.WAN.FindPeer(ctx, pid) - infob, errb := dht.LAN.FindPeer(ctx, pid) + var wg sync.WaitGroup + wg.Add(2) + var wanInfo, lanInfo peer.AddrInfo + var wanErr, lanErr error + go func() { + defer wg.Done() + wanInfo, wanErr = dht.WAN.FindPeer(ctx, pid) + }() + go func() { + defer wg.Done() + lanInfo, lanErr = dht.LAN.FindPeer(ctx, pid) + }() + + wg.Wait() + return peer.AddrInfo{ ID: pid, - Addrs: append(infoa.Addrs, infob.Addrs...), - }, mergeErrors(erra, errb) + Addrs: append(wanInfo.Addrs, lanInfo.Addrs...), + }, mergeErrors(wanErr, lanErr) } func mergeErrors(a, b error) error { @@ -120,13 +178,47 @@ func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...ro } // GetValue searches for the value corresponding to given Key. -func (dht *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { - vala, erra := dht.WAN.GetValue(ctx, key, opts...) - if vala != nil { - return vala, erra +func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { + reqCtx, cncl := context.WithCancel(ctx) + defer cncl() + + resChan := make(chan []byte) + defer close(resChan) + errChan := make(chan error) + defer close(errChan) + runner := func(impl *dht.IpfsDHT, valCh chan []byte, errCh chan error) { + val, err := impl.GetValue(reqCtx, key, opts...) + if err != nil { + errCh <- err + return + } + valCh <- val + } + go runner(d.WAN, resChan, errChan) + go runner(d.LAN, resChan, errChan) + + var err error + var val []byte + select { + case val = <-resChan: + cncl() + case err = <-errChan: + } + + // Drain or wait for the slower runner + select { + case secondVal := <-resChan: + if val == nil { + val = secondVal + } + case secondErr := <-errChan: + if err != nil { + err = mergeErrors(err, secondErr) + } else if val == nil { + err = secondErr + } } - valb, errb := dht.LAN.GetValue(ctx, key, opts...) - return valb, mergeErrors(erra, errb) + return val, err } // SearchValue searches for better values from this value @@ -163,14 +255,45 @@ func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Opt } // GetPublicKey returns the public key for the given peer. -func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) { - pka, erra := dht.WAN.GetPublicKey(ctx, pid) - if erra == nil { - return pka, nil +func (d *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) { + reqCtx, cncl := context.WithCancel(ctx) + defer cncl() + + resChan := make(chan ci.PubKey) + defer close(resChan) + errChan := make(chan error) + defer close(errChan) + runner := func(impl *dht.IpfsDHT, valCh chan ci.PubKey, errCh chan error) { + val, err := impl.GetPublicKey(reqCtx, pid) + if err != nil { + errCh <- err + return + } + valCh <- val } - pkb, errb := dht.LAN.GetPublicKey(ctx, pid) - if errb == nil { - return pkb, nil + go runner(d.WAN, resChan, errChan) + go runner(d.LAN, resChan, errChan) + + var err error + var val ci.PubKey + select { + case val = <-resChan: + cncl() + case err = <-errChan: } - return nil, mergeErrors(erra, errb) + + // Drain or wait for the slower runner + select { + case secondVal := <-resChan: + if val == nil { + val = secondVal + } + case secondErr := <-errChan: + if err != nil { + err = mergeErrors(err, secondErr) + } else if val == nil { + err = secondErr + } + } + return val, err } diff --git a/subscriber_notifee.go b/subscriber_notifee.go index 18cb045e0..2f5492fc7 100644 --- a/subscriber_notifee.go +++ b/subscriber_notifee.go @@ -41,7 +41,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) { // register for event bus local routability changes in order to trigger switching between client and server modes // only register for events if the DHT is operating in ModeAuto - if dht.auto { + if dht.auto == ModeAuto || dht.auto == ModeAutoServer { evts = append(evts, new(event.EvtLocalReachabilityChanged)) } @@ -96,7 +96,7 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) { case event.EvtPeerIdentificationCompleted: handlePeerIdentificationCompletedEvent(dht, evt) case event.EvtLocalReachabilityChanged: - if dht.auto { + if dht.auto == ModeAuto || dht.auto == ModeAutoServer { handleLocalReachabilityChangedEvent(dht, evt) } else { // something has gone really wrong if we get an event we did not subscribe to @@ -150,8 +150,14 @@ func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabil var target mode switch e.Reachability { - case network.ReachabilityPrivate, network.ReachabilityUnknown: + case network.ReachabilityPrivate: target = modeClient + case network.ReachabilityUnknown: + if dht.auto == ModeAutoServer { + target = modeServer + } else { + target = modeClient + } case network.ReachabilityPublic: target = modeServer } From 0793dcde0fb04343032c85986b80bfcf7acc4a59 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 15:23:01 -0700 Subject: [PATCH 03/10] initial testing support --- dht.go | 5 ++ dual/dual.go | 11 ++- dual/dual_test.go | 223 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 237 insertions(+), 2 deletions(-) create mode 100644 dual/dual_test.go diff --git a/dht.go b/dht.go index 2385aa1e9..38d15eaad 100644 --- a/dht.go +++ b/dht.go @@ -306,6 +306,11 @@ func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) { return rt, err } +// Mode allows introspection of the operation mode of the DHT +func (dht *IpfsDHT) Mode() ModeOpt { + return dht.auto +} + // fixLowPeers tries to get more peers into the routing table if we're below the threshold func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) { for { diff --git a/dual/dual.go b/dual/dual.go index 51b5fc98b..cee816c09 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -53,12 +53,14 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) // Unless overridden by user supplied options, the LAN DHT should default // to 'AutoServer' mode. - lanOpts := append([]dht.Option{dht.Mode(dht.ModeAutoServer)}, options...) - lanOpts = append(lanOpts, + lanOpts := append(options, dht.ProtocolExtension(DefaultLanExtension), dht.QueryFilter(dht.PrivateQueryFilter), dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), ) + if wan.Mode() != dht.ModeClient { + lanOpts = append(lanOpts, dht.Mode(dht.ModeServer)) + } lan, err := dht.New(ctx, h, lanOpts...) if err != nil { return nil, err @@ -68,6 +70,11 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) return &impl, nil } +// Close closes the DHT context. +func (dht *DHT) Close() error { + return mergeErrors(dht.WAN.Close(), dht.LAN.Close()) +} + func (dht *DHT) activeWAN() bool { return dht.WAN.RoutingTable().Size() > 0 } diff --git a/dual/dual_test.go b/dual/dual_test.go new file mode 100644 index 000000000..175eb9096 --- /dev/null +++ b/dual/dual_test.go @@ -0,0 +1,223 @@ +package dual + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + "github.com/ipfs/go-cid" + u "github.com/ipfs/go-ipfs-util" + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + dht "github.com/libp2p/go-libp2p-kad-dht" + peerstore "github.com/libp2p/go-libp2p-peerstore" + swarmt "github.com/libp2p/go-libp2p-swarm/testing" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" +) + +var wancid, lancid cid.Cid + +func init() { + wancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("wan cid -- value"))) + lancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("lan cid -- value"))) +} + +type blankValidator struct{} + +func (blankValidator) Validate(_ string, _ []byte) error { return nil } +func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } + +type customRtHelper struct { + allow peer.ID +} + +func MkFilterForPeer() (func(d *dht.IpfsDHT, conns []network.Conn) bool, *customRtHelper) { + helper := customRtHelper{} + f := func(_ *dht.IpfsDHT, conns []network.Conn) bool { + for _, c := range conns { + if c.RemotePeer() == helper.allow { + fmt.Fprintf(os.Stderr, "allowed conn per filter\n") + return true + } + } + fmt.Fprintf(os.Stderr, "rejected conn per rt filter\n") + return false + } + return f, &helper +} + +func setupDHTWithFilters(ctx context.Context, t *testing.T, options ...dht.Option) (*DHT, []*customRtHelper) { + h := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)) + + wanFilter, wanRef := MkFilterForPeer() + wanOpts := []dht.Option{ + dht.NamespacedValidator("v", blankValidator{}), + dht.ProtocolPrefix("/test"), + dht.DisableAutoRefresh(), + dht.RoutingTableFilter(wanFilter), + } + wan, err := dht.New(ctx, h, wanOpts...) + if err != nil { + t.Fatal(err) + } + + lanFilter, lanRef := MkFilterForPeer() + lanOpts := []dht.Option{ + dht.NamespacedValidator("v", blankValidator{}), + dht.ProtocolPrefix("/test"), + dht.ProtocolExtension(DefaultLanExtension), + dht.DisableAutoRefresh(), + dht.RoutingTableFilter(lanFilter), + dht.Mode(dht.ModeServer), + } + lan, err := dht.New(ctx, h, lanOpts...) + if err != nil { + t.Fatal(err) + } + + impl := DHT{wan, lan} + return &impl, []*customRtHelper{wanRef, lanRef} +} + +func setupDHT(ctx context.Context, t *testing.T, options ...dht.Option) *DHT { + t.Helper() + baseOpts := []dht.Option{ + dht.NamespacedValidator("v", blankValidator{}), + dht.ProtocolPrefix("/test"), + dht.DisableAutoRefresh(), + } + + d, err := New( + ctx, + bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + append(baseOpts, options...)..., + ) + if err != nil { + t.Fatal(err) + } + return d +} + +func connect(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { + t.Helper() + bid := b.PeerID() + baddr := b.Host().Peerstore().Addrs(bid) + if len(baddr) == 0 { + t.Fatal("no addresses for connection.") + } + a.Host().Peerstore().AddAddrs(bid, baddr, peerstore.TempAddrTTL) + fmt.Fprintf(os.Stderr, "gonna connect.\n") + if err := a.Host().Connect(ctx, peer.AddrInfo{ID: bid}); err != nil { + t.Fatal(err) + } + fmt.Fprintf(os.Stderr, "gonn wait\n") + wait(ctx, t, a, b) +} + +func wait(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { + t.Helper() + for a.RoutingTable().Find(b.PeerID()) == "" { + //fmt.Fprintf(os.Stderr, "%v\n", a.RoutingTable().GetPeerInfos()) + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + case <-time.After(time.Millisecond * 5): + } + } +} + +func setupTier(ctx context.Context, t *testing.T) (*DHT, *dht.IpfsDHT, *dht.IpfsDHT) { + t.Helper() + baseOpts := []dht.Option{ + dht.NamespacedValidator("v", blankValidator{}), + dht.ProtocolPrefix("/test"), + dht.DisableAutoRefresh(), + } + + d, hlprs := setupDHTWithFilters(ctx, t) + + wan, err := dht.New( + ctx, + bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + append(baseOpts, dht.Mode(dht.ModeServer))..., + ) + if err != nil { + t.Fatal(err) + } + hlprs[0].allow = wan.PeerID() + connect(ctx, t, wan, d.WAN) + + lan, err := dht.New( + ctx, + bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), + append(baseOpts, dht.Mode(dht.ModeServer), dht.ProtocolExtension("/lan"))..., + ) + if err != nil { + t.Fatal(err) + } + hlprs[1].allow = lan.PeerID() + connect(ctx, t, d.LAN, lan) + + return d, wan, lan +} + +func TestDualModes(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d := setupDHT(ctx, t) + defer d.Close() + + if d.WAN.Mode() != dht.ModeAuto { + t.Fatal("wrong default mode for wan") + } else if d.LAN.Mode() != dht.ModeServer { + t.Fatal("wrong default mode for lan") + } + + d2 := setupDHT(ctx, t, dht.Mode(dht.ModeClient)) + defer d2.Close() + if d2.WAN.Mode() != dht.ModeClient || + d2.LAN.Mode() != dht.ModeClient { + t.Fatal("wrong client mode operation") + } +} + +func TestFindProviderAsync(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d, wan, lan := setupTier(ctx, t) + defer d.Close() + defer wan.Close() + defer lan.Close() + + if err := wan.Provide(ctx, wancid, true); err != nil { + t.Fatal(err) + } + + if err := lan.Provide(ctx, lancid, true); err != nil { + t.Fatal(err) + } + + wpc := d.FindProvidersAsync(ctx, wancid, 1) + select { + case p := <-wpc: + if p.ID != wan.PeerID() { + t.Fatal("wrong wan provider") + } + case <-ctx.Done(): + t.Fatal("find provider timeout.") + } + + lpc := d.FindProvidersAsync(ctx, lancid, 1) + select { + case p := <-lpc: + if p.ID != lan.PeerID() { + t.Fatal("wrong lan provider") + } + case <-ctx.Done(): + t.Fatal("find provider timeout.") + } +} From df4c1d82488391293c8a9688e6422fde80caade5 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 15:57:38 -0700 Subject: [PATCH 04/10] additional tests --- dual/dual_test.go | 71 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 2 deletions(-) diff --git a/dual/dual_test.go b/dual/dual_test.go index 175eb9096..e863928fc 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -147,7 +147,7 @@ func setupTier(ctx context.Context, t *testing.T) (*DHT, *dht.IpfsDHT, *dht.Ipfs t.Fatal(err) } hlprs[0].allow = wan.PeerID() - connect(ctx, t, wan, d.WAN) + connect(ctx, t, d.WAN, wan) lan, err := dht.New( ctx, @@ -193,7 +193,7 @@ func TestFindProviderAsync(t *testing.T) { defer wan.Close() defer lan.Close() - if err := wan.Provide(ctx, wancid, true); err != nil { + if err := wan.Provide(ctx, wancid, false); err != nil { t.Fatal(err) } @@ -221,3 +221,70 @@ func TestFindProviderAsync(t *testing.T) { t.Fatal("find provider timeout.") } } + +func TestValueGetSet(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d, wan, lan := setupTier(ctx, t) + defer d.Close() + defer wan.Close() + defer lan.Close() + + err := d.PutValue(ctx, "/v/hello", []byte("world")) + if err != nil { + t.Fatal(err) + } + val, err := wan.GetValue(ctx, "/v/hello") + if err != nil { + t.Fatal(err) + } + if string(val) != "world" { + t.Fatal("failed to get expected string.") + } + + val, err = lan.GetValue(ctx, "/v/hello") + if err == nil { + t.Fatal(err) + } +} + +func TestSearchValue(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d, wan, lan := setupTier(ctx, t) + defer d.Close() + defer wan.Close() + defer lan.Close() + + err := wan.PutValue(ctx, "/v/hello", []byte("world"), ) + + valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) + if err != nil { + t.Fatal(err) + } + + select { + case v := <-valCh: + if string(v) != "world" { + t.Errorf("expected 'world', got '%s'", string(v)) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + + err = lan.PutValue(ctx, "/v/hello", []byte("newer")) + if err != nil { + t.Error(err) + } + + select { + case v := <-valCh: + if string(v) != "newer" { + t.Errorf("expected 'newer', got '%s'", string(v)) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} From 85d5de7524d40c5439d90401674150f90d70b663 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 17:28:03 -0700 Subject: [PATCH 05/10] additional testing --- dht_test.go | 36 ++++++------------------------------ dual/dual.go | 32 +++----------------------------- dual/dual_test.go | 22 ++++++++++++++-------- go.mod | 2 +- go.sum | 3 +++ internal/testing/helper.go | 31 +++++++++++++++++++++++++++++++ 6 files changed, 58 insertions(+), 68 deletions(-) create mode 100644 internal/testing/helper.go diff --git a/dht_test.go b/dht_test.go index 76331951f..6a02cc91a 100644 --- a/dht_test.go +++ b/dht_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/require" pb "github.com/libp2p/go-libp2p-kad-dht/pb" + test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" @@ -72,33 +73,8 @@ type blankValidator struct{} func (blankValidator) Validate(_ string, _ []byte) error { return nil } func (blankValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } -type testValidator struct{} - -func (testValidator) Select(_ string, bs [][]byte) (int, error) { - index := -1 - for i, b := range bs { - if bytes.Equal(b, []byte("newer")) { - index = i - } else if bytes.Equal(b, []byte("valid")) { - if index == -1 { - index = i - } - } - } - if index == -1 { - return -1, errors.New("no rec found") - } - return index, nil -} -func (testValidator) Validate(_ string, b []byte) error { - if bytes.Equal(b, []byte("expired")) { - return errors.New("expired") - } - return nil -} - type testAtomicPutValidator struct { - testValidator + test.TestValidator } // selects the entry with the 'highest' last byte @@ -372,7 +348,7 @@ func TestValueSetInvalid(t *testing.T) { defer dhtA.host.Close() defer dhtB.host.Close() - dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{} + dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} dhtB.Validator.(record.NamespacedValidator)["v"] = blankValidator{} connect(t, ctx, dhtA, dhtB) @@ -451,8 +427,8 @@ func TestSearchValue(t *testing.T) { connect(t, ctx, dhtA, dhtB) - dhtA.Validator.(record.NamespacedValidator)["v"] = testValidator{} - dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{} + dhtA.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} + dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} ctxT, cancel := context.WithTimeout(ctx, time.Second) defer cancel() @@ -554,7 +530,7 @@ func TestValueGetInvalid(t *testing.T) { defer dhtB.host.Close() dhtA.Validator.(record.NamespacedValidator)["v"] = blankValidator{} - dhtB.Validator.(record.NamespacedValidator)["v"] = testValidator{} + dhtB.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} connect(t, ctx, dhtA, dhtB) diff --git a/dual/dual.go b/dual/dual.go index cee816c09..18755f6b1 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -14,6 +14,7 @@ import ( "github.com/libp2p/go-libp2p-core/protocol" "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" + helper "github.com/libp2p/go-libp2p-routing-helpers" ) // DHT implements the routing interface to provide two concrete DHT implementationts for use @@ -230,35 +231,8 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) // SearchValue searches for better values from this value func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) { - streama, erra := dht.WAN.SearchValue(ctx, key, opts...) - streamb, errb := dht.WAN.SearchValue(ctx, key, opts...) - if erra == nil && errb == nil { - combinedStream := make(chan []byte) - var combinedWg sync.WaitGroup - combinedWg.Add(2) - go func(out chan []byte) { - for itm := range streama { - out <- itm - } - combinedWg.Done() - }(combinedStream) - go func(out chan []byte) { - for itm := range streamb { - out <- itm - } - combinedWg.Done() - }(combinedStream) - go func() { - combinedWg.Wait() - close(combinedStream) - }() - return combinedStream, nil - } else if erra == nil { - return streama, nil - } else if errb == nil { - return streamb, nil - } - return nil, mergeErrors(erra, errb) + p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator} + return p.SearchValue(ctx, key, opts...) } // GetPublicKey returns the public key for the given peer. diff --git a/dual/dual_test.go b/dual/dual_test.go index e863928fc..aeb4f7ef4 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -12,7 +12,9 @@ import ( "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" dht "github.com/libp2p/go-libp2p-kad-dht" + test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" peerstore "github.com/libp2p/go-libp2p-peerstore" + record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" ) @@ -108,11 +110,9 @@ func connect(ctx context.Context, t *testing.T, a, b *dht.IpfsDHT) { t.Fatal("no addresses for connection.") } a.Host().Peerstore().AddAddrs(bid, baddr, peerstore.TempAddrTTL) - fmt.Fprintf(os.Stderr, "gonna connect.\n") if err := a.Host().Connect(ctx, peer.AddrInfo{ID: bid}); err != nil { t.Fatal(err) } - fmt.Fprintf(os.Stderr, "gonn wait\n") wait(ctx, t, a, b) } @@ -231,7 +231,7 @@ func TestValueGetSet(t *testing.T) { defer wan.Close() defer lan.Close() - err := d.PutValue(ctx, "/v/hello", []byte("world")) + err := d.PutValue(ctx, "/v/hello", []byte("valid")) if err != nil { t.Fatal(err) } @@ -239,7 +239,7 @@ func TestValueGetSet(t *testing.T) { if err != nil { t.Fatal(err) } - if string(val) != "world" { + if string(val) != "valid" { t.Fatal("failed to get expected string.") } @@ -258,7 +258,10 @@ func TestSearchValue(t *testing.T) { defer wan.Close() defer lan.Close() - err := wan.PutValue(ctx, "/v/hello", []byte("world"), ) + d.WAN.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} + d.LAN.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} + + err := wan.PutValue(ctx, "/v/hello", []byte("valid")) valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) if err != nil { @@ -267,8 +270,8 @@ func TestSearchValue(t *testing.T) { select { case v := <-valCh: - if string(v) != "world" { - t.Errorf("expected 'world', got '%s'", string(v)) + if string(v) != "valid" { + t.Errorf("expected 'valid', got '%s'", string(v)) } case <-ctx.Done(): t.Fatal(ctx.Err()) @@ -280,10 +283,13 @@ func TestSearchValue(t *testing.T) { } select { - case v := <-valCh: + case v, ok := <-valCh: if string(v) != "newer" { t.Errorf("expected 'newer', got '%s'", string(v)) } + if !ok { + t.Errorf("chan closed early") + } case <-ctx.Done(): t.Fatal(ctx.Err()) } diff --git a/go.mod b/go.mod index 87e3785a5..9b34d2b30 100644 --- a/go.mod +++ b/go.mod @@ -20,11 +20,11 @@ require ( github.com/libp2p/go-libp2p-kbucket v0.3.3 github.com/libp2p/go-libp2p-peerstore v0.2.2 github.com/libp2p/go-libp2p-record v0.1.2 + github.com/libp2p/go-libp2p-routing-helpers v0.2.0 github.com/libp2p/go-libp2p-swarm v0.2.3 github.com/libp2p/go-libp2p-testing v0.1.1 github.com/libp2p/go-msgio v0.0.4 github.com/libp2p/go-netroute v0.1.2 - github.com/mr-tron/base58 v1.1.3 github.com/multiformats/go-base32 v0.0.3 github.com/multiformats/go-multiaddr v0.2.1 github.com/multiformats/go-multiaddr-dns v0.2.0 diff --git a/go.sum b/go.sum index c9bd54f76..61bb8f5a0 100644 --- a/go.sum +++ b/go.sum @@ -79,6 +79,7 @@ github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJ github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -264,6 +265,8 @@ github.com/libp2p/go-libp2p-pnet v0.2.0 h1:J6htxttBipJujEjz1y0a5+eYoiPcFHhSYHH6n github.com/libp2p/go-libp2p-pnet v0.2.0/go.mod h1:Qqvq6JH/oMZGwqs3N1Fqhv8NVhrdYcO0BW4wssv21LA= github.com/libp2p/go-libp2p-record v0.1.2 h1:M50VKzWnmUrk/M5/Dz99qO9Xh4vs8ijsK+7HkJvRP+0= github.com/libp2p/go-libp2p-record v0.1.2/go.mod h1:pal0eNcT5nqZaTV7UGhqeGqxFgGdsU/9W//C8dqjQDk= +github.com/libp2p/go-libp2p-routing-helpers v0.2.0 h1:+QKTsx2Bg0q3oueQ9CopTwKN5NsnF+qEC+sbkSVXnsU= +github.com/libp2p/go-libp2p-routing-helpers v0.2.0/go.mod h1:Db+7LRSPImkV9fOKsNWVW5IXyy9XDse92lUtO3O+jlo= github.com/libp2p/go-libp2p-secio v0.1.0/go.mod h1:tMJo2w7h3+wN4pgU2LSYeiKPrfqBgkOsdiKK77hE7c8= github.com/libp2p/go-libp2p-secio v0.2.0 h1:ywzZBsWEEz2KNTn5RtzauEDq5RFEefPsttXYwAWqHng= github.com/libp2p/go-libp2p-secio v0.2.0/go.mod h1:2JdZepB8J5V9mBp79BmwsaPQhRPNN2NrnB2lKQcdy6g= diff --git a/internal/testing/helper.go b/internal/testing/helper.go new file mode 100644 index 000000000..52961f3f8 --- /dev/null +++ b/internal/testing/helper.go @@ -0,0 +1,31 @@ +package testing + +import ( + "bytes" + "errors" +) + +type TestValidator struct{} + +func (TestValidator) Select(_ string, bs [][]byte) (int, error) { + index := -1 + for i, b := range bs { + if bytes.Equal(b, []byte("newer")) { + index = i + } else if bytes.Equal(b, []byte("valid")) { + if index == -1 { + index = i + } + } + } + if index == -1 { + return -1, errors.New("no rec found") + } + return index, nil +} +func (TestValidator) Validate(_ string, b []byte) error { + if bytes.Equal(b, []byte("expired")) { + return errors.New("expired") + } + return nil +} From 668c25a77bbbaf8afbd840491158ac374167c18f Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 19:07:48 -0700 Subject: [PATCH 06/10] test remaining interface methods --- dual/dual_test.go | 93 ++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 79 insertions(+), 14 deletions(-) diff --git a/dual/dual_test.go b/dual/dual_test.go index aeb4f7ef4..57b1d5081 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -2,8 +2,6 @@ package dual import ( "context" - "fmt" - "os" "testing" "time" @@ -40,11 +38,9 @@ func MkFilterForPeer() (func(d *dht.IpfsDHT, conns []network.Conn) bool, *custom f := func(_ *dht.IpfsDHT, conns []network.Conn) bool { for _, c := range conns { if c.RemotePeer() == helper.allow { - fmt.Fprintf(os.Stderr, "allowed conn per filter\n") return true } } - fmt.Fprintf(os.Stderr, "rejected conn per rt filter\n") return false } return f, &helper @@ -277,20 +273,89 @@ func TestSearchValue(t *testing.T) { t.Fatal(ctx.Err()) } + select { + case _, ok := <-valCh: + if ok { + t.Errorf("chan should close") + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } + err = lan.PutValue(ctx, "/v/hello", []byte("newer")) if err != nil { t.Error(err) } - select { - case v, ok := <-valCh: - if string(v) != "newer" { - t.Errorf("expected 'newer', got '%s'", string(v)) - } - if !ok { - t.Errorf("chan closed early") - } - case <-ctx.Done(): - t.Fatal(ctx.Err()) + valCh, err = d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) + if err != nil { + t.Fatal(err) + } + + var lastVal []byte + for c := range valCh { + lastVal = c + } + if string(lastVal) != "newer" { + t.Fatal("incorrect best search value") + } +} + +func TestGetPublicKey(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d, wan, lan := setupTier(ctx, t) + defer d.Close() + defer wan.Close() + defer lan.Close() + + pk, err := d.GetPublicKey(ctx, wan.PeerID()) + if err != nil { + t.Fatal(err) + } + id, err := peer.IDFromPublicKey(pk) + if err != nil { + t.Fatal(err) + } + if id != wan.PeerID() { + t.Fatal("incorrect PK") + } + + pk, err = d.GetPublicKey(ctx, lan.PeerID()) + if err != nil { + t.Fatal(err) + } + id, err = peer.IDFromPublicKey(pk) + if err != nil { + t.Fatal(err) + } + if id != lan.PeerID() { + t.Fatal("incorrect PK") + } +} + +func TestFindPeer(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + d, wan, lan := setupTier(ctx, t) + defer d.Close() + defer wan.Close() + defer lan.Close() + + p, err := d.FindPeer(ctx, lan.PeerID()) + if err != nil { + t.Fatal(err) + } + if len(p.Addrs) == 0 { + t.Fatal("expeced find peer to find addresses.") + } + p, err = d.FindPeer(ctx, wan.PeerID()) + if err != nil { + t.Fatal(err) + } + if len(p.Addrs) == 0 { + t.Fatal("expeced find peer to find addresses.") } } From 02d310fddc4ab8e1ced9472657dc30baf6bcc0a6 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 19:12:54 -0700 Subject: [PATCH 07/10] lint --- dual/dual_test.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/dual/dual_test.go b/dual/dual_test.go index 57b1d5081..af32b0fb5 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -9,9 +9,9 @@ import ( u "github.com/ipfs/go-ipfs-util" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" + peerstore "github.com/libp2p/go-libp2p-core/peerstore" dht "github.com/libp2p/go-libp2p-kad-dht" test "github.com/libp2p/go-libp2p-kad-dht/internal/testing" - peerstore "github.com/libp2p/go-libp2p-peerstore" record "github.com/libp2p/go-libp2p-record" swarmt "github.com/libp2p/go-libp2p-swarm/testing" bhost "github.com/libp2p/go-libp2p/p2p/host/basic" @@ -189,6 +189,8 @@ func TestFindProviderAsync(t *testing.T) { defer wan.Close() defer lan.Close() + time.Sleep(5 * time.Millisecond) + if err := wan.Provide(ctx, wancid, false); err != nil { t.Fatal(err) } @@ -227,6 +229,8 @@ func TestValueGetSet(t *testing.T) { defer wan.Close() defer lan.Close() + time.Sleep(5 * time.Millisecond) + err := d.PutValue(ctx, "/v/hello", []byte("valid")) if err != nil { t.Fatal(err) @@ -239,7 +243,7 @@ func TestValueGetSet(t *testing.T) { t.Fatal("failed to get expected string.") } - val, err = lan.GetValue(ctx, "/v/hello") + _, err = lan.GetValue(ctx, "/v/hello") if err == nil { t.Fatal(err) } @@ -257,7 +261,7 @@ func TestSearchValue(t *testing.T) { d.WAN.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} d.LAN.Validator.(record.NamespacedValidator)["v"] = test.TestValidator{} - err := wan.PutValue(ctx, "/v/hello", []byte("valid")) + _ = wan.PutValue(ctx, "/v/hello", []byte("valid")) valCh, err := d.SearchValue(ctx, "/v/hello", dht.Quorum(0)) if err != nil { @@ -310,6 +314,8 @@ func TestGetPublicKey(t *testing.T) { defer wan.Close() defer lan.Close() + time.Sleep(5 * time.Millisecond) + pk, err := d.GetPublicKey(ctx, wan.PeerID()) if err != nil { t.Fatal(err) @@ -344,6 +350,8 @@ func TestFindPeer(t *testing.T) { defer wan.Close() defer lan.Close() + time.Sleep(5 * time.Millisecond) + p, err := d.FindPeer(ctx, lan.PeerID()) if err != nil { t.Fatal(err) From 0d90799af88a185f81e71946a1edf2901df097c0 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 8 Apr 2020 21:15:03 -0700 Subject: [PATCH 08/10] cleaner parallelism --- dual/dual.go | 124 ++++++++++------------------------------------ dual/dual_test.go | 2 +- 2 files changed, 28 insertions(+), 98 deletions(-) diff --git a/dual/dual.go b/dual/dual.go index 18755f6b1..c444f9125 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -4,7 +4,6 @@ package dual import ( "context" - "fmt" "sync" "github.com/ipfs/go-cid" @@ -15,6 +14,8 @@ import ( "github.com/libp2p/go-libp2p-core/routing" dht "github.com/libp2p/go-libp2p-kad-dht" helper "github.com/libp2p/go-libp2p-routing-helpers" + + "github.com/hashicorp/go-multierror" ) // DHT implements the routing interface to provide two concrete DHT implementationts for use @@ -24,8 +25,8 @@ type DHT struct { LAN *dht.IpfsDHT } -// DefaultLanExtension is used to differentiate local protocol requests from those on the WAN DHT. -const DefaultLanExtension protocol.ID = "/lan" +// LanExtension is used to differentiate local protocol requests from those on the WAN DHT. +const LanExtension protocol.ID = "/lan" // Assert that IPFS assumptions about interfaces aren't broken. These aren't a // guarantee, but we can use them to aid refactoring. @@ -55,7 +56,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) // Unless overridden by user supplied options, the LAN DHT should default // to 'AutoServer' mode. lanOpts := append(options, - dht.ProtocolExtension(DefaultLanExtension), + dht.ProtocolExtension(LanExtension), dht.QueryFilter(dht.PrivateQueryFilter), dht.RoutingTableFilter(dht.PrivateRoutingTableFilter), ) @@ -73,7 +74,7 @@ func New(ctx context.Context, h host.Host, options ...dht.Option) (*DHT, error) // Close closes the DHT context. func (dht *DHT) Close() error { - return mergeErrors(dht.WAN.Close(), dht.LAN.Close()) + return multierror.Append(dht.WAN.Close(), dht.LAN.Close()).ErrorOrNil() } func (dht *DHT) activeWAN() bool { @@ -99,21 +100,18 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) defer close(outCh) found := make(map[peer.ID]struct{}, count) - nch := 2 var pi peer.AddrInfo - for nch > 0 && count > 0 { + for count > 0 && (wanCh != nil || lanCh != nil) { var ok bool select { case pi, ok = <-wanCh: if !ok { wanCh = nil - nch-- continue } case pi, ok = <-lanCh: if !ok { lanCh = nil - nch-- continue } } @@ -155,18 +153,7 @@ func (dht *DHT) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, error return peer.AddrInfo{ ID: pid, Addrs: append(wanInfo.Addrs, lanInfo.Addrs...), - }, mergeErrors(wanErr, lanErr) -} - -func mergeErrors(a, b error) error { - if a == nil && b == nil { - return nil - } else if a != nil && b != nil { - return fmt.Errorf("%v, %v", a, b) - } else if a != nil { - return a - } - return b + }, multierror.Append(wanErr, lanErr).ErrorOrNil() } // Bootstrap allows callers to hint to the routing system to get into a @@ -174,7 +161,7 @@ func mergeErrors(a, b error) error { func (dht *DHT) Bootstrap(ctx context.Context) error { erra := dht.WAN.Bootstrap(ctx) errb := dht.LAN.Bootstrap(ctx) - return mergeErrors(erra, errb) + return multierror.Append(erra, errb).ErrorOrNil() } // PutValue adds value corresponding to given Key. @@ -190,43 +177,24 @@ func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) reqCtx, cncl := context.WithCancel(ctx) defer cncl() - resChan := make(chan []byte) - defer close(resChan) - errChan := make(chan error) - defer close(errChan) - runner := func(impl *dht.IpfsDHT, valCh chan []byte, errCh chan error) { - val, err := impl.GetValue(reqCtx, key, opts...) - if err != nil { - errCh <- err - return - } - valCh <- val - } - go runner(d.WAN, resChan, errChan) - go runner(d.LAN, resChan, errChan) - - var err error - var val []byte - select { - case val = <-resChan: - cncl() - case err = <-errChan: - } + var lanVal []byte + var lanErr error + var lanWaiter sync.WaitGroup + lanWaiter.Add(1) + go func() { + defer lanWaiter.Done() + lanVal, lanErr = d.LAN.GetValue(reqCtx, key, opts...) + }() - // Drain or wait for the slower runner - select { - case secondVal := <-resChan: - if val == nil { - val = secondVal - } - case secondErr := <-errChan: - if err != nil { - err = mergeErrors(err, secondErr) - } else if val == nil { - err = secondErr + wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...) + if wanErr != nil { + lanWaiter.Wait() + if lanErr != nil { + return nil, multierror.Append(wanErr, lanErr).ErrorOrNil() } + return lanVal, nil } - return val, err + return wanVal, nil } // SearchValue searches for better values from this value @@ -236,45 +204,7 @@ func (dht *DHT) SearchValue(ctx context.Context, key string, opts ...routing.Opt } // GetPublicKey returns the public key for the given peer. -func (d *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) { - reqCtx, cncl := context.WithCancel(ctx) - defer cncl() - - resChan := make(chan ci.PubKey) - defer close(resChan) - errChan := make(chan error) - defer close(errChan) - runner := func(impl *dht.IpfsDHT, valCh chan ci.PubKey, errCh chan error) { - val, err := impl.GetPublicKey(reqCtx, pid) - if err != nil { - errCh <- err - return - } - valCh <- val - } - go runner(d.WAN, resChan, errChan) - go runner(d.LAN, resChan, errChan) - - var err error - var val ci.PubKey - select { - case val = <-resChan: - cncl() - case err = <-errChan: - } - - // Drain or wait for the slower runner - select { - case secondVal := <-resChan: - if val == nil { - val = secondVal - } - case secondErr := <-errChan: - if err != nil { - err = mergeErrors(err, secondErr) - } else if val == nil { - err = secondErr - } - } - return val, err +func (dht *DHT) GetPublicKey(ctx context.Context, pid peer.ID) (ci.PubKey, error) { + p := helper.Parallel{Routers: []routing.Routing{dht.WAN, dht.LAN}, Validator: dht.WAN.Validator} + return p.GetPublicKey(ctx, pid) } diff --git a/dual/dual_test.go b/dual/dual_test.go index af32b0fb5..87349f3c8 100644 --- a/dual/dual_test.go +++ b/dual/dual_test.go @@ -65,7 +65,7 @@ func setupDHTWithFilters(ctx context.Context, t *testing.T, options ...dht.Optio lanOpts := []dht.Option{ dht.NamespacedValidator("v", blankValidator{}), dht.ProtocolPrefix("/test"), - dht.ProtocolExtension(DefaultLanExtension), + dht.ProtocolExtension(LanExtension), dht.DisableAutoRefresh(), dht.RoutingTableFilter(lanFilter), dht.Mode(dht.ModeServer), From a366d8b50afbdecf05634fde5f6cf117417505ed Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 9 Apr 2020 10:29:57 -0700 Subject: [PATCH 09/10] address reviews --- dht_options.go | 2 -- dual/dual.go | 19 ++++++++++++------- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/dht_options.go b/dht_options.go index 91d6c4284..3637c2136 100644 --- a/dht_options.go +++ b/dht_options.go @@ -4,8 +4,6 @@ import ( "fmt" "time" - "github.com/ipfs/go-ipns" - ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-ipns" diff --git a/dual/dual.go b/dual/dual.go index c444f9125..99e2b7004 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -174,21 +174,26 @@ func (dht *DHT) PutValue(ctx context.Context, key string, val []byte, opts ...ro // GetValue searches for the value corresponding to given Key. func (d *DHT) GetValue(ctx context.Context, key string, opts ...routing.Option) ([]byte, error) { - reqCtx, cncl := context.WithCancel(ctx) - defer cncl() + lanCtx, cancelLan := context.WithCancel(ctx) + defer cancelLan() - var lanVal []byte - var lanErr error - var lanWaiter sync.WaitGroup + var ( + lanVal []byte + lanErr error + lanWaiter sync.WaitGroup + ) lanWaiter.Add(1) go func() { defer lanWaiter.Done() - lanVal, lanErr = d.LAN.GetValue(reqCtx, key, opts...) + lanVal, lanErr = d.LAN.GetValue(lanCtx, key, opts...) }() wanVal, wanErr := d.WAN.GetValue(ctx, key, opts...) + if wanErr == nil { + cancelLan() + } + lanWaiter.Wait() if wanErr != nil { - lanWaiter.Wait() if lanErr != nil { return nil, multierror.Append(wanErr, lanErr).ErrorOrNil() } From ca09167bd7ab40f60c0adb31fe225d9f0620dd79 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Thu, 9 Apr 2020 11:01:05 -0700 Subject: [PATCH 10/10] fix bug in count=0 findProviders --- dual/dual.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dual/dual.go b/dual/dual.go index 99e2b7004..f60e6a92a 100644 --- a/dual/dual.go +++ b/dual/dual.go @@ -95,13 +95,14 @@ func (dht *DHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) outCh := make(chan peer.AddrInfo) wanCh := dht.WAN.FindProvidersAsync(reqCtx, key, count) lanCh := dht.LAN.FindProvidersAsync(reqCtx, key, count) + zeroCount := (count == 0) go func() { defer cancel() defer close(outCh) found := make(map[peer.ID]struct{}, count) var pi peer.AddrInfo - for count > 0 && (wanCh != nil || lanCh != nil) { + for (zeroCount || count > 0) && (wanCh != nil || lanCh != nil) { var ok bool select { case pi, ok = <-wanCh: