Skip to content

Commit

Permalink
fix: don't add unresponsive DHT servers to the Routing Table (#820)
Browse files Browse the repository at this point in the history
* added check to avoid adding unresponsive dht peers to the dht routing table

* removed lock in adding peers to the rt

* made variable names more meaningful

* fixed network loop and corrected tests

* added UsefulPeer() references from current PR

* go mod tidy

* added delay in TestRefreshBelowMinRTThreshold

* addressed review

* go mod tidy

* addressed Jorropo review

* added comments

* removed state of peers probed recently

* fix conflicts merge

* updated deps

* added optimizations documentation

* Update dht.go

* updated md files

---------

Co-authored-by: Jorropo <jorropo.pgm@gmail.com>
(cherry picked from commit 8c9fdff)
  • Loading branch information
guillaumemichel authored and Jorropo committed Jun 12, 2023
1 parent f4e8e86 commit d373974
Show file tree
Hide file tree
Showing 15 changed files with 287 additions and 931 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

- [Install](#install)
- [Usage](#usage)
- [Optimizations](#optimizations)
- [Contribute](#contribute)
- [Maintainers](#maintainers)
- [License](#license)
Expand All @@ -21,6 +22,10 @@
go get github.com/libp2p/go-libp2p-kad-dht
```

## Optimizations

Client-side optimizations are described in [optimizations.md](./optimizations.md)

## Usage

Go to https://godoc.org/github.com/libp2p/go-libp2p-kad-dht.
Expand Down
110 changes: 62 additions & 48 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ const (
protectedBuckets = 2
)

type addPeerRTReq struct {
p peer.ID
queryPeer bool
}

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -128,6 +123,9 @@ type IpfsDHT struct {

autoRefresh bool

// timeout for the lookupCheck operation
lookupCheckTimeout time.Duration

// A function returning a set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
Expand All @@ -143,7 +141,7 @@ type IpfsDHT struct {
disableFixLowPeers bool
fixLowPeersChan chan struct{}

addPeerToRTChan chan addPeerRTReq
addPeerToRTChan chan peer.ID
refreshFinishedCh chan struct{}

rtFreezeTimeout time.Duration
Expand Down Expand Up @@ -243,7 +241,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(dht.ctx, p)
}
dht.plk.Unlock()

Expand Down Expand Up @@ -304,7 +302,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err

fixLowPeersChan: make(chan struct{}, 1),

addPeerToRTChan: make(chan addPeerRTReq),
addPeerToRTChan: make(chan peer.ID),
refreshFinishedCh: make(chan struct{}),

enableOptProv: cfg.EnableOptimisticProvide,
Expand Down Expand Up @@ -334,6 +332,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
dht.routingTable = rt
dht.bootstrapPeers = cfg.BootstrapPeers

dht.lookupCheckTimeout = cfg.RoutingTable.RefreshQueryTimeout

// init network size estimator
dht.nsEstimator = netsize.NewEstimator(h.ID(), rt, cfg.BucketSize)

Expand Down Expand Up @@ -372,6 +372,18 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, err
return dht, nil
}

// lookupCheck performs a lookup request to a remote peer.ID, verifying that it is able to
// answer it correctly
func (dht *IpfsDHT) lookupCheck(ctx context.Context, p peer.ID) error {
// lookup request to p requesting for its own peer.ID
peerids, err := dht.protoMessenger.GetClosestPeers(ctx, p, p)
// p should return at least its own peerid
if err == nil && len(peerids) == 0 {
return fmt.Errorf("peer %s failed to return its closest peers, got %d", p, len(peerids))
}
return err
}

func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
Expand All @@ -383,16 +395,11 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutb
return err
}

pingFnc := func(ctx context.Context, p peer.ID) error {
_, err := dht.protoMessenger.GetClosestPeers(ctx, p, p) // don't use the PING message type as it's deprecated
return err
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
pingFnc,
dht.lookupCheck,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
Expand Down Expand Up @@ -500,7 +507,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(p, false)
dht.peerFound(ctx, p)
}

// TODO Active Bootstrapping
Expand Down Expand Up @@ -611,22 +618,22 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case addReq := <-dht.addPeerToRTChan:
prevSize := dht.routingTable.Size()
if prevSize == 0 {
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
newlyAdded, err := dht.routingTable.TryAddPeer(addReq.p, addReq.queryPeer, isBootsrapping)
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded && addReq.queryPeer {
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(addReq.p, time.Now())
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
Expand All @@ -646,40 +653,47 @@ func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
}
}

// peerFound signals the routingTable that we've found a peer that
// might support the DHT protocol.
// If we have a connection a peer but no exchange of a query RPC ->
//
// LastQueriedAt=time.Now (so we don't ping it for some time for a liveliness check)
// LastUsefulAt=0
//
// If we connect to a peer and then exchange a query RPC ->
//
// LastQueriedAt=time.Now (same reason as above)
// LastUsefulAt=time.Now (so we give it some life in the RT without immediately evicting it)
//
// If we query a peer we already have in our Routing Table ->
//
// LastQueriedAt=time.Now()
// LastUsefulAt remains unchanged
//
// If we connect to a peer we already have in the RT but do not exchange a query (rare)
//
// Do Nothing.
func (dht *IpfsDHT) peerFound(p peer.ID, queryPeer bool) {

if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
// if the peer is already in the routing table or the appropriate bucket is
// already full, don't try to add the new peer.ID
if dht.routingTable.Find(p) != "" || !dht.routingTable.UsefulPeer(p) {

This comment has been minimized.

Copy link
@Jorropo

Jorropo Jun 13, 2023

Contributor

This aquire the same mutex twice back to back for no reason, leading to a potential logical race, dht.routingTable.Find(p) != "" needs to be moved inside .UsefulPeer (I would consider renaming it .UsefullNewPeer then).

This comment has been minimized.

Copy link
@guillaumemichel

guillaumemichel Jun 14, 2023

Author Contributor
return
}

// verify whether the remote peer advertises the right dht protocol
b, err := dht.validRTPeer(p)
if err != nil {
logger.Errorw("failed to validate if peer is a DHT peer", "peer", p, "error", err)
} else if b {
select {
case dht.addPeerToRTChan <- addPeerRTReq{p, queryPeer}:
case <-dht.ctx.Done():

livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
if err := dht.lookupCheck(livelinessCtx, p); err != nil {

This comment has been minimized.

Copy link
@Jorropo

Jorropo Jun 13, 2023

Contributor

Callers are now expected to call this in it's own goroutine.
This is wastefull if we already have the peer in the routing table (as we do that for all messages received).

The callers should call this synchronously and only if the node is not in the routing table we then start a new goroutine for lookupCheck.

This also needs to be bounded.

logger.Debugw("connected peer not answering DHT request as expected", "peer", p, "error", err)
return
}

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
}
}

// validPeerFound signals the routingTable that we've found a peer that
// supports the DHT protocol, and just answered correctly to a DHT FindPeers
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}

select {
case dht.addPeerToRTChan <- p:
case <-dht.ctx.Done():
return
}
}

Expand Down
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(d5.self, true)
d.peerFound(d1.self, true)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
Expand Down
11 changes: 9 additions & 2 deletions dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dht
import (
"context"
"net"
"sync/atomic"
"testing"

ic "github.com/libp2p/go-libp2p/core/crypto"
Expand Down Expand Up @@ -31,12 +32,17 @@ func TestIsRelay(t *testing.T) {
type mockConn struct {
local peer.AddrInfo
remote peer.AddrInfo

isClosed atomic.Bool
}

var _ network.Conn = (*mockConn)(nil)

func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error { return nil }
func (m *mockConn) ID() string { return "0" }
func (m *mockConn) Close() error {
m.isClosed.Store(true)
return nil
}
func (m *mockConn) NewStream(context.Context) (network.Stream, error) { return nil, nil }
func (m *mockConn) GetStreams() []network.Stream { return []network.Stream{} }
func (m *mockConn) Stat() network.ConnStats {
Expand All @@ -50,6 +56,7 @@ func (m *mockConn) LocalPrivateKey() ic.PrivKey { return nil }
func (m *mockConn) RemotePeer() peer.ID { return m.remote.ID }
func (m *mockConn) RemotePublicKey() ic.PubKey { return nil }
func (m *mockConn) ConnState() network.ConnectionState { return network.ConnectionState{} }
func (m *mockConn) IsClosed() bool { return m.isClosed.Load() }

func TestFilterCaching(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down
6 changes: 4 additions & 2 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
return false
}

// a peer has queried us, let's add it to RT
dht.peerFound(mPeer, true)
// a peer has queried us, let's add it to RT. A new go routine is required
// because we can't block the stream handler until the remote peer answers
// our query.
go dht.peerFound(dht.ctx, mPeer)

This comment has been minimized.

Copy link
@Jorropo

Jorropo Jun 13, 2023

Contributor

This is unbounded work and needs to be fixed.

This comment has been minimized.

Copy link
@Jorropo

Jorropo Jun 13, 2023

Contributor

Also I'm pretty sure this occurrence can be removed, we can't reach this without first opening a connection which would show up in the libp2p subscribe.

This comment has been minimized.

Copy link
@guillaumemichel

guillaumemichel Jun 14, 2023

Author Contributor

As long as there is a new go routine at d373974#r117906670, it shouldn't be a problem 👍🏻


if c := baseLogger.Check(zap.DebugLevel, "handling message"); c != nil {
c.Write(zap.String("from", mPeer.String()),
Expand Down
48 changes: 47 additions & 1 deletion dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,8 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
connect(t, ctx, dhtA, dhtD)

// and because of the above bootstrap, A also discovers E !
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second)
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 10*time.Second)
time.Sleep(100 * time.Millisecond)
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}

Expand Down Expand Up @@ -1325,6 +1326,49 @@ func TestClientModeConnect(t *testing.T) {
}
}

func TestInvalidServer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

a := setupDHT(ctx, t, false)
b := setupDHT(ctx, t, true)

// make b advertise all dht server protocols
for _, proto := range a.serverProtocols {
// Hang on every request.
b.host.SetStreamHandler(proto, func(s network.Stream) {
defer s.Reset() // nolint
<-ctx.Done()
})
}

connectNoSync(t, ctx, a, b)

c := testCaseCids[0]
p := peer.ID("TestPeer")
a.ProviderStore().AddProvider(ctx, c.Hash(), peer.AddrInfo{ID: p})
time.Sleep(time.Millisecond * 5) // just in case...

provs, err := b.FindProviders(ctx, c)
if err != nil {
t.Fatal(err)
}

if len(provs) == 0 {
t.Fatal("Expected to get a provider back")
}

if provs[0].ID != p {
t.Fatal("expected it to be our test peer")
}
if a.routingTable.Find(b.self) != "" {
t.Fatal("DHT clients should not be added to routing tables")
}
if b.routingTable.Find(a.self) == "" {
t.Fatal("DHT server should have been added to the dht client's routing table")
}
}

func TestClientModeFindPeer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down Expand Up @@ -2124,6 +2168,8 @@ func TestPreconnectedNodes(t *testing.T) {
require.NoError(t, err)
defer h2.Close()

connect(t, ctx, d1, d2)

// See if it works
peers, err := d2.GetClosestPeers(ctx, "testkey")
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d373974

Please sign in to comment.