Skip to content

Commit

Permalink
feat: move options to main package and make internals private (#486)
Browse files Browse the repository at this point in the history
* feat: move options to main package and make internals private

Rationale:

1. This allows us to make private options for testing.
2. This removes an import for DHT users.
3. This makes options much easier to discover.
4. This makes it possible to make the config/options internals private.

We originally put them in a sub-package to avoid poluting the root namespace,
but that isn't really necessary.

This keeps the old package (for now) to avoid breaking too much.
  • Loading branch information
Stebalien committed Apr 3, 2020
1 parent d4134a4 commit 30fa086
Show file tree
Hide file tree
Showing 7 changed files with 381 additions and 310 deletions.
55 changes: 27 additions & 28 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.opencensus.io/tag"

"github.com/libp2p/go-libp2p-kad-dht/metrics"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"

Expand Down Expand Up @@ -108,37 +107,37 @@ var (
)

// New creates a new DHT with the specified host and options.
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
var cfg opts.Options
if err := cfg.Apply(append([]opts.Option{opts.Defaults}, options...)...); err != nil {
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.Apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
}
if cfg.DisjointPaths == 0 {
cfg.DisjointPaths = cfg.BucketSize / 2
if cfg.disjointPaths == 0 {
cfg.disjointPaths = cfg.bucketSize / 2
}
dht := makeDHT(ctx, h, cfg)
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout

dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
dht.enableValues = cfg.enableValues

dht.Validator = cfg.Validator
dht.Validator = cfg.validator

switch cfg.Mode {
case opts.ModeAuto:
switch cfg.mode {
case ModeAuto:
dht.auto = true
dht.mode = modeClient
case opts.ModeClient:
case ModeClient:
dht.auto = false
dht.mode = modeClient
case opts.ModeServer:
case ModeServer:
dht.auto = false
dht.mode = modeServer
default:
return nil, fmt.Errorf("invalid dht mode %d", cfg.Mode)
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
}

if dht.mode == modeServer {
Expand All @@ -164,7 +163,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore))
dht, err := New(ctx, h, Datastore(dstore))
if err != nil {
panic(err)
}
Expand All @@ -176,16 +175,16 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
// NewDHTClient creates a new DHT object with the given peer as the 'local' host
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, opts.Datastore(dstore), opts.Client(true))
dht, err := New(ctx, h, Datastore(dstore), Client(true))
if err != nil {
panic(err)
}
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(cfg.BucketSize, self, cfg.RoutingTable.LatencyTolerance, h.Peerstore())
rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore())
cmgr := h.ConnManager()

rt.PeerAdded = func(p peer.ID) {
Expand All @@ -198,18 +197,18 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
}

dht := &IpfsDHT{
datastore: cfg.Datastore,
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: cfg.Protocols,
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
d: cfg.DisjointPaths,
protocols: cfg.protocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
}

Expand All @@ -221,7 +220,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg opts.Options) *IpfsDHT {
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore)
dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)

return dht
}
Expand Down
262 changes: 262 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package dht

import (
"fmt"
"time"

ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-record"
)

// ModeOpt describes what mode the dht should operate in
type ModeOpt int

const (
// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
// between Client and Server modes based on network conditions
ModeAuto ModeOpt = iota
// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
ModeClient
// ModeServer operates the DHT as a server, it can both send and respond to queries
ModeServer
)

// Options is a structure containing all the options that can be used when constructing a DHT.
type config struct {
datastore ds.Batching
validator record.Validator
mode ModeOpt
protocols []protocol.ID
bucketSize int
disjointPaths int
concurrency int
maxRecordAge time.Duration
enableProviders bool
enableValues bool

routingTable struct {
refreshQueryTimeout time.Duration
refreshPeriod time.Duration
autoRefresh bool
latencyTolerance time.Duration
}
}

// Apply applies the given options to this Option
func (o *config) Apply(opts ...Option) error {
for i, opt := range opts {
if err := opt(o); err != nil {
return fmt.Errorf("dht option %d failed: %s", i, err)
}
}
return nil
}

// Option DHT option type.
type Option func(*config) error

