diff --git a/dht.go b/dht.go
index ac833bd5d..6823a54cf 100644
--- a/dht.go
+++ b/dht.go
@@ -67,11 +67,10 @@ type IpfsDHT struct {
 
 	bucketSize int
 
-	bootstrapCfg opts.BootstrapConfig
-
-	triggerAutoBootstrap bool
-	triggerBootstrap     chan struct{}
-	latestSelfWalk       time.Time // the last time we looked-up our own peerID in the network
+	autoRefresh           bool
+	rtRefreshQueryTimeout time.Duration
+	rtRefreshPeriod       time.Duration
+	triggerRtRefresh      chan struct{}
 }
 
 // Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -92,7 +91,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 		return nil, err
 	}
 	dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
-	dht.bootstrapCfg = cfg.BootstrapConfig
+	dht.autoRefresh = cfg.RoutingTable.AutoRefresh
+	dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
+	dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout
 
 	// register for network notifs.
 	dht.host.Network().Notify((*netNotifiee)(dht))
@@ -105,14 +106,13 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 	dht.proc.AddChild(dht.providers.Process())
 	dht.Validator = cfg.Validator
 
-	dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap
-
 	if !cfg.Client {
 		for _, p := range cfg.Protocols {
 			h.SetStreamHandler(p, dht.handleNewStream)
 		}
 	}
 
-	dht.startBootstrapping()
+	dht.startRefreshing()
 	return dht, nil
 }
@@ -163,7 +163,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
 		routingTable:     rt,
 		protocols:        protocols,
 		bucketSize:       bucketSize,
-		triggerBootstrap: make(chan struct{}),
+		triggerRtRefresh: make(chan struct{}),
 	}
 
 	dht.ctx = dht.newContextWithLocalTags(ctx)
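Taken together with the opts changes later in this patch, the wiring is: `New` copies the new `RoutingTable` config group onto the unexported fields above and then starts the worker via `startRefreshing()`. A minimal construction sketch, assuming standard libp2p host setup; the helper name is ours and the durations are placeholders that restate the defaults:

```go
package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

// newRefreshingDHT shows the option plumbing above: RefreshPeriod and
// RefreshQueryTimeout land in rtRefreshPeriod and rtRefreshQueryTimeout,
// while AutoRefresh (on by default) decides whether the worker started by
// startRefreshing actually runs periodic rounds.
func newRefreshingDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	return dht.New(ctx, h,
		opts.RoutingTableRefreshPeriod(1*time.Hour),          // placeholder: the default
		opts.RoutingTableRefreshQueryTimeout(10*time.Second), // placeholder: the default
	)
}
```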
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index 914e01cfe..c17407c16 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -2,9 +2,6 @@ package dht
 
 import (
 	"context"
-	"fmt"
-	"strings"
-	"sync"
 	"time"
 
 	process "github.com/jbenet/goprocess"
@@ -12,12 +9,13 @@ import (
 	"github.com/libp2p/go-libp2p-core/routing"
 	"github.com/multiformats/go-multiaddr"
 	_ "github.com/multiformats/go-multiaddr-dns"
-	"github.com/pkg/errors"
 )
 
 var DefaultBootstrapPeers []multiaddr.Multiaddr
 
-var minRTBootstrapThreshold = 4
+// Minimum number of peers in the routing table. If we drop below this and we
+// see a new peer, we trigger a refresh round.
+var minRTRefreshThreshold = 4
 
 func init() {
 	for _, s := range []string{
@@ -43,71 +41,53 @@ func init() {
 	}
 }
 
-// Start the bootstrap worker.
-func (dht *IpfsDHT) startBootstrapping() error {
+// Start the refresh worker.
+func (dht *IpfsDHT) startRefreshing() error {
 	// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
 	dht.proc.Go(func(proc process.Process) {
 		ctx := processctx.OnClosingContext(proc)
-		scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
-		defer scanInterval.Stop()
+		refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
+		defer refreshTicker.Stop()
 
-		// run bootstrap if option is set
-		if dht.triggerAutoBootstrap {
-			if err := dht.doBootstrap(ctx, true); err != nil {
-				logger.Warningf("bootstrap error: %s", err)
-			}
+		// refresh if the option is set
+		if dht.autoRefresh {
+			dht.doRefresh(ctx)
 		} else {
-			// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
-			scanInterval.Stop()
+			// disable the "auto-refresh" ticker so that no more ticks are sent to this channel
+			refreshTicker.Stop()
 		}
 
 		for {
 			select {
-			case now := <-scanInterval.C:
-				walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
-				if err := dht.doBootstrap(ctx, walkSelf); err != nil {
-					logger.Warning("bootstrap error: %s", err)
-				}
-			case <-dht.triggerBootstrap:
-				logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
-				if err := dht.doBootstrap(ctx, true); err != nil {
-					logger.Warning("bootstrap error: %s", err)
-				}
+			case <-refreshTicker.C:
+			case <-dht.triggerRtRefresh:
+				logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
 			case <-ctx.Done():
 				return
 			}
+			dht.doRefresh(ctx)
 		}
 	})
 
 	return nil
}
 
-func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
-	if walkSelf {
-		if err := dht.selfWalk(ctx); err != nil {
-			return fmt.Errorf("self walk: error: %s", err)
-		}
-		dht.latestSelfWalk = time.Now()
-	}
-
-	if err := dht.bootstrapBuckets(ctx); err != nil {
-		return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
-	}
-
-	return nil
+func (dht *IpfsDHT) doRefresh(ctx context.Context) {
+	dht.selfWalk(ctx)
+	dht.refreshBuckets(ctx)
 }
 
-// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
-func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
+// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
+func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
 	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
-		logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
+		logger.Infof("starting refresh of bucket %d to %s (routing table size was %d)",
 			bucketId, target, dht.routingTable.Size())
 		defer func() {
-			logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
+			logger.Infof("finished refresh of bucket %d to %s (routing table size is now %d)",
 				bucketId, target, dht.routingTable.Size())
 		}()
-		queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
+		queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
 		defer cancel()
 		err := f(queryCtx)
 		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
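The reworked loop above is a ticker-plus-trigger pattern: periodic ticks and manual triggers both fall through to a single `doRefresh` call at the bottom of the loop, so the two paths share one code path and a refresh in progress naturally delays the next one. A standalone, runnable sketch of the pattern (all names here are illustrative, not the library's):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// refreshLoop distills the worker above: whichever case fires, control falls
// through to doRefresh, so scheduled and manual rounds run the same code.
func refreshLoop(ctx context.Context, period time.Duration, trigger <-chan struct{}, doRefresh func(context.Context)) {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C: // scheduled round
		case <-trigger: // manual round (cf. RefreshRoutingTable)
		case <-ctx.Done():
			return
		}
		doRefresh(ctx)
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	trigger := make(chan struct{}, 1)
	trigger <- struct{}{} // simulate one manual trigger
	refreshLoop(ctx, 100*time.Millisecond, trigger, func(context.Context) {
		fmt.Println("refresh round")
	})
}
```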
@@ -117,69 +97,58 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
 	}
 
 	buckets := dht.routingTable.GetAllBuckets()
-	var wg sync.WaitGroup
-	errChan := make(chan error)
-
+	if len(buckets) > 16 {
+		// Don't bother refreshing more than 16 buckets.
+		// GenRandPeerID can't generate target peer IDs with more than
+		// 16 bits specified anyway.
+		buckets = buckets[:16]
+	}
 	for bucketID, bucket := range buckets {
-		if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
-			wg.Add(1)
-			go func(bucketID int, errChan chan<- error) {
-				defer wg.Done()
-				// gen rand peer in the bucket
-				randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
-
-				// walk to the generated peer
-				walkFnc := func(c context.Context) error {
-					_, err := dht.FindPeer(ctx, randPeerInBucket)
-					if err == routing.ErrNotFound {
-						return nil
-					}
-					return err
-				}
-
-				if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
-					errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
-				}
-			}(bucketID, errChan)
+		if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
+			continue
+		}
+		// gen rand peer in the bucket
+		randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
+
+		// walk to the generated peer
+		walkFnc := func(c context.Context) error {
+			_, err := dht.FindPeer(c, randPeerInBucket)
+			if err == routing.ErrNotFound {
+				return nil
+			}
+			return err
 		}
-	}
-
-	// wait for all walks to finish & close the error channel
-	go func() {
-		wg.Wait()
-		close(errChan)
-	}()
 
-	// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
-	var errStrings []string
-	for err := range errChan {
-		errStrings = append(errStrings, err.Error())
-	}
-	if len(errStrings) == 0 {
-		return nil
-	} else {
-		return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
+		if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
+			logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err)
+		}
 	}
 }
 
 // Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
-	queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
+func (dht *IpfsDHT) selfWalk(ctx context.Context) {
+	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
 	defer cancel()
 	_, err := dht.FindPeer(queryCtx, dht.self)
 	if err == routing.ErrNotFound {
-		return nil
+		return
 	}
-	return err
+	logger.Warningf("failed to query self during routing table refresh: %s", err)
 }
 
-// Bootstrap tells the DHT to get into a bootstrapped state.
+// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
+// IpfsRouter interface.
 //
-// Note: the context is ignored.
+// This just calls `RefreshRoutingTable`.
 func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
+	dht.RefreshRoutingTable()
+	return nil
+}
+
+// RefreshRoutingTable tells the DHT to refresh its routing table.
+func (dht *IpfsDHT) RefreshRoutingTable() {
 	select {
-	case dht.triggerBootstrap <- struct{}{}:
+	case dht.triggerRtRefresh <- struct{}{}:
 	default:
 	}
-	return nil
 }
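Note the non-blocking send that `Bootstrap`/`RefreshRoutingTable` now uses: since `triggerRtRefresh` is unbuffered, a request is delivered only when the worker is parked in its `select`, and requests arriving mid-refresh are simply dropped, i.e. coalesced. A self-contained illustration; the buffered channel here is our stand-in to make the outcome deterministic:

```go
package main

import "fmt"

// requestRefresh mirrors the select/default pattern in RefreshRoutingTable:
// deliver the request if possible, otherwise drop it rather than block.
func requestRefresh(trigger chan struct{}) bool {
	select {
	case trigger <- struct{}{}:
		return true // delivered: a refresh round will run
	default:
		return false // no receiver ready / already pending: coalesced
	}
}

func main() {
	trigger := make(chan struct{}, 1)     // buffered only for a deterministic demo
	fmt.Println(requestRefresh(trigger))  // true
	fmt.Println(requestRefresh(trigger))  // false: absorbed by the pending request
}
```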
diff --git a/dht_test.go b/dht_test.go
index 3611ab4a7..b15e5b0cf 100644
--- a/dht_test.go
+++ b/dht_test.go
@@ -113,7 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
 		bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
 		opts.Client(client),
 		opts.NamespacedValidator("v", blankValidator{}),
-		opts.DisableAutoBootstrap(),
+		opts.DisableAutoRefresh(),
 	)
 	if err != nil {
 		t.Fatal(err)
@@ -191,7 +191,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	logger.Debugf("Bootstrapping DHTs...")
+	logger.Debugf("refreshing DHTs' routing tables...")
 
 	// tried async. sequential fares much better. compare:
 	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
@@ -201,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
 	start := rand.Intn(len(dhts)) // randomize to decrease bias.
 	for i := range dhts {
 		dht := dhts[(start+i)%len(dhts)]
-		dht.Bootstrap(ctx)
+		dht.RefreshRoutingTable()
 	}
 }
 
@@ -639,7 +639,7 @@ func printRoutingTables(dhts []*IpfsDHT) {
 	}
 }
 
-func TestBootstrap(t *testing.T) {
+func TestRefresh(t *testing.T) {
 	if testing.Short() {
 		t.SkipNow()
 	}
@@ -689,7 +689,7 @@ func TestBootstrap(t *testing.T) {
 	}
 }
 
-func TestBootstrapBelowMinRTThreshold(t *testing.T) {
+func TestRefreshBelowMinRTThreshold(t *testing.T) {
 	ctx := context.Background()
 
 	// enable auto bootstrap on A
@@ -721,7 +721,7 @@ func TestBootstrapBelowMinRTThreshold(t *testing.T) {
 	connect(t, ctx, dhtB, dhtC)
 
 	// we ONLY init bootstrap on A
-	dhtA.Bootstrap(ctx)
+	dhtA.RefreshRoutingTable()
 
 	// and wait for one round to complete i.e. A should be connected to both B & C
 	waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)
@@ -749,7 +749,7 @@ func TestBootstrapBelowMinRTThreshold(t *testing.T) {
 	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
 }
 
-func TestPeriodicBootstrap(t *testing.T) {
+func TestPeriodicRefresh(t *testing.T) {
 	if ci.IsRunning() {
 		t.Skip("skipping on CI. highly timing dependent")
 	}
@@ -795,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {
 
 	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
 	for _, dht := range dhts {
-		go dht.Bootstrap(ctx)
+		dht.RefreshRoutingTable()
 	}
 
 	// this is async, and we dont know when it's finished with one cycle, so keep checking
@@ -1428,7 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
 		opts.Protocols("/esh/dht"),
 		opts.Client(false),
 		opts.NamespacedValidator("v", blankValidator{}),
-		opts.DisableAutoBootstrap(),
+		opts.DisableAutoRefresh(),
 	}
 
 	dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
@@ -1467,7 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
 		opts.Protocols("/esh/dht"),
 		opts.Client(false),
 		opts.NamespacedValidator("v", blankValidator{}),
-		opts.DisableAutoBootstrap(),
+		opts.DisableAutoRefresh(),
 	}...)
 	if err != nil {
 		t.Fatal(err)
@@ -1477,7 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
 		opts.Protocols("/lsr/dht"),
 		opts.Client(false),
 		opts.NamespacedValidator("v", blankValidator{}),
-		opts.DisableAutoBootstrap(),
+		opts.DisableAutoRefresh(),
 	}...)
 	if err != nil {
 		t.Fatal(err)
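The test changes above all follow one pattern: construct the node with `DisableAutoRefresh()` so nothing runs in the background, then drive refresh rounds explicitly with `RefreshRoutingTable()`. A condensed sketch of that pattern; the helper name is ours, and this would live alongside dht_test.go (where `blankValidator` is defined):

```go
package dht

import (
	"context"
	"testing"

	"github.com/libp2p/go-libp2p-core/host"
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

// newTestDHT builds a DHT with auto-refresh off so tests stay deterministic.
func newTestDHT(t *testing.T, ctx context.Context, h host.Host) *IpfsDHT {
	d, err := New(ctx, h,
		opts.Client(false),
		opts.NamespacedValidator("v", blankValidator{}),
		opts.DisableAutoRefresh(), // no background rounds; the test drives them
	)
	if err != nil {
		t.Fatal(err)
	}
	return d
}
```

A test then calls `d.RefreshRoutingTable()` whenever it wants a round, exactly as `TestRefreshBelowMinRTThreshold` does above; since the call is non-blocking, at most one round is queued.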
diff --git a/ext_test.go b/ext_test.go
index fb920506f..91d54d9af 100644
--- a/ext_test.go
+++ b/ext_test.go
@@ -30,7 +30,7 @@ func TestGetFailures(t *testing.T) {
 	}
 	hosts := mn.Hosts()
 
-	os := []opts.Option{opts.DisableAutoBootstrap()}
+	os := []opts.Option{opts.DisableAutoRefresh()}
 	d, err := New(ctx, hosts[0], os...)
 	if err != nil {
 		t.Fatal(err)
@@ -152,7 +152,7 @@ func TestNotFound(t *testing.T) {
 	}
 	hosts := mn.Hosts()
 
-	os := []opts.Option{opts.DisableAutoBootstrap()}
+	os := []opts.Option{opts.DisableAutoRefresh()}
 	d, err := New(ctx, hosts[0], os...)
 	if err != nil {
 		t.Fatal(err)
@@ -232,7 +232,7 @@ func TestLessThanKResponses(t *testing.T) {
 	}
 	hosts := mn.Hosts()
 
-	os := []opts.Option{opts.DisableAutoBootstrap()}
+	os := []opts.Option{opts.DisableAutoRefresh()}
 	d, err := New(ctx, hosts[0], os...)
 	if err != nil {
 		t.Fatal(err)
@@ -302,7 +302,7 @@ func TestMultipleQueries(t *testing.T) {
 		t.Fatal(err)
 	}
 	hosts := mn.Hosts()
-	os := []opts.Option{opts.DisableAutoBootstrap()}
+	os := []opts.Option{opts.DisableAutoRefresh()}
 	d, err := New(ctx, hosts[0], os...)
 	if err != nil {
 		t.Fatal(err)
diff --git a/go.mod b/go.mod
index c2ea5bb0a..07127b8d5 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,6 @@ go 1.12
 
 require (
 	github.com/gogo/protobuf v1.3.0
-	github.com/google/uuid v1.1.1
 	github.com/hashicorp/golang-lru v0.5.3
 	github.com/ipfs/go-cid v0.0.3
 	github.com/ipfs/go-datastore v0.1.0
@@ -25,7 +24,6 @@ require (
 	github.com/multiformats/go-multiaddr v0.0.4
 	github.com/multiformats/go-multiaddr-dns v0.0.3
 	github.com/multiformats/go-multistream v0.1.0
-	github.com/pkg/errors v0.8.1
 	github.com/stretchr/testify v1.3.0
 	github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
 	go.opencensus.io v0.22.1
diff --git a/go.sum b/go.sum
index 309201147..f52fccee2 100644
--- a/go.sum
+++ b/go.sum
@@ -395,8 +395,6 @@ golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGm
 golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
-golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
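Both notif.go hunks below apply the low-watermark rule from dht_bootstrap.go: when a peer connects while the routing table holds `minRTRefreshThreshold` peers or fewer (and auto-refresh is on), request a refresh without blocking the notifiee. A simplified standalone sketch, with the DHT's types reduced to plain values:

```go
package main

import "fmt"

const minRTRefreshThreshold = 4 // same threshold as dht_bootstrap.go

// onConnected mirrors the notifiee logic: sample the table size before the
// update, then fire a non-blocking refresh request if we were running low.
func onConnected(tableSize int, autoRefresh bool, trigger chan struct{}) {
	refresh := tableSize <= minRTRefreshThreshold
	// ... dht.Update(ctx, p) would add the peer to the table here ...
	if refresh && autoRefresh {
		select {
		case trigger <- struct{}{}:
		default: // a refresh is already pending; don't block the notifiee
		}
	}
}

func main() {
	trigger := make(chan struct{}, 1)
	onConnected(3, true, trigger)  // below threshold: requests a refresh
	onConnected(20, true, trigger) // healthy table: no request
	fmt.Println("pending refresh requests:", len(trigger))
}
```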
diff --git a/notif.go b/notif.go
index 7c47215c3..87ba1a300 100644
--- a/notif.go
+++ b/notif.go
@@ -32,11 +32,11 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
 	dht.plk.Lock()
 	defer dht.plk.Unlock()
 	if dht.host.Network().Connectedness(p) == network.Connected {
-		bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
+		refresh := dht.routingTable.Size() <= minRTRefreshThreshold
 		dht.Update(dht.Context(), p)
-		if bootstrap && dht.triggerAutoBootstrap {
+		if refresh && dht.autoRefresh {
 			select {
-			case dht.triggerBootstrap <- struct{}{}:
+			case dht.triggerRtRefresh <- struct{}{}:
 			default:
 			}
 		}
@@ -78,11 +78,11 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
 	dht.plk.Lock()
 	defer dht.plk.Unlock()
 	if dht.host.Network().Connectedness(p) == network.Connected {
-		bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
+		refresh := dht.routingTable.Size() <= minRTRefreshThreshold
 		dht.Update(dht.Context(), p)
-		if bootstrap && dht.triggerAutoBootstrap {
+		if refresh && dht.autoRefresh {
 			select {
-			case dht.triggerBootstrap <- struct{}{}:
+			case dht.triggerRtRefresh <- struct{}{}:
 			default:
 			}
 		}
diff --git a/opts/options.go b/opts/options.go
index 39a29cced..8da33eeac 100644
--- a/opts/options.go
+++ b/opts/options.go
@@ -19,22 +19,19 @@ var (
 	DefaultProtocols = []protocol.ID{ProtocolDHT}
 )
 
-// BootstrapConfig specifies parameters used for bootstrapping the DHT.
-type BootstrapConfig struct {
-	BucketPeriod      time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
-	Timeout           time.Duration // how long to wait for a bootstrap query to run
-	SelfQueryInterval time.Duration // how often to query for self
-}
-
 // Options is a structure containing all the options that can be used when constructing a DHT.
 type Options struct {
-	Datastore            ds.Batching
-	Validator            record.Validator
-	Client               bool
-	Protocols            []protocol.ID
-	BucketSize           int
-	BootstrapConfig      BootstrapConfig
-	TriggerAutoBootstrap bool
+	Datastore  ds.Batching
+	Validator  record.Validator
+	Client     bool
+	Protocols  []protocol.ID
+	BucketSize int
+
+	RoutingTable struct {
+		RefreshQueryTimeout time.Duration
+		RefreshPeriod       time.Duration
+		AutoRefresh         bool
+	}
 }
 
 // Apply applies the given options to this Option
@@ -59,24 +56,31 @@ var Defaults = func(o *Options) error {
 	o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
 	o.Protocols = DefaultProtocols
 
-	o.BootstrapConfig = BootstrapConfig{
-		// same as that mentioned in the kad dht paper
-		BucketPeriod: 1 * time.Hour,
+	o.RoutingTable.RefreshQueryTimeout = 10 * time.Second
+	o.RoutingTable.RefreshPeriod = 1 * time.Hour
+	o.RoutingTable.AutoRefresh = true
 
-		Timeout: 10 * time.Second,
+	return nil
+}
 
-		SelfQueryInterval: 1 * time.Hour,
+// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
+// queries.
+func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
+	return func(o *Options) error {
+		o.RoutingTable.RefreshQueryTimeout = timeout
+		return nil
 	}
-
-	o.TriggerAutoBootstrap = true
-
-	return nil
 }
 
-// Bootstrap configures the dht bootstrapping process
-func Bootstrap(b BootstrapConfig) Option {
+// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
+// routing table. The DHT will refresh buckets every period by:
+//
+// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
+// 2. Then searching for a random key in each bucket that hasn't been queried in
+//    the last refresh period.
+func RoutingTableRefreshPeriod(period time.Duration) Option {
 	return func(o *Options) error {
-		o.BootstrapConfig = b
+		o.RoutingTable.RefreshPeriod = period
 		return nil
 	}
 }
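With the flat `BootstrapConfig` gone, configuration composes through the new setters over the nested `RoutingTable` group. A sketch of how `Defaults` and an override combine; it assumes `Apply` keeps its variadic `...Option` signature (only its doc comment is visible in the hunk above):

```go
package main

import (
	"fmt"
	"time"

	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

func main() {
	var o opts.Options
	// Defaults sets AutoRefresh=true, RefreshPeriod=1h, RefreshQueryTimeout=10s;
	// the explicit option then overrides just the period.
	if err := o.Apply(opts.Defaults, opts.RoutingTableRefreshPeriod(30*time.Minute)); err != nil {
		panic(err)
	}
	fmt.Println(o.RoutingTable.AutoRefresh, o.RoutingTable.RefreshPeriod)
}
```

Note that the old `SelfQueryInterval` has no direct replacement: the self-walk now simply runs as part of every refresh round.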
@@ -149,12 +153,12 @@ func BucketSize(bucketSize int) Option {
 	}
 }
 
-// DisableAutoBootstrap completely disables 'auto-bootstrap' on the Dht
-// This means that neither will we do periodic bootstrap nor will we
-// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
-func DisableAutoBootstrap() Option {
+// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
+// table. This means that we will neither refresh the routing table periodically
+// nor refresh it when the routing table size goes below the minimum threshold.
+func DisableAutoRefresh() Option {
 	return func(o *Options) error {
-		o.TriggerAutoBootstrap = false
+		o.RoutingTable.AutoRefresh = false
 		return nil
 	}
 }
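One closing observation: keeping a `Bootstrap` method that merely delegates to `RefreshRoutingTable` preserves the routing interface contract the doc comment calls `IpfsRouter` — in go-libp2p-core terms this corresponds to `routing.Routing`. A hypothetical conformance sketch, mirroring the "Assert that IPFS assumptions" block in dht.go:

```go
package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/routing"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// *IpfsDHT keeps satisfying routing.Routing: Bootstrap still exists, it just
// delegates to RefreshRoutingTable now. (Assumed assertion; dht.go carries a
// similar one.)
var _ routing.Routing = (*dht.IpfsDHT)(nil)

// bootstrapViaInterface shows a caller that only knows the interface; the
// ignored argument matches Bootstrap's `_ context.Context` parameter.
func bootstrapViaInterface(r routing.Routing) error {
	return r.Bootstrap(context.Background())
}
```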