Skip to content

Commit

Permalink
Peer Selection for Validation
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Feb 14, 2020
1 parent 32e8f8f commit daa33aa
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 60 deletions.
42 changes: 20 additions & 22 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ const (
PeerStateMissing
)

type peerInfo struct {
// PeerInfo holds all related information for a peer in the K-Bucket.
type PeerInfo struct {
Id peer.ID
State PeerState
}
Expand All @@ -36,17 +37,14 @@ func newBucket() *bucket {
return b
}

// filters the peers using the given predicate
// caller SHOULD NOT modify the returned peer objects
func (b *bucket) filter(f func(peerInfo) bool) []*peerInfo {
// returns all peers in the bucket
func (b *bucket) peers() []PeerInfo {
b.lk.RLock()
defer b.lk.RUnlock()
var ps []*peerInfo
var ps []PeerInfo
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
if f(*p) {
ps = append(ps, p)
}
p := e.Value.(PeerInfo)
ps = append(ps, p)
}
return ps
}
Expand All @@ -57,32 +55,32 @@ func (b *bucket) peerIds() []peer.ID {
defer b.lk.RUnlock()
ps := make([]peer.ID, 0, b.list.Len())
for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(*peerInfo)
p := e.Value.(PeerInfo)
ps = append(ps, p.Id)
}
return ps
}

