Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Mar 9, 2020
1 parent 31a4cb1 commit 0f3efa4
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 34 deletions.
59 changes: 49 additions & 10 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
modeClient = 2
)

const (
kad1 protocol.ID = "/kad/1.0.0"
kad2 protocol.ID = "/kad/2.0.0"
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand Down Expand Up @@ -73,7 +78,11 @@ type IpfsDHT struct {

stripedPutLocks [256]sync.Mutex

protocols, clientProtocols []protocol.ID // DHT protocols
// Primary DHT protocols we query and respond to these protocols
protocols []protocol.ID

// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
serverProtocols []protocol.ID

auto bool
mode mode
Expand Down Expand Up @@ -112,7 +121,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
if err := cfg.Apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
}
if err := opts.UnsetDefaults(&cfg); err != nil {
if err := applyFallbacks(&cfg); err != nil {
return nil, err
}

Expand Down Expand Up @@ -183,6 +192,33 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

// applyFallbacks sets default DHT options. It is applied after Defaults and any options passed to the constructor in
// order to allow for defaults that are based on other set options.
func applyFallbacks(o *opts.Options) error {
if o.DisjointPaths == 0 {
o.DisjointPaths = o.BucketSize / 2
}

if o.ProtocolPrefix == opts.DefaultPrefix {
if o.BucketSize != opts.DefaultBucketSize {
return fmt.Errorf("protocol prefix %s must use bucket size %d", opts.DefaultPrefix, opts.DefaultBucketSize)
}
if !o.EnableProviders {
return fmt.Errorf("protocol prefix %s must have providers enabled", opts.DefaultPrefix)
}
if !o.EnableValues {
return fmt.Errorf("protocol prefix %s must have values enabled", opts.DefaultPrefix)
}
if nsval, ok := o.Validator.(record.NamespacedValidator); !ok {
return fmt.Errorf("protocol prefix %s must use a namespaced validator", opts.DefaultPrefix)
} else if len(nsval) > 2 || nsval["pk"] == nil || nsval["ipns"] == nil {
return fmt.Errorf("protocol prefix %s must support only the /pk and /ipns namespaces", opts.DefaultPrefix)
}
return nil
}

return nil
}
func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
self := kb.ConvertPeerID(h.ID())
rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore())
Expand All @@ -197,6 +233,9 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
cmgr.UntagPeer(p, "kbucket")
}

protocols := []protocol.ID{cfg.ProtocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.ProtocolPrefix + kad2, cfg.ProtocolPrefix + kad1}

dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
Expand All @@ -206,8 +245,8 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
routingTable: rt,
protocols: cfg.Protocols,
clientProtocols: cfg.ClientProtocols,
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
d: cfg.DisjointPaths,
Expand Down Expand Up @@ -487,20 +526,20 @@ func (dht *IpfsDHT) setMode(m mode) error {

func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

func (dht *IpfsDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
for _, p := range dht.serverProtocols {
pset[p] = true
}

Expand Down Expand Up @@ -542,9 +581,9 @@ func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}

func (dht *IpfsDHT) clientProtocolStrs() []string {
pstrs := make([]string, len(dht.clientProtocols))
for idx, proto := range dht.clientProtocols {
func (dht *IpfsDHT) protocolStrs() []string {
pstrs := make([]string, len(dht.protocols))
for idx, proto := range dht.protocols {
pstrs[idx] = string(proto)
}

Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return nil
}

nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.clientProtocols...)
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
if err != nil {
return err
}
Expand Down
22 changes: 10 additions & 12 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,8 +1559,8 @@ func TestProvideDisabled(t *testing.T) {
var (
optsA, optsB []Option
)
optsA = append(optsA, opts.Protocols("/dht/provMaybeDisabled"))
optsB = append(optsB, opts.Protocols("/dht/provMaybeDisabled"))
optsA = append(optsA, opts.ProtocolPrefix("/provMaybeDisabled"))
optsB = append(optsB, opts.ProtocolPrefix("/provMaybeDisabled"))

if !enabledA {
optsA = append(optsA, DisableProviders())
Expand Down Expand Up @@ -1617,10 +1617,9 @@ func TestProvideDisabled(t *testing.T) {
}

func TestHandleRemotePeerProtocolChanges(t *testing.T) {
proto := protocol.ID("/v1/dht")
ctx := context.Background()
os := []Option{
Protocols(proto),
ProtocolPrefix("/test"),
Mode(ModeServer),
NamespacedValidator("v", blankValidator{}),
DisableAutoRefresh(),
Expand Down Expand Up @@ -1660,7 +1659,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
defer cancel()

os := []Option{
Protocols("/esh/dht"),
ProtocolPrefix("/esh"),
Mode(ModeServer),
NamespacedValidator("v", blankValidator{}),
DisableAutoRefresh(),
Expand Down Expand Up @@ -1699,7 +1698,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
defer cancel()

dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []Option{
Protocols("/esh/dht"),
ProtocolPrefix("/esh"),
Mode(ModeServer),
NamespacedValidator("v", blankValidator{}),
DisableAutoRefresh(),
Expand All @@ -1709,7 +1708,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
}

dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []Option{
Protocols("/lsr/dht"),
ProtocolPrefix("/lsr"),
Mode(ModeServer),
NamespacedValidator("v", blankValidator{}),
DisableAutoRefresh(),
Expand Down Expand Up @@ -1837,23 +1836,22 @@ func TestProtocolUpgrade(t *testing.T) {
// DHT, but only act as a client of the new DHT. In it's capacity as a server it should also only tell queriers
// about other DHT servers in the new DHT.

protoNew := protocol.ID("/dht/B")
protoOld := protocol.ID("/dht/C")
prefix := opts.ProtocolPrefix(protocol.ID("/test"))

dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
append([]opts.Option{opts.Protocols(protoNew, protoOld), opts.ClientProtocols(protoNew)}, os...)...)
append([]opts.Option{prefix}, os...)...)
if err != nil {
t.Fatal(err)
}

dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
append([]opts.Option{opts.Protocols(protoNew, protoOld), opts.ClientProtocols(protoNew)}, os...)...)
append([]opts.Option{prefix}, os...)...)
if err != nil {
t.Fatal(err)
}

dhtC, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
append([]opts.Option{opts.Protocols(protoOld)}, os...)...)
append([]opts.Option{prefix}, os...)...)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 3 additions & 5 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,11 @@ import (
"github.com/libp2p/go-libp2p-record"
)

// Deprecated: The old format did not support more than one message per stream, and is not supported
// or relevant with stream pooling. ProtocolDHT should be used instead.
const ProtocolDHTOld = "/ipfs/dht"
const DefaultPrefix protocol.ID = "/ipfs"

var (
ProtocolDHT = dht.ProtocolDHT
DefaultProtocols = dht.DefaultProtocols
ProtocolDHT protocol.ID = "/ipfs/kad/2.0.0"
DefaultProtocols = []protocol.ID{ProtocolDHT, "/ipfs/kad/1.0.0"}
)

// Deprecated: use dht.RoutingTableLatencyTolerance
Expand Down
4 changes: 2 additions & 2 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ func TestValuesDisabled(t *testing.T) {
var (
optsA, optsB []Option
)
optsA = append(optsA, dhtopt.Protocols("/dht/valuesMaybeDisabled"))
optsB = append(optsB, dhtopt.Protocols("/dht/valuesMaybeDisabled"))
optsA = append(optsA, dhtopt.ProtocolPrefix("/valuesMaybeDisabled"))
optsB = append(optsB, dhtopt.ProtocolPrefix("/valuesMaybeDisabled"))

if !enabledA {
optsA = append(optsA, DisableValues())
Expand Down
8 changes: 4 additions & 4 deletions subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
dht.plk.Lock()
defer dht.plk.Unlock()
for _, p := range dht.host.Network().Peers() {
protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err != nil {
return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err)
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentif
}

// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
Expand All @@ -122,7 +122,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentif
}

func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdated) {
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.clientProtocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
if err != nil {
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
return
Expand Down Expand Up @@ -167,7 +167,7 @@ func fixLowPeers(dht *IpfsDHT) {
// Passively add peers we already know about
for _, p := range dht.host.Network().Peers() {
// Don't bother probing, we do that on connect.
protos, err := dht.peerstore.SupportsProtocols(p, dht.clientProtocolStrs()...)
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
if err == nil && len(protos) != 0 {
dht.Update(dht.Context(), p)
}
Expand Down

0 comments on commit 0f3efa4

Please sign in to comment.