diff --git a/p2p/discover/lookup.go b/p2p/discover/lookup.go index ec11aad89a2..0e03daa30f2 100644 --- a/p2p/discover/lookup.go +++ b/p2p/discover/lookup.go @@ -35,6 +35,7 @@ type lookup struct { result nodesByDistance replyBuffer []*node queries int + noSlowdown bool } type queryFunc func(*node) ([]*node, error) @@ -50,6 +51,8 @@ func newLookup(ctx context.Context, tab *Table, target enode.ID, q queryFunc) *l cancelCh: ctx.Done(), queries: -1, } + it.noSlowdown = isDisabledLookupSlowdown(ctx) + // Don't query further if we hit ourself. // Unlikely to happen often in practice. it.asked[tab.self().ID()] = true @@ -129,7 +132,18 @@ func (it *lookup) startQueries() bool { return it.queries > 0 } +func disableLookupSlowdown(ctx context.Context) context.Context { + return context.WithValue(ctx, "p2p.discover.lookup.noSlowdown", true) +} + +func isDisabledLookupSlowdown(ctx context.Context) bool { + return ctx.Value("p2p.discover.lookup.noSlowdown") != nil +} + func (it *lookup) slowdown() { + if it.noSlowdown { + return + } sleep := time.NewTimer(1 * time.Second) defer sleep.Stop() select { diff --git a/p2p/discover/v4_udp_test.go b/p2p/discover/v4_udp_test.go index 348a73b59b8..115a70b8b0e 100644 --- a/p2p/discover/v4_udp_test.go +++ b/p2p/discover/v4_udp_test.go @@ -29,7 +29,6 @@ import ( "net" "reflect" "runtime" - "sync" "testing" "time" @@ -77,10 +76,14 @@ func newUDPTest(t *testing.T) *udpTest { } ln := enode.NewLocalNode(test.db, test.localkey) ctx := context.Background() - test.udp, _ = ListenV4(ctx, test.pipe, ln, Config{ + ctx = disableLookupSlowdown(ctx) + test.udp, err = ListenV4(ctx, test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError), }) + if err != nil { + panic(err) + } test.table = test.udp.tab // Wait for initial refresh so the table doesn't send unexpected findnode. <-test.table.initDone @@ -589,6 +592,7 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 { ln.SetStaticIP(realaddr.IP) ln.SetFallbackUDP(realaddr.Port) ctx := context.Background() + ctx = disableLookupSlowdown(ctx) udp, err := ListenV4(ctx, socket, ln, cfg) if err != nil { t.Fatal(err) @@ -598,11 +602,8 @@ func startLocalhostV4(t *testing.T, cfg Config) *UDPv4 { // dgramPipe is a fake UDP socket. It queues all sent datagrams. type dgramPipe struct { - mu *sync.Mutex - cond *sync.Cond - closing chan struct{} - closed bool - queue []dgram + queue chan dgram + closed chan struct{} } type dgram struct { @@ -611,11 +612,9 @@ type dgram struct { } func newpipe() *dgramPipe { - mu := new(sync.Mutex) return &dgramPipe{ - closing: make(chan struct{}), - cond: &sync.Cond{L: mu}, - mu: mu, + make(chan dgram, 1000), + make(chan struct{}), } } @@ -623,30 +622,24 @@ func newpipe() *dgramPipe { func (c *dgramPipe) WriteToUDP(b []byte, to *net.UDPAddr) (n int, err error) { msg := make([]byte, len(b)) copy(msg, b) - c.mu.Lock() - defer c.mu.Unlock() - if c.closed { - return 0, errors.New("closed") - } - c.queue = append(c.queue, dgram{*to, b}) - c.cond.Signal() + + n = 0 + err = errors.New("closed") + defer recover() + + c.queue <- dgram{*to, b} return len(b), nil } // ReadFromUDP just hangs until the pipe is closed. func (c *dgramPipe) ReadFromUDP(b []byte) (n int, addr *net.UDPAddr, err error) { - <-c.closing + <-c.closed return 0, nil, io.EOF } func (c *dgramPipe) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - if !c.closed { - close(c.closing) - c.closed = true - } - c.cond.Broadcast() + close(c.queue) + close(c.closed) return nil } @@ -655,29 +648,16 @@ func (c *dgramPipe) LocalAddr() net.Addr { } func (c *dgramPipe) receive() (dgram, error) { - c.mu.Lock() - defer c.mu.Unlock() - - var timedOut bool - timer := time.AfterFunc(3*time.Second, func() { - c.mu.Lock() - timedOut = true - c.mu.Unlock() - c.cond.Broadcast() - }) - defer timer.Stop() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() - for len(c.queue) == 0 && !c.closed && !timedOut { - c.cond.Wait() - } - if c.closed { + select { + case p, isOpen := <-c.queue: + if isOpen { + return p, nil + } return dgram{}, errClosed - } - if timedOut { + case <-ctx.Done(): return dgram{}, errTimeout } - p := c.queue[0] - copy(c.queue, c.queue[1:]) - c.queue = c.queue[:len(c.queue)-1] - return p, nil } diff --git a/p2p/discover/v5_udp_test.go b/p2p/discover/v5_udp_test.go index b196a6eaab7..0fb214527ea 100644 --- a/p2p/discover/v5_udp_test.go +++ b/p2p/discover/v5_udp_test.go @@ -104,6 +104,7 @@ func startLocalhostV5(t *testing.T, cfg Config) *UDPv5 { ln.SetStaticIP(realaddr.IP) ln.Set(enr.UDP(realaddr.Port)) ctx := context.Background() + ctx = disableLookupSlowdown(ctx) udp, err := ListenV5(ctx, socket, ln, cfg) if err != nil { t.Fatal(err) @@ -729,6 +730,7 @@ func newUDPV5Test(t *testing.T) *udpV5Test { ln.SetStaticIP(net.IP{10, 0, 0, 1}) ln.Set(enr.UDP(30303)) ctx := context.Background() + ctx = disableLookupSlowdown(ctx) test.udp, err = ListenV5(ctx, test.pipe, ln, Config{ PrivateKey: test.localkey, Log: testlog.Logger(t, log.LvlError),