Skip to content

Commit

Permalink
Discovery: speed up lookup tests
Browse files Browse the repository at this point in the history
This speeds up the discover package test suite from 60-80 sec to 20-40 sec.

* disable lookup.slowdown() in tests
* dgramPipe: refactor to use a channel
* log ListenV4 err

The improvements can be compared using:
go test -count 1 ./p2p/discover
go test ./p2p/discover -test.run TestUDPv4_Lookup -blockprofile b.dat
go tool pprof -tree b.txt
  • Loading branch information
battlmonstr committed Mar 10, 2022
1 parent d300c28 commit c11532b
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 47 deletions.
14 changes: 14 additions & 0 deletions p2p/discover/lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type lookup struct {
result nodesByDistance
replyBuffer []*node
queries int
noSlowdown bool
}

type queryFunc func(*node) ([]*node, error)
Expand All @@ -50,6 +51,8 @@ func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *l
cancelCh: ctx.Done(),
queries: -1,
}
it.noSlowdown = isDisabledLookupSlowdown(ctx)

// Don't query further if we hit ourself.
// Unlikely to happen often in practice.
it.asked[tab.self().ID()] = true
Expand Down Expand Up @@ -129,7 +132,18 @@ func (it *lookup) startQueries() bool {
return it.queries > 0
}

func disableLookupSlowdown(ctx context.Context) context.Context {
return context.WithValue(ctx, "p2p.discover.lookup.noSlowdown", true)
}

func isDisabledLookupSlowdown(ctx context.Context) bool {
return ctx.Value("p2p.discover.lookup.noSlowdown") != nil
}

func (it *lookup) slowdown() {
if it.noSlowdown {
return
}
sleep := time.NewTimer(1 * time.Second)
defer sleep.Stop()
select {
Expand Down
74 changes: 27 additions & 47 deletions p2p/discover/v4_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"net"
"reflect"
"runtime"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -77,10 +76,14 @@ func newUDPTest(t *testing.T) *udpTest {
}
ln := enode.NewLocalNode(test.db, test.localkey)
ctx := context.Background()
test.udp, _ = ListenV4(ctx, test.pipe, ln, Config{
ctx = disableLookupSlowdown(ctx)
test.udp, err = ListenV4(ctx, test.pipe, ln, Config{
PrivateKey: test.localkey,
Log: testlog.Logger(t, log.LvlError),
})
if err != nil {
panic(err)
}
test.table = test.udp.tab
// Wait for initial refresh so the table doesn't send unexpected findnode.
<-test.table.initDone
Expand Down Expand Up @@ -589,6 +592,7 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {
ln.SetStaticIP(realaddr.IP)
ln.SetFallbackUDP(realaddr.Port)
ctx := context.Background()
ctx = disableLookupSlowdown(ctx)
udp, err := ListenV4(ctx, socket, ln, cfg)
if err != nil {
t.Fatal(err)
Expand All @@ -598,11 +602,8 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 {

// dgramPipe is a fake UDP socket. It queues all sent datagrams.
type dgramPipe struct {
mu *sync.Mutex
cond *sync.Cond
closing chan struct{}
closed bool
queue []dgram
queue chan dgram
closed chan struct{}
}

type dgram struct {
Expand All @@ -611,42 +612,34 @@ type dgram struct {
}

func newpipe() *dgramPipe {
mu := new(sync.Mutex)
return &dgramPipe{
closing: make(chan struct{}),
cond: &sync.Cond{L: mu},
mu: mu,
make(chan dgram, 1000),
make(chan struct{}),
}
}

// WriteToUDP queues a datagram.
func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) {
msg := make([]byte, len(b))
copy(msg, b)
c.mu.Lock()
defer c.mu.Unlock()
if c.closed {
return 0, errors.New("closed")
}
c.queue = append(c.queue, dgram{*to, b})
c.cond.Signal()

n = 0
err = errors.New("closed")
defer recover()

c.queue <- dgram{*to, b}
return len(b), nil
}

// ReadFromUDP just hangs until the pipe is closed.
func (c *dgramPipe) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) {
<-c.closing
<-c.closed
return 0, nil, io.EOF
}

func (c *dgramPipe) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if !c.closed {
close(c.closing)
c.closed = true
}
c.cond.Broadcast()
close(c.queue)
close(c.closed)
return nil
}

Expand All @@ -655,29 +648,16 @@ func (c *dgramPipe) LocalAddr() net.Addr {
}

func (c *dgramPipe) receive() (dgram, error) {
c.mu.Lock()
defer c.mu.Unlock()

var timedOut bool
timer := time.AfterFunc(3*time.Second, func() {
c.mu.Lock()
timedOut = true
c.mu.Unlock()
c.cond.Broadcast()
})
defer timer.Stop()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

for len(c.queue) == 0 && !c.closed && !timedOut {
c.cond.Wait()
}
if c.closed {
select {
case p, isOpen := <-c.queue:
if isOpen {
return p, nil
}
return dgram{}, errClosed
}
if timedOut {
case <-ctx.Done():
return dgram{}, errTimeout
}
p := c.queue[0]
copy(c.queue, c.queue[1:])
c.queue = c.queue[:len(c.queue)-1]
return p, nil
}
2 changes: 2 additions & 0 deletions p2p/discover/v5_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 {
ln.SetStaticIP(realaddr.IP)
ln.Set(enr.UDP(realaddr.Port))
ctx := context.Background()
ctx = disableLookupSlowdown(ctx)
udp, err := ListenV5(ctx, socket, ln, cfg)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -729,6 +730,7 @@ func newUDPV5Test(t *testing.T) *udpV5Test {
ln.SetStaticIP(net.IP{10, 0, 0, 1})
ln.Set(enr.UDP(30303))
ctx := context.Background()
ctx = disableLookupSlowdown(ctx)
test.udp, err = ListenV5(ctx, test.pipe, ln, Config{
PrivateKey: test.localkey,
Log: testlog.Logger(t, log.LvlError),
Expand Down

0 comments on commit c11532b

Please sign in to comment.