From 1b9804a2e95061ffd83815f3cd7616b165541d04 Mon Sep 17 00:00:00 2001 From: gfanton <8671905+gfanton@users.noreply.github.com> Date: Mon, 19 Apr 2021 17:11:22 +0200 Subject: [PATCH] fix: tinder service test Signed-off-by: gfanton <8671905+gfanton@users.noreply.github.com> --- go/internal/initutil/ipfs.go | 4 +- go/internal/ipfsutil/testing.go | 5 +- go/internal/tinder/service.go | 88 ++++++++++++----------------- go/internal/tinder/service_test.go | 90 ++++++++++++++++++++---------- 4 files changed, 103 insertions(+), 84 deletions(-) diff --git a/go/internal/initutil/ipfs.go b/go/internal/initutil/ipfs.go index ea6684bb78..cf5c185802 100644 --- a/go/internal/initutil/ipfs.go +++ b/go/internal/initutil/ipfs.go @@ -482,7 +482,9 @@ func (m *Manager) configIPFSRouting(h host.Host, r p2p_routing.Routing) error { Logger: logger, AdvertiseResetInterval: time.Minute, AdvertiseGracePeriod: time.Minute, - BackoffStratFactory: backoffstrat, + BackoffStrategy: &tinder.BackoffOpts{ + StratFactory: backoffstrat, + }, } m.Node.Protocol.discovery, err = tinder.NewService(tinderOpts, h, drivers...) diff --git a/go/internal/ipfsutil/testing.go b/go/internal/ipfsutil/testing.go index dec15c5bbf..6e932b8887 100644 --- a/go/internal/ipfsutil/testing.go +++ b/go/internal/ipfsutil/testing.go @@ -171,9 +171,10 @@ func TestingCoreAPIUsingMockNet(ctx context.Context, t testing.TB, opts *Testing Logger: opts.Logger, AdvertiseResetInterval: time.Minute, AdvertiseGracePeriod: time.Minute, - BackoffStratFactory: discovery.NewFixedBackoff(time.Second), + BackoffStrategy: &tinder.BackoffOpts{ + StratFactory: discovery.NewFixedBackoff(time.Second), + }, // BackoffStratFactory: discovery.NewExponentialBackoff(minBackoff, maxBackoff, discovery.FullJitter, time.Second, 5.0, 0, rng), - } // enable discovery monitor diff --git a/go/internal/tinder/service.go b/go/internal/tinder/service.go index ad2b1335a9..98fb22e19f 100644 --- a/go/internal/tinder/service.go +++ b/go/internal/tinder/service.go @@ -11,9 +11,7 @@ import ( p2p_discovery "github.com/libp2p/go-libp2p-core/discovery" p2p_event "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" - "github.com/libp2p/go-libp2p-core/peer" p2p_peer "github.com/libp2p/go-libp2p-core/peer" - discovery "github.com/libp2p/go-libp2p-discovery" "go.uber.org/zap" ) @@ -61,14 +59,24 @@ type service struct { muAdvertiser sync.Mutex } +type BackoffOpts struct { + StratFactory discovery.BackoffFactory + DiscoveryOptions []discovery.BackoffDiscoveryOption +} + type Opts struct { Logger *zap.Logger - FindPeerResetInterval time.Duration + EnableDiscoveryMonitor bool + AdvertiseResetInterval time.Duration AdvertiseGracePeriod time.Duration - BackoffStratFactory discovery.BackoffFactory - DiscovertyOptions []discovery.BackoffDiscoveryOption + FindPeerResetInterval time.Duration + + // BackoffStrategy describes how backoff will be implemented on the + // FindPeer method. If none are provided, it will be disable alongside + // cache. + BackoffStrategy *BackoffOpts } func (o *Opts) applyDefault() { @@ -104,25 +112,25 @@ func NewService(opts *Opts, h host.Host, drivers ...*Driver) (Service, error) { return nil, err } + var disc p2p_discovery.Discovery = s + // add backoff strategy if provided - var disc p2p_discovery.Discovery - if opts.BackoffStratFactory != nil { + if opts.BackoffStrategy != nil { // wrap backoff/cache discovery - disc, err = discovery.NewBackoffDiscovery(s, opts.BackoffStratFactory, opts.DiscovertyOptions...) + disc, err = discovery.NewBackoffDiscovery(disc, opts.BackoffStrategy.StratFactory, opts.BackoffStrategy.DiscoveryOptions...) if err != nil { return nil, err } - } else { - disc = s } - // disc = s - - discm := &discoveryMonitor{ - host: h, - logger: opts.Logger, - disc: disc, - emitter: s.emitter, + // enable monitor if needed + if opts.EnableDiscoveryMonitor { + disc = &discoveryMonitor{ + host: h, + logger: opts.Logger, + disc: disc, + emitter: s.emitter, + } } // compose backoff with tinder service @@ -134,9 +142,8 @@ func NewService(opts *Opts, h host.Host, drivers ...*Driver) (Service, error) { } wctx, cancel := context.WithCancel(context.Background()) - - composer.Advertiser = discm - composer.Discoverer = newWatchdogsDiscoverer(wctx, opts.Logger, opts.AdvertiseResetInterval, discm) + composer.Discoverer = newWatchdogsDiscoverer(wctx, opts.Logger, opts.AdvertiseResetInterval, disc) + composer.Advertiser = disc composer.Unregisterer = s composer.Closer = newCloserCompose(func() error { cancel() @@ -259,16 +266,6 @@ func (s *service) advertise(ctx context.Context, d *Driver, ns string, opts ...p deadline = s.resetInterval } else { - s.Emit(&EvtDriverMonitor{ - EventType: TypeEventMonitorDriverAdvertise, - AddrInfo: p2p_peer.AddrInfo{ - ID: s.host.ID(), - Addrs: currentAddrs, - }, - Topic: ns, - DriverName: d.Name, - }) - if ttl == 0 { ttl = s.ttl } @@ -326,8 +323,7 @@ func (s *service) FindPeers(ctx context.Context, ns string, opts ...p2p_discover continue } - s.logger.Debug("finder driver started", zap.String("key", ns), zap.String("driver", driver.Name)) - + s.logger.Debug("findpeer for driver started", zap.String("key", ns), zap.String("driver", driver.Name)) cdrivers = append(cdrivers, &driverChan{ cc: ch, driver: driver, @@ -341,7 +337,7 @@ func (s *service) FindPeers(ctx context.Context, ns string, opts ...p2p_discover // use optimized method for few drivers err := s.selectFindPeers(ctx, cc, cdrivers) - s.logger.Warn("find peers done", zap.String("topic", ns), zap.Error(err)) + s.logger.Debug("find peers done", zap.String("topic", ns), zap.Error(err)) }() return cc, nil @@ -368,7 +364,6 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI for n > 0 { sel, value, ok := reflect.Select(selCases) if sel == selDone { - s.logger.Warn("context done") return ctx.Err() } @@ -380,7 +375,6 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI } driver := in[sel].driver - topic := in[sel].topic // we can safly get our peer peer := value.Interface().(p2p_peer.AddrInfo) @@ -399,18 +393,12 @@ func (s *service) selectFindPeers(ctx context.Context, out chan<- p2p_peer.AddrI Addrs: addrs, } + topic := in[sel].topic // protect this peer to avoid to be pruned s.ProtectPeer(peer.ID) - - s.Emit(&EvtDriverMonitor{ - EventType: TypeEventMonitorDriverFoundPeer, - Topic: topic, - AddrInfo: filterpeer, - DriverName: driver.Name, - }) - - s.logger.Debug("found a matching peer!", + s.logger.Debug("found a peer", zap.String("driver", driver.Name), + zap.String("topic", topic), zap.String("peer", filterpeer.ID.String()), zap.Any("addrs", filterpeer.Addrs)) @@ -438,13 +426,6 @@ func (s *service) Unregister(ctx context.Context, ns string) error { return nil } -func (s *service) Emit(evt *EvtDriverMonitor) { - // s.logger.Info("emitting", zap.Any("event", evt)) - if err := s.emitter.Emit(*evt); err != nil { - s.logger.Warn("unable to emit `EvtDriverMonitor`", zap.Error(err)) - } -} - func (s *service) Close() error { return s.networkNotify.Close() } @@ -522,6 +503,7 @@ func (w *watchdogsDiscoverer) FindPeers(_ context.Context, ns string, opts ...p2 return c, nil } +// discovery monitor will send a event everytime we found/advertise a peer type discoveryMonitor struct { host host.Host logger *zap.Logger @@ -547,13 +529,13 @@ func (d *discoveryMonitor) Advertise(ctx context.Context, ns string, opts ...p2p } // FindPeers discovers peers providing a service -func (d *discoveryMonitor) FindPeers(ctx context.Context, ns string, opts ...p2p_discovery.Option) (<-chan peer.AddrInfo, error) { +func (d *discoveryMonitor) FindPeers(ctx context.Context, ns string, opts ...p2p_discovery.Option) (<-chan p2p_peer.AddrInfo, error) { c, err := d.disc.FindPeers(ctx, ns, opts...) if err != nil { return nil, err } - retc := make(chan peer.AddrInfo) + retc := make(chan p2p_peer.AddrInfo) go func() { for p := range c { d.Emit(&EvtDriverMonitor{ diff --git a/go/internal/tinder/service_test.go b/go/internal/tinder/service_test.go index c7276a0c94..bc21bdbcba 100644 --- a/go/internal/tinder/service_test.go +++ b/go/internal/tinder/service_test.go @@ -3,6 +3,7 @@ package tinder import ( "context" "fmt" + "log" "math/rand" "testing" "time" @@ -10,6 +11,7 @@ import ( "github.com/libp2p/go-libp2p-core/discovery" "github.com/libp2p/go-libp2p-core/event" "github.com/libp2p/go-libp2p-core/host" + idisc "github.com/libp2p/go-libp2p-discovery" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "github.com/tj/assert" @@ -24,6 +26,14 @@ type mockedService struct { Service Service } +var fixedSecondBackoff = &BackoffOpts{ + StratFactory: idisc.NewFixedBackoff(time.Second), +} + +var veryLongBackoff = &BackoffOpts{ + StratFactory: idisc.NewFixedBackoff(time.Hour), +} + func TestNewService(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -181,51 +191,73 @@ func TestFindPeersCache(t *testing.T) { const advertisekey = "test_key" const nDriver = 10 + cases := []struct { + name string + ndriver int + foundExpected int + backoffStrat *BackoffOpts + }{ + {name: "1 driver/no cache", ndriver: 1, foundExpected: 1}, + {name: "10 driver/no cache", ndriver: 10, foundExpected: 10}, + {name: "100 driver/no cache", ndriver: 100, foundExpected: 100}, + {name: "1 driver/with cache", ndriver: 1, foundExpected: 1, backoffStrat: veryLongBackoff}, + {name: "10 driver/with cache", ndriver: 10, foundExpected: 1, backoffStrat: veryLongBackoff}, + {name: "100 driver/with cache", ndriver: 100, foundExpected: 1, backoffStrat: veryLongBackoff}, + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() l, cleanup := testutil.Logger(t) defer cleanup() - m := mocknet.New(ctx) - - server := NewMockedDriverServer() + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + m := mocknet.New(ctx) + server := NewMockedDriverServer() - h, err := m.GenPeer() - require.NoError(t, err) + driverh, err := m.GenPeer() + require.NoError(t, err) - drivers := make([]*Driver, nDriver) - for i := range drivers { - drivers[i] = NewMockedDriverClient(fmt.Sprintf("MockedDriver #%d", i), h, server) - } + drivers := make([]*Driver, tc.ndriver) + for i := range drivers { + drivers[i] = NewMockedDriverClient("MockedDriver", driverh, server) + } - opts := &Opts{ - Logger: l, - AdvertiseResetInterval: time.Minute, - } + // client host should be different than the driver so he wont be skipped + clienth, err := m.GenPeer() + require.NoError(t, err) - client, err := NewService(opts, h, drivers...) - require.NoError(t, err) + // test with cache enable + opts := &Opts{ + BackoffStrategy: tc.backoffStrat, + Logger: l, + AdvertiseResetInterval: time.Minute, + } + client, err := NewService(opts, clienth, drivers...) + require.NoError(t, err) + defer client.Close() - defer client.Close() + _, err = client.Advertise(ctx, advertisekey, discovery.TTL(time.Minute)) + require.NoError(t, err) - _, err = client.Advertise(ctx, advertisekey, discovery.TTL(time.Minute)) - require.NoError(t, err) + // wait for at last one advertise to succeed. we dont care to wait for + // each driver here, since they share the same host + <-server.WaitForAdvertise(advertisekey, driverh.ID()) - // wait for at last one advertise to succeed. we dont care to wait for - // each driver here, since they share the same host - <-server.WaitForAdvertise(advertisekey, h.ID()) + cc, err := client.FindPeers(ctx, advertisekey) + require.NoError(t, err) - cc, err := client.FindPeers(ctx, advertisekey) - require.NoError(t, err) + count := 0 + for p := range cc { + assert.Equal(t, driverh.ID(), p.ID) + count++ + } - // should return one peer - count := 0 - for range cc { - count++ + assert.Equal(t, tc.foundExpected, count) + }) } - assert.Equal(t, 1, count) } func TestFindPeers(t *testing.T) { @@ -290,6 +322,7 @@ func TestFindPeers(t *testing.T) { require.NoError(t, err) opts := &Opts{ + BackoffStrategy: veryLongBackoff, Logger: l, AdvertiseResetInterval: time.Minute, } @@ -315,6 +348,7 @@ func TestFindPeers(t *testing.T) { count := 0 for p := range cc { + log.Printf("peer: %+v\n", p) links := m.LinksBetweenPeers(hcl.ID(), p.ID) assert.Len(t, links, 2)