// returns the peer with the given Id or nil if not found.
// caller SHOULD NOT modify the returned peer object.
func (b *bucket) getPeer(p peer.ID) *peerInfo {
// returns the peer with the given Id and true if peer exists
// returns false if the peerId does not exist
func (b *bucket) getPeer(p peer.ID) (PeerInfo, bool) {
b.lk.RLock()
defer b.lk.RUnlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p {
return e.Value.(*peerInfo)
if e.Value.(PeerInfo).Id == p {
return e.Value.(PeerInfo), true
}
}
return nil
return PeerInfo{}, false
}

// replaces the peer based on the Id.
// returns true if the replace was successful, false otherwise.
func (b *bucket) replace(p *peerInfo) bool {
func (b *bucket) replace(p PeerInfo) bool {
b.lk.Lock()
defer b.lk.Unlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == p.Id {
if e.Value.(PeerInfo).Id == p.Id {
b.list.Remove(e)
b.list.PushBack(p)
return true
Expand All @@ -97,7 +95,7 @@ func (b *bucket) remove(id peer.ID) bool {
b.lk.Lock()
defer b.lk.Unlock()
for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(PeerInfo).Id == id {
b.list.Remove(e)
return true
}
Expand All @@ -110,13 +108,13 @@ func (b *bucket) moveToFront(id peer.ID) {
defer b.lk.Unlock()

for e := b.list.Front(); e != nil; e = e.Next() {
if e.Value.(*peerInfo).Id == id {
if e.Value.(PeerInfo).Id == id {
b.list.MoveToFront(e)
}
}
}

func (b *bucket) pushFront(p *peerInfo) {
func (b *bucket) pushFront(p PeerInfo) {
b.lk.Lock()
b.list.PushFront(p)
b.lk.Unlock()
Expand All @@ -140,7 +138,7 @@ func (b *bucket) split(cpl int, target ID) *bucket {
newbuck.list = out
e := b.list.Front()
for e != nil {
peerID := ConvertPeerID(e.Value.(*peerInfo).Id)
peerID := ConvertPeerID(e.Value.(PeerInfo).Id)
peerCPL := CommonPrefixLen(peerID, target)
if peerCPL > cpl {
cur := e
Expand Down
27 changes: 24 additions & 3 deletions opts/options.go → options.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package opts
package kbucket

import (
"fmt"
Expand All @@ -11,6 +11,7 @@ type Option func(*Options) error
// Options is a structure containing all the functional options that can be used when constructing a Routing Table.
type Options struct {
TableCleanup struct {
PeersForValidationFnc PeerSelectionFnc
PeerValidationTimeout time.Duration
Interval time.Duration
}
Expand All @@ -26,6 +27,14 @@ func (o *Options) Apply(opts ...Option) error {
return nil
}

// PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup.
func PeersForValidationFnc(f PeerSelectionFnc) Option {
return func(o *Options) error {
o.TableCleanup.PeersForValidationFnc = f
return nil
}
}

// TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine.
func TableCleanupInterval(i time.Duration) Option {
return func(o *Options) error {
Expand All @@ -34,8 +43,8 @@ func TableCleanupInterval(i time.Duration) Option {
}
}

// TableCleanupPeerValidationTimeout sets the timeout for a single peer validation during cleanup.
func TableCleanupPeerValidationTimeout(timeout time.Duration) Option {
// PeerValidationTimeout sets the timeout for a single peer validation during cleanup.
func PeerValidationTimeout(timeout time.Duration) Option {
return func(o *Options) error {
o.TableCleanup.PeerValidationTimeout = timeout
return nil
Expand All @@ -47,5 +56,17 @@ func TableCleanupPeerValidationTimeout(timeout time.Duration) Option {
var Defaults = func(o *Options) error {
o.TableCleanup.PeerValidationTimeout = 30 * time.Second
o.TableCleanup.Interval = 2 * time.Minute

// default selector function selects all peers that are in missing state.
o.TableCleanup.PeersForValidationFnc = func(peers []PeerInfo) []PeerInfo {
var selectedPeers []PeerInfo
for _, p := range peers {
if p.State == PeerStateMissing {
selectedPeers = append(selectedPeers, p)
}
}
return selectedPeers
}

return nil
}
2 changes: 1 addition & 1 deletion sorting.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (pds *peerDistanceSorter) appendPeer(p peer.ID) {
// Append the peer.ID values in the list to the sorter's slice. It may no longer be sorted.
func (pds *peerDistanceSorter) appendPeersFromList(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
pds.appendPeer(e.Value.(*peerInfo).Id)
pds.appendPeer(e.Value.(PeerInfo).Id)
}
}

Expand Down
31 changes: 18 additions & 13 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-kbucket/opts"
mh "github.com/multiformats/go-multihash"
)

Expand All @@ -23,6 +22,10 @@ var log = logging.Logger("table")
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

// PeerSelectionFnc is the signature of a function that selects zero or more peers from the given peers
// based on some criteria.
type PeerSelectionFnc func(peers []PeerInfo) []PeerInfo

// PeerValidationFnc is the signature of a function that determines the validity a peer for Routing Table membership.
type PeerValidationFnc func(ctx context.Context, p peer.ID) bool

Expand Down Expand Up @@ -74,15 +77,17 @@ type RoutingTable struct {
peerValidationTimeout time.Duration
// interval between two runs of the table cleanup routine
tableCleanupInterval time.Duration
// function to select peers that need to be validated
peersForValidationFnc PeerSelectionFnc
}

// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
// Passing a nil PeerValidationFnc disables periodic table cleanup.
func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
peerValidationFnc PeerValidationFnc, options ...opts.Option) (*RoutingTable, error) {
peerValidationFnc PeerValidationFnc, options ...Option) (*RoutingTable, error) {

var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
var cfg Options
if err := cfg.Apply(append([]Option{Defaults}, options...)...); err != nil {
return nil, err
}

Expand All @@ -101,6 +106,7 @@ func NewRoutingTable(ctx context.Context, bucketsize int, localID ID, latency ti
PeerAdded: func(peer.ID) {},

PeerValidationFnc: peerValidationFnc,
peersForValidationFnc: cfg.TableCleanup.PeersForValidationFnc,
peerValidationTimeout: cfg.TableCleanup.PeerValidationTimeout,
tableCleanupInterval: cfg.TableCleanup.Interval,
}
Expand Down Expand Up @@ -170,16 +176,15 @@ func (rt *RoutingTable) validatePeer(p peer.ID) bool {
}

// returns the peers that need to be validated.
func (rt *RoutingTable) peersToValidate() []*peerInfo {
func (rt *RoutingTable) peersToValidate() []PeerInfo {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

var missingPeers []*peerInfo
var peers []PeerInfo
for _, b := range rt.Buckets {
m := b.filter(func(p peerInfo) bool { return p.State == PeerStateMissing })
missingPeers = append(missingPeers, m...)
peers = append(peers, b.peers()...)
}
return missingPeers
return rt.peersForValidationFnc(peers)
}

// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
Expand Down Expand Up @@ -245,7 +250,7 @@ func (rt *RoutingTable) HandlePeerDisconnect(p peer.ID) {
// mark the peer as missing
bucketId := rt.bucketIdForPeer(p)
b := rt.Buckets[bucketId]
if peer := b.getPeer(p); peer != nil {
if peer, has := b.getPeer(p); has {
peer.State = PeerStateMissing
b.replace(peer)
}
Expand All @@ -260,7 +265,7 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)

bucketID := rt.bucketIdForPeer(p)
bucket := rt.Buckets[bucketID]
if peer := bucket.getPeer(p); peer != nil {
if peer, has := bucket.getPeer(p); has {
// mark the peer as active if it was missing
if peer.State == PeerStateMissing {
peer.State = PeerStateActive
Expand All @@ -281,7 +286,7 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)

// We have enough space in the bucket (whether spawned or grouped).
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, PeerStateActive})
bucket.pushFront(PeerInfo{p, PeerStateActive})
rt.PeerAdded(p)
return "", nil
}
Expand All @@ -295,7 +300,7 @@ func (rt *RoutingTable) HandlePeerAlive(p peer.ID) (evicted peer.ID, err error)

// push the peer only if the bucket isn't overflowing after slitting
if bucket.len() < rt.bucketsize {
bucket.pushFront(&peerInfo{p, PeerStateActive})
bucket.pushFront(PeerInfo{p, PeerStateActive})
rt.PeerAdded(p)
return "", nil
}
Expand Down
42 changes: 21 additions & 21 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"

"github.com/libp2p/go-libp2p-kbucket/opts"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)
Expand All @@ -27,21 +26,31 @@ func TestBucket(t *testing.T) {
peers := make([]peer.ID, 100)
for i := 0; i < 100; i++ {
peers[i] = test.RandPeerIDFatal(t)
b.pushFront(&peerInfo{peers[i], PeerStateActive})
b.pushFront(PeerInfo{peers[i], PeerStateActive})
}

local := test.RandPeerIDFatal(t)
localID := ConvertPeerID(local)

infos := b.peers()
require.Len(t, infos, 100)

i := rand.Intn(len(peers))
if b.getPeer(peers[i]) == nil {
t.Errorf("Failed to find peer: %v", peers[i])
}
p, has := b.getPeer(peers[i])
require.True(t, has)
require.Equal(t, peers[i], p.Id)
require.Equal(t, PeerStateActive, p.State)

// replace
require.True(t, b.replace(PeerInfo{peers[i], PeerStateMissing}))
p, has = b.getPeer(peers[i])
require.True(t, has)
require.Equal(t, PeerStateMissing, p.State)

spl := b.split(0, ConvertPeerID(local))
llist := b.list
for e := llist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl > 0 {
t.Fatalf("split failed. found id with cpl > 0 in 0 bucket")
Expand All @@ -50,23 +59,12 @@ func TestBucket(t *testing.T) {

rlist := spl.list
for e := rlist.Front(); e != nil; e = e.Next() {
p := ConvertPeerID(e.Value.(*peerInfo).Id)
p := ConvertPeerID(e.Value.(PeerInfo).Id)
cpl := CommonPrefixLen(p, localID)
if cpl == 0 {
t.Fatalf("split failed. found id with cpl == 0 in non 0 bucket")
}
}

// test filter
p1 := test.RandPeerIDFatal(t)
p2 := test.RandPeerIDFatal(t)
b.pushFront(&peerInfo{p1, PeerStateMissing})
b.pushFront(&peerInfo{p2, PeerStateMissing})

infos := b.filter(func(p peerInfo) bool { return p.State == PeerStateMissing })
require.Len(t, infos, 2)
require.Equal(t, p2, infos[0].Id)
require.Equal(t, p1, infos[1].Id)
}

func TestGenRandPeerID(t *testing.T) {
Expand Down Expand Up @@ -224,15 +222,17 @@ func TestHandlePeerDisconnect(t *testing.T) {

// verify it's active
rt.tabLock.Lock()
bp := rt.Buckets[0].getPeer(p)
bp, has := rt.Buckets[0].getPeer(p)
require.True(t, has)
require.NotNil(t, bp)
require.Equal(t, PeerStateActive, bp.State)
rt.tabLock.Unlock()

//now mark it as disconnected & verify it's in missing state
rt.HandlePeerDisconnect(p)
rt.tabLock.Lock()
bp = rt.Buckets[0].getPeer(p)
bp, has = rt.Buckets[0].getPeer(p)
require.True(t, has)
require.NotNil(t, bp)
require.Equal(t, PeerStateMissing, bp.State)
rt.tabLock.Unlock()
Expand Down Expand Up @@ -573,7 +573,7 @@ func TestTableCleanup(t *testing.T) {

// create RT with a very short cleanup interval
rt, err := NewRoutingTable(ctx, 3, ConvertPeerID(local), time.Hour, pstore.NewMetrics(), PeerAlwaysValidFnc,
opts.TableCleanupInterval(100*time.Millisecond))
TableCleanupInterval(100*time.Millisecond))
require.NoError(t, err)

// mock peer validation fnc that successfully validates p[1], p[3] & p[5]
Expand Down

0 comments on commit daa33aa

Please sign in to comment.