Skip to content

Commit

Permalink
p2p/discover: add support for EIP-868 (v4 ENR extension) (#19540)
Browse files Browse the repository at this point in the history
This change implements EIP-868. The UDPv4 transport announces support
for the extension in ping/pong and handles enrRequest messages.

There are two uses of the extension: If a remote node announces support
for EIP-868 in their pong, node revalidation pulls the node's record.
The Resolve method requests the record unconditionally.
  • Loading branch information
fjl authored May 15, 2019
1 parent 8deec2e commit 350a87d
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 136 deletions.
60 changes: 37 additions & 23 deletions p2p/discover/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,17 @@ const (
bucketIPLimit, bucketSubnet = 2, 24 // at most 2 addresses from the same /24
tableIPLimit, tableSubnet = 10, 24

maxFindnodeFailures = 5 // Nodes exceeding this limit are dropped
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
refreshInterval = 30 * time.Minute
revalidateInterval = 10 * time.Second
copyNodesInterval = 30 * time.Second
seedMinTableTime = 5 * time.Minute
seedCount = 30
seedMaxAge = 5 * 24 * time.Hour
)

// Table is the 'node table', a Kademlia-like index of neighbor nodes. The table keeps
// itself up-to-date by verifying the liveness of neighbors and requesting their node
// records when announcements of a new record version are received.
type Table struct {
mutex sync.Mutex // protects buckets, bucket content, nursery, rand
buckets [nBuckets]*bucket // index of known nodes by distance
Expand All @@ -80,12 +82,13 @@ type Table struct {
nodeAddedHook func(*node) // for testing
}

// transport is implemented by UDP transports.
// transport is implemented by the UDP transports.
type transport interface {
Self() *enode.Node
lookupRandom() []*enode.Node
lookupSelf() []*enode.Node
ping(*enode.Node) error
ping(*enode.Node) (seq uint64, err error)
requestENR(*enode.Node) (*enode.Node, error)
}

// bucket contains nodes, ordered by their last activity. the entry
Expand Down Expand Up @@ -175,14 +178,16 @@ func (tab *Table) ReadRandomNodes(buf []*enode.Node) (n int) {
return i + 1
}

// Resolve searches for a specific node with the given ID.
// It returns nil if the node could not be found.
func (tab *Table) Resolve(n *enode.Node) *enode.Node {
// getNode returns the node with the given ID or nil if it isn't in the table.
func (tab *Table) getNode(id enode.ID) *enode.Node {
tab.mutex.Lock()
cl := tab.closest(n.ID(), 1, false)
tab.mutex.Unlock()
if len(cl.entries) > 0 && cl.entries[0].ID() == n.ID() {
return unwrapNode(cl.entries[0])
defer tab.mutex.Unlock()

b := tab.bucket(id)
for _, e := range b.entries {
if e.ID() == id {
return unwrapNode(e)
}
}
return nil
}
Expand Down Expand Up @@ -226,7 +231,7 @@ func (tab *Table) refresh() <-chan struct{} {
return done
}

// loop schedules refresh, revalidate runs and coordinates shutdown.
// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
Expand Down Expand Up @@ -288,9 +293,8 @@ loop:
close(tab.closed)
}

// doRefresh performs a lookup for a random target to keep buckets
// full. seed nodes are inserted if the table is empty (initial
// bootstrap or discarded faulty peers).
// doRefresh performs a lookup for a random target to keep buckets full. seed nodes are
// inserted if the table is empty (initial bootstrap or discarded faulty peers).
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)

Expand Down Expand Up @@ -324,8 +328,8 @@ func (tab *Table) loadSeedNodes() {
}
}

// doRevalidate checks that the last node in a random bucket is still live
// and replaces or deletes the node if it isn't.
// doRevalidate checks that the last node in a random bucket is still live and replaces or
// deletes the node if it isn't.
func (tab *Table) doRevalidate(done chan<- struct{}) {
defer func() { done <- struct{}{} }()

Expand All @@ -336,7 +340,17 @@ func (tab *Table) doRevalidate(done chan<- struct{}) {
}

// Ping the selected node and wait for a pong.
err := tab.net.ping(unwrapNode(last))
remoteSeq, err := tab.net.ping(unwrapNode(last))

// Also fetch record if the node replied and returned a higher sequence number.
if last.Seq() < remoteSeq {
n, err := tab.net.requestENR(unwrapNode(last))
if err != nil {
tab.log.Debug("ENR request failed", "id", last.ID(), "addr", last.addr(), "err", err)
} else {
last = &node{Node: *n, addedAt: last.addedAt, livenessChecks: last.livenessChecks}
}
}

tab.mutex.Lock()
defer tab.mutex.Unlock()
Expand Down
28 changes: 28 additions & 0 deletions p2p/discover/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,34 @@ func TestTable_addSeenNode(t *testing.T) {
checkIPLimitInvariant(t, tab)
}

// This test checks that ENR updates happen during revalidation. If a node in the table
// announces a new sequence number, the new record should be pulled.
func TestTable_revalidateSyncRecord(t *testing.T) {
transport := newPingRecorder()
tab, db := newTestTable(transport)
<-tab.initDone
defer db.Close()
defer tab.close()

// Insert a node.
var r enr.Record
r.Set(enr.IP(net.IP{127, 0, 0, 1}))
id := enode.ID{1}
n1 := wrapNode(enode.SignNull(&r, id))
tab.addSeenNode(n1)

// Update the node record.
r.Set(enr.WithEntry("foo", "bar"))
n2 := enode.SignNull(&r, id)
transport.updateRecord(n2)

tab.doRevalidate(make(chan struct{}, 1))
intable := tab.getNode(id)
if !reflect.DeepEqual(intable, n2) {
t.Fatalf("table contains old record with seq %d, want seq %d", intable.Seq(), n2.Seq())
}
}

// gen wraps quick.Value so it's easier to use.
// it generates a random value of the given value's type.
func gen(typ interface{}, rand *rand.Rand) interface{} {
Expand Down
48 changes: 32 additions & 16 deletions p2p/discover/table_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func fillTable(tab *Table, nodes []*node) {
type pingRecorder struct {
mu sync.Mutex
dead, pinged map[enode.ID]bool
records map[enode.ID]*enode.Node
n *enode.Node
}

Expand All @@ -107,38 +108,53 @@ func newPingRecorder() *pingRecorder {
n := enode.SignNull(&r, enode.ID{})

return &pingRecorder{
dead: make(map[enode.ID]bool),
pinged: make(map[enode.ID]bool),
n: n,
dead: make(map[enode.ID]bool),
pinged: make(map[enode.ID]bool),
records: make(map[enode.ID]*enode.Node),
n: n,
}
}

func (t *pingRecorder) Self() *enode.Node {
return nullNode
// setRecord updates a node record. Future calls to ping and
// requestENR will return this record.
func (t *pingRecorder) updateRecord(n *enode.Node) {
t.mu.Lock()
defer t.mu.Unlock()
t.records[n.ID()] = n
}

func (t *pingRecorder) ping(n *enode.Node) error {
// Stubs to satisfy the transport interface.
func (t *pingRecorder) Self() *enode.Node { return nullNode }
func (t *pingRecorder) lookupSelf() []*enode.Node { return nil }
func (t *pingRecorder) lookupRandom() []*enode.Node { return nil }
func (t *pingRecorder) close() {}

// ping simulates a ping request.
func (t *pingRecorder) ping(n *enode.Node) (seq uint64, err error) {
t.mu.Lock()
defer t.mu.Unlock()

t.pinged[n.ID()] = true
if t.dead[n.ID()] {
return errTimeout
} else {
return nil
return 0, errTimeout
}
if t.records[n.ID()] != nil {
seq = t.records[n.ID()].Seq()
}
return seq, nil
}

func (t *pingRecorder) lookupSelf() []*enode.Node {
return nil
}
// requestENR simulates an ENR request.
func (t *pingRecorder) requestENR(n *enode.Node) (*enode.Node, error) {
t.mu.Lock()
defer t.mu.Unlock()

func (t *pingRecorder) lookupRandom() []*enode.Node {
return nil
if t.dead[n.ID()] || t.records[n.ID()] == nil {
return nil, errTimeout
}
return t.records[n.ID()], nil
}

func (t *pingRecorder) close() {}

func hasDuplicates(slice []*node) bool {
seen := make(map[enode.ID]bool)
for i, e := range slice {
Expand Down
Loading

0 comments on commit 350a87d

Please sign in to comment.