Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bootstrap empty RT and optimize allocs #627

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 54 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -108,6 +109,11 @@ type IpfsDHT struct {
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error

// A set of bootstrap peers to fallback on if all other attempts to fix
// the routing table fail (or, e.g., this is the first time this node is
// connecting to the network).
bootstrapPeers []peer.AddrInfo

maxRecordAge time.Duration

// Allows disabling dht subsystems. These should _only_ be set on
Expand Down Expand Up @@ -262,7 +268,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
triggerSelfLookup: make(chan chan<- error),
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
fixLowPeersChan: make(chan struct{}),
fixLowPeersChan: make(chan struct{}, 1),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, I added this buffer so we'd fix low peers if we got a "fix low peers" signal while bootstrapping.

}

// construct routing table
Expand All @@ -272,6 +278,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
}
dht.routingTable = rt

// parse the bootstrap peers.
dht.bootstrapPeers, err = peer.AddrInfosFromP2pAddrs(cfg.bootstrapPeers...)
if err != nil {
return nil, err
}

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)

Expand Down Expand Up @@ -323,14 +335,19 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
// fixLowPeersRoutine tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
timer := time.NewTimer(periodicBootstrapInterval)
defer timer.Stop()

for {
select {
case <-dht.fixLowPeersChan:
case <-timer.C:
case <-proc.Closing():
return
}

if dht.routingTable.Size() > minRTRefreshThreshold {
continue
}
Expand All @@ -339,6 +356,41 @@ func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
dht.peerFound(dht.Context(), p, false)
}

if dht.routingTable.Size() == 0 {
if len(dht.bootstrapPeers) == 0 {
// No point in continuing, we have no peers!
continue
}

found := 0
for _, i := range rand.Perm(len(dht.bootstrapPeers)) {
ai := dht.bootstrapPeers[i]
err := dht.Host().Connect(dht.Context(), ai)
if err == nil {
found++
} else {
logger.Warnw("failed to bootstrap", "peer", ai.ID, "error", err)
}

// Wait for two bootstrap peers, or try them all.
//
// Why two? In theory, one should be enough
// normally. However, if the network were to
// restart and everyone connected to just one
// bootstrapper, we'll end up with a mostly
// partitioned network.
//
// So we always bootstrap with two random peers.
if found == 2 {
break
}
}

// No point in refreshing the routing table yet. We have
// to wait to identify the peer.
continue
}

if dht.autoRefresh {
select {
case dht.triggerRtRefresh <- nil:
Expand Down
1 change: 1 addition & 0 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var minRTRefreshThreshold = 10

// timeout for pinging one peer
const peerPingTimeout = 10 * time.Second
const periodicBootstrapInterval = 2 * time.Minute

func init() {
for _, s := range []string{
Expand Down
9 changes: 9 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
ma "github.com/multiformats/go-multiaddr"
)

// ModeOpt describes what mode the dht should operate in
Expand Down Expand Up @@ -60,6 +61,7 @@ type config struct {

// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []ma.Multiaddr
}

func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool { return true }
Expand Down Expand Up @@ -393,3 +395,10 @@ func V1CompatibleMode(enable bool) Option {
return nil
}
}

func BootstrapPeers(addrs ...ma.Multiaddr) Option {
return func(c *config) error {
c.bootstrappers = append(c.bootstrappers, addrs...)
return nil
}
}