Skip to content

Commit

Permalink
fix(bootstrap): bootstrap sequentially
Browse files Browse the repository at this point in the history
The default timeout is 10s so this won't take that long anyways. On the
other hand, if we do this all at once, we max the swarms dial queue.
  • Loading branch information
Stebalien committed Nov 5, 2019
1 parent f904d43 commit d1ff842
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 55 deletions.
70 changes: 19 additions & 51 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package dht

import (
"context"
"fmt"
"strings"
"sync"
"time"

process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
)

var DefaultBootstrapPeers []multiaddr.Multiaddr
Expand Down Expand Up @@ -76,16 +72,12 @@ func (dht *IpfsDHT) startBootstrapping() error {
}

func (dht *IpfsDHT) doBootstrap(ctx context.Context) {
if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("error while bootstrapping self: %s", err)
}
if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("error while bootstrapping buckets: %s", err)
}
dht.selfWalk(ctx)
dht.bootstrapBuckets(ctx)
}

// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
Expand All @@ -103,60 +95,36 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
}

buckets := dht.routingTable.GetAllBuckets()
var wg sync.WaitGroup
errChan := make(chan error)

for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)

// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
}
}(bucketID, errChan)
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
logger.Warningf("failed to do a random walk on bucket %d", bucketID)
}
}
}

// wait for all walks to finish & close the error channel
go func() {
wg.Wait()
close(errChan)
}()

// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
}
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
return
}
return err
logger.Warningf("failed to bootstrap self: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state.
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.12

require (
github.com/gogo/protobuf v1.3.0
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.1.0
Expand All @@ -25,7 +24,6 @@ require (
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
go.opencensus.io v0.22.1
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,6 @@ golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
Expand Down

0 comments on commit d1ff842

Please sign in to comment.