Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix and simplify some bootstrapping logic #405

Merged
merged 10 commits into from
Nov 6, 2019
14 changes: 7 additions & 7 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ type IpfsDHT struct {

bucketSize int

bootstrapCfg opts.BootstrapConfig

triggerAutoBootstrap bool
triggerBootstrap chan struct{}
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
autoBootstrap bool
bootstrapTimeout time.Duration
bootstrapPeriod time.Duration
triggerBootstrap chan struct{}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -92,7 +91,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.bootstrapCfg = cfg.BootstrapConfig
dht.autoBootstrap = cfg.AutoBootstrap
dht.bootstrapPeriod = cfg.BootstrapPeriod
dht.bootstrapTimeout = cfg.BootstrapTimeout

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand All @@ -105,7 +106,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap

if !cfg.Client {
for _, p := range cfg.Protocols {
Expand Down
108 changes: 34 additions & 74 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package dht

import (
"context"
"fmt"
"strings"
"sync"
"time"

process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
)

var DefaultBootstrapPeers []multiaddr.Multiaddr
Expand Down Expand Up @@ -49,65 +45,47 @@ func (dht *IpfsDHT) startBootstrapping() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
scanInterval := time.NewTicker(dht.bootstrapPeriod)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
defer scanInterval.Stop()

// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
if dht.autoBootstrap {
dht.doBootstrap(ctx)
} else {
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
scanInterval.Stop()
}

for {
select {
case now := <-scanInterval.C:
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-scanInterval.C:
case <-dht.triggerBootstrap:
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-ctx.Done():
return
}
dht.doBootstrap(ctx)
}
})

return nil
}

func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
}
dht.latestSelfWalk = time.Now()
}

if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
}

return nil
func (dht *IpfsDHT) doBootstrap(ctx context.Context) {
dht.selfWalk(ctx)
dht.bootstrapBuckets(ctx)
}

// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
err := f(queryCtx)
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
Expand All @@ -117,60 +95,42 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
}

buckets := dht.routingTable.GetAllBuckets()
var wg sync.WaitGroup
errChan := make(chan error)

if len(buckets) > 16 {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
// Don't bother bootstrapping more than 16 buckets.
// GenRandPeerID can't generate target peer IDs with more than
// 16 bits specified anyways.
buckets = buckets[:16]
}
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
if time.Since(bucket.RefreshedAt()) > dht.bootstrapPeriod {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
if err == routing.ErrNotFound {
return nil
}
return err
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
}
}(bucketID, errChan)
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
logger.Warningf("failed to do a random walk on bucket %d", bucketID)
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

// wait for all walks to finish & close the error channel
go func() {
wg.Wait()
close(errChan)
}()

// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
}
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
return
}
return err
logger.Warningf("failed to bootstrap self: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state.
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.12

require (
github.com/gogo/protobuf v1.3.0
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.1.0
Expand All @@ -25,7 +24,6 @@ require (
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
go.opencensus.io v0.22.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,6 @@ golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if bootstrap && dht.autoBootstrap {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down Expand Up @@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if bootstrap && dht.autoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down
55 changes: 28 additions & 27 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,17 @@ var (
DefaultProtocols = []protocol.ID{ProtocolDHT}
)

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int

BootstrapTimeout time.Duration
BootstrapPeriod time.Duration
AutoBootstrap bool
}

// Apply applies the given options to this Option
Expand All @@ -59,24 +54,30 @@ var Defaults = func(o *Options) error {
o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
o.Protocols = DefaultProtocols

o.BootstrapConfig = BootstrapConfig{
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,
o.BootstrapTimeout = 10 * time.Second
o.BootstrapPeriod = 1 * time.Hour
o.AutoBootstrap = true

Timeout: 10 * time.Second,
return nil
}

SelfQueryInterval: 1 * time.Hour,
// BootstrapTimeout sets the timeout for bootstrap queries.
func BootstrapTimeout(timeout time.Duration) Option {
return func(o *Options) error {
o.BootstrapTimeout = timeout
return nil
}

o.TriggerAutoBootstrap = true

return nil
}

// Bootstrap configures the dht bootstrapping process
func Bootstrap(b BootstrapConfig) Option {
// BootstrapPeriod sets the period for bootstrapping. The DHT will bootstrap
// every bootstrap period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 1. Then searching for a random key in each bucket that hasn't been queried in
// the last bootstrap period.
func BootstrapPeriod(period time.Duration) Option {
return func(o *Options) error {
o.BootstrapConfig = b
o.BootstrapPeriod = period
return nil
}
}
Expand Down Expand Up @@ -154,7 +155,7 @@ func BucketSize(bucketSize int) Option {
// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
func DisableAutoBootstrap() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
o.AutoBootstrap = false
return nil
}
}