// defaults are the default DHT options. This option will be automatically
// prepended to any options you pass to the DHT constructor.
var defaults = func(o *config) error {
o.validator = record.NamespacedValidator{
"pk": record.PublicKeyValidator{},
}
o.datastore = dssync.MutexWrap(ds.NewMapDatastore())
o.protocols = DefaultProtocols
o.enableProviders = true
o.enableValues = true

o.routingTable.latencyTolerance = time.Minute
o.routingTable.refreshQueryTimeout = 10 * time.Second
o.routingTable.refreshPeriod = 1 * time.Hour
o.routingTable.autoRefresh = true
o.maxRecordAge = time.Hour * 36

o.bucketSize = 20
o.concurrency = 3

return nil
}

// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
return func(o *config) error {
o.routingTable.latencyTolerance = latency
return nil
}
}

// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
// queries.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
return func(o *config) error {
o.routingTable.refreshQueryTimeout = timeout
return nil
}
}

// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 1. Then searching for a random key in each bucket that hasn't been queried in
// the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
return func(o *config) error {
o.routingTable.refreshPeriod = period
return nil
}
}

// Datastore configures the DHT to use the specified datastore.
//
// Defaults to an in-memory (temporary) map.
func Datastore(ds ds.Batching) Option {
return func(o *config) error {
o.datastore = ds
return nil
}
}

// Client configures whether or not the DHT operates in client-only mode.
//
// Defaults to false.
func Client(only bool) Option {
return func(o *config) error {
if only {
o.mode = ModeClient
}
return nil
}
}

// Mode configures which mode the DHT operates in (Client, Server, Auto).
//
// Defaults to ModeAuto.
func Mode(m ModeOpt) Option {
return func(o *config) error {
o.mode = m
return nil
}
}

// Validator configures the DHT to use the specified validator.
//
// Defaults to a namespaced validator that can only validate public keys.
func Validator(v record.Validator) Option {
return func(o *config) error {
o.validator = v
return nil
}
}

// NamespacedValidator adds a validator namespaced under `ns`. This option fails
// if the DHT is not using a `record.NamespacedValidator` as it's validator (it
// uses one by default but this can be overridden with the `Validator` option).
//
// Example: Given a validator registered as `NamespacedValidator("ipns",
// myValidator)`, all records with keys starting with `/ipns/` will be validated
// with `myValidator`.
func NamespacedValidator(ns string, v record.Validator) Option {
return func(o *config) error {
nsval, ok := o.validator.(record.NamespacedValidator)
if !ok {
return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
}
nsval[ns] = v
return nil
}
}

// Protocols sets the protocols for the DHT
//
// Defaults to dht.DefaultProtocols
func Protocols(protocols ...protocol.ID) Option {
return func(o *config) error {
o.protocols = protocols
return nil
}
}

// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
func BucketSize(bucketSize int) Option {
return func(o *config) error {
o.bucketSize = bucketSize
return nil
}
}

// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
//
// The default value is 3.
func Concurrency(alpha int) Option {
return func(o *config) error {
o.concurrency = alpha
return nil
}
}

// DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query.
//
// The default value is BucketSize/2.
func DisjointPaths(d int) Option {
return func(o *config) error {
o.disjointPaths = d
return nil
}
}

// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
// from the time its received. This does not apply to any other forms of validity that
// the record may contain.
// For example, a record may contain an ipns entry with an EOL saying its valid
// until the year 2020 (a great time in the future). For that record to stick around
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
func MaxRecordAge(maxAge time.Duration) Option {
return func(o *config) error {
o.maxRecordAge = maxAge
return nil
}
}

// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table periodically
// nor when the routing table size goes below the minimum threshold.
func DisableAutoRefresh() Option {
return func(o *config) error {
o.routingTable.autoRefresh = false
return nil
}
}

// DisableProviders disables storing and retrieving provider records.
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableProviders() Option {
return func(o *config) error {
o.enableProviders = false
return nil
}
}

// DisableProviders disables storing and retrieving value records (including
// public keys).
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
// network and/or distinct DHT protocols with the `Protocols` option).
func DisableValues() Option {
return func(o *config) error {
o.enableValues = false
return nil
}
}
Loading

0 comments on commit 30fa086

Please sign in to comment.