Skip to content

Commit

Permalink
feat(bootstrap): simplify bootstrapping
Browse files Browse the repository at this point in the history
* Rename triggerAutoBootstrap to autoBootstrap. This variable used to control
_triggering_ only but now completely disables automatic bootstrapping.
* Remove the BootstrapConfig. We introduced this before we switched to
functional options. Now that we're breaking the interfaces anyways, we might as
well use functional options all the way (easier to extend).
* Always query self (feedback from @raulk).
* Important: don't abort the bootstrap process if we timeout finding ourselves.
  • Loading branch information
Stebalien committed Nov 5, 2019
1 parent 4fd6498 commit f904d43
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 62 deletions.
14 changes: 7 additions & 7 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ type IpfsDHT struct {

bucketSize int

bootstrapCfg opts.BootstrapConfig

triggerAutoBootstrap bool
triggerBootstrap chan struct{}
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
autoBootstrap bool
bootstrapTimeout time.Duration
bootstrapPeriod time.Duration
triggerBootstrap chan struct{}
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -92,7 +91,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.bootstrapCfg = cfg.BootstrapConfig
dht.autoBootstrap = cfg.AutoBootstrap
dht.bootstrapPeriod = cfg.BootstrapPeriod
dht.bootstrapTimeout = cfg.BootstrapTimeout

// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
Expand All @@ -105,7 +106,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap

if !cfg.Client {
for _, p := range cfg.Protocols {
Expand Down
38 changes: 12 additions & 26 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,53 +49,39 @@ func (dht *IpfsDHT) startBootstrapping() error {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
scanInterval := time.NewTicker(dht.bootstrapPeriod)
defer scanInterval.Stop()

// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
if dht.autoBootstrap {
dht.doBootstrap(ctx)
} else {
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
scanInterval.Stop()
}

for {
select {
case now := <-scanInterval.C:
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-scanInterval.C:
case <-dht.triggerBootstrap:
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-ctx.Done():
return
}
dht.doBootstrap(ctx)
}
})

return nil
}

func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
}
dht.latestSelfWalk = time.Now()
func (dht *IpfsDHT) doBootstrap(ctx context.Context) {
if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("error while bootstrapping self: %s", err)
}

if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
logger.Warningf("error while bootstrapping buckets: %s", err)
}

return nil
}

// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
Expand All @@ -107,7 +93,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
err := f(queryCtx)
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
Expand All @@ -121,7 +107,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
errChan := make(chan error)

for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
Expand Down Expand Up @@ -164,7 +150,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if bootstrap && dht.autoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down Expand Up @@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if bootstrap && dht.autoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
Expand Down
50 changes: 23 additions & 27 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,17 @@ var (
DefaultProtocols = []protocol.ID{ProtocolDHT}
)

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int

BootstrapTimeout time.Duration
BootstrapPeriod time.Duration
AutoBootstrap bool
}

// Apply applies the given options to this Option
Expand All @@ -59,24 +54,25 @@ var Defaults = func(o *Options) error {
o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
o.Protocols = DefaultProtocols

o.BootstrapConfig = BootstrapConfig{
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,
o.BootstrapTimeout = 10 * time.Second
o.BootstrapPeriod = 1 * time.Hour
o.AutoBootstrap = true

Timeout: 10 * time.Second,
return nil
}

SelfQueryInterval: 1 * time.Hour,
// BootstrapTimeout sets the timeout for bootstrap queries.
func BootstrapTimeout(timeout time.Duration) Option {
return func(o *Options) error {
o.BootstrapTimeout = timeout
return nil
}

o.TriggerAutoBootstrap = true

return nil
}

// Bootstrap configures the dht bootstrapping process
func Bootstrap(b BootstrapConfig) Option {
// BootstrapTimeout sets the timeout for bootstrap queries.
func BootstrapPeriod(timeout time.Duration) Option {
return func(o *Options) error {
o.BootstrapConfig = b
o.BootstrapPeriod = period
return nil
}
}
Expand Down Expand Up @@ -154,7 +150,7 @@ func BucketSize(bucketSize int) Option {
// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
func DisableAutoBootstrap() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
o.AutoBootstrap = false
return nil
}
}

0 comments on commit f904d43

Please sign in to comment.