Skip to content

Commit

Permalink
integrate experimental AcceleratedDHTClient
Browse files Browse the repository at this point in the history
The experimental AcceleratedDHTClient can be enabled from the config

When enabled it modifies the output of the `ipfs stats dht` command.
  • Loading branch information
aschmahmann committed May 13, 2021
1 parent c32c757 commit 476444c
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 48 deletions.
56 changes: 34 additions & 22 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ const (
dhtVerboseOptionName = "verbose"
)

// kademlia extends the routing interface with a command to get the peers closest to the target
type kademlia interface {
routing.Routing
GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error)
}

var queryDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.",
Expand All @@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{
return err
}

if nd.DHT == nil {
if nd.DHTClient == nil {
return ErrNotDHT
}

Expand All @@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{
}

ctx, cancel := context.WithCancel(req.Context)
defer cancel()
ctx, events := routing.RegisterForQueryEvents(ctx)

dht := nd.DHT.WAN
if !nd.DHT.WANActive() {
dht = nd.DHT.LAN
client := nd.DHTClient
if client == nd.DHT {
client = nd.DHT.WAN
if !nd.DHT.WANActive() {
client = nd.DHT.LAN
}
}

errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := dht.GetClosestPeers(ctx, string(id))
if closestPeers != nil {
for p := range closestPeers {
if d, ok := client.(kademlia); !ok {
return fmt.Errorf("dht client does not support GetClosestPeers")
} else {
errCh := make(chan error, 1)
go func() {
defer close(errCh)
defer cancel()
closestPeers, err := d.GetClosestPeers(ctx, string(id))
for _, p := range closestPeers {
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
ID: p,
Type: routing.FinalPeer,
})
}
}

if err != nil {
errCh <- err
return
}
}()
if err != nil {
errCh <- err
return
}
}()

for e := range events {
if err := res.Emit(e); err != nil {
return err
for e := range events {
if err := res.Emit(e); err != nil {
return err
}
}
}

return <-errCh
return <-errCh
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error {
Expand Down
54 changes: 53 additions & 1 deletion core/commands/stat_dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
pstore "github.com/libp2p/go-libp2p-core/peerstore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

Expand Down Expand Up @@ -43,7 +44,8 @@ This interface is not stable and may change from release to release.
`,
},
Arguments: []cmds.Argument{
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."),
cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+
"wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."),
},
Options: []cmds.Option{},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
Expand All @@ -67,12 +69,62 @@ This interface is not stable and may change from release to release.
dhts = []string{"wan", "lan"}
}

dhttypeloop:
for _, name := range dhts {
var dht *dht.IpfsDHT

var separateClient bool
if nd.DHTClient != nd.DHT {
separateClient = true
}

switch name {
case "wan":
if separateClient {
client, ok := nd.DHTClient.(*fullrt.FullRT)
if !ok {
return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type")
}
peerMap := client.Stat()
buckets := make([]dhtBucket, 1)
b := &dhtBucket{}
for _, p := range peerMap {
info := dhtPeerInfo{ID: p.String()}

if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil {
info.AgentVersion, _ = ver.(string)
} else if err == pstore.ErrNotFound {
// ignore
} else {
// this is a bug, usually.
log.Errorw(
"failed to get agent version from peerstore",
"error", err,
)
}

info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected
b.Peers = append(b.Peers, info)
}
buckets[0] = *b

if err := res.Emit(dhtStat{
Name: name,
Buckets: buckets,
}); err != nil {
return err
}
continue dhttypeloop
}
fallthrough
case "wanserver":
dht = nd.DHT.WAN
case "lan":
if separateClient {
return cmds.Errorf(cmds.ErrClient, "no LAN client found")
}
fallthrough
case "lanserver":
dht = nd.DHT.LAN
default:
return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name)
Expand Down
7 changes: 5 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ type IpfsNode struct {

PubSub *pubsub.PubSub `optional:"true"`
PSRouter *psrouter.PubsubValueStore `optional:"true"`
DHT *ddht.DHT `optional:"true"`
P2P *p2p.P2P `optional:"true"`

DHT *ddht.DHT `optional:"true"`
DHTClient routing.Routing `name:"dhtc" optional:"true"`

P2P *p2p.P2P `optional:"true"`

Process goprocess.Process
ctx context.Context
Expand Down
2 changes: 1 addition & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)),

fx.Provide(libp2p.Routing),
fx.Provide(libp2p.BaseRouting),
fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)),
maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")),

maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics),
Expand Down
2 changes: 1 addition & 1 deletion core/node/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type P2PHostOut struct {
fx.Out

Host host.Host
Routing BaseIpfsRouting
Routing routing.Routing `name:"initialrouting"`
}

func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
Expand Down
94 changes: 80 additions & 14 deletions core/node/libp2p/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (

"github.com/ipfs/go-ipfs/core/node/helpers"

"github.com/ipfs/go-ipfs/repo"
host "github.com/libp2p/go-libp2p-core/host"
routing "github.com/libp2p/go-libp2p-core/routing"
dht "github.com/libp2p/go-libp2p-kad-dht"
ddht "github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p-kad-dht/fullrt"
"github.com/libp2p/go-libp2p-pubsub"
namesys "github.com/libp2p/go-libp2p-pubsub-router"
record "github.com/libp2p/go-libp2p-record"
Expand All @@ -32,23 +35,86 @@ type p2pRouterOut struct {
Router Router `group:"routers"`
}

func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *ddht.DHT) {
if dht, ok := in.(*ddht.DHT); ok {
dr = dht
type processInitialRoutingIn struct {
fx.In

Router routing.Routing `name:"initialrouting"`

// For setting up experimental DHT client
Host host.Host
Repo repo.Repo
Validator record.Validator
}

type processInitialRoutingOut struct {
fx.Out

Router Router `group:"routers"`
DHT *ddht.DHT
DHTClient routing.Routing `name:"dhtc"`
BaseRT BaseIpfsRouting
}

func BaseRouting(experimentalDHTClient bool) interface{} {
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) {
var dr *ddht.DHT
if dht, ok := in.Router.(*ddht.DHT); ok {
dr = dht

lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return dr.Close()
},
})
}

if dr != nil && experimentalDHTClient {
ctx := helpers.LifecycleCtx(mctx, lc)
cfg, err := in.Repo.Config()
if err != nil {
return out, err
}
bspeers, err := cfg.BootstrapPeers()
if err != nil {
return out, err
}

expClient, err := fullrt.NewFullRT(ctx, in.Host,
dht.DefaultPrefix,
fullrt.DHTOption(
dht.Validator(in.Validator),
dht.Datastore(in.Repo.Datastore()),
dht.BootstrapPeers(bspeers...),
dht.BucketSize(20),
),
)
if err != nil {
return out, err
}

// TODO: Start/Stop exp client

return processInitialRoutingOut{
Router: Router{
Routing: expClient,
Priority: 1000,
},
DHT: dr,
DHTClient: expClient,
BaseRT: expClient,
}, nil
}

return processInitialRoutingOut{
Router: Router{
Priority: 1000,
Routing: in.Router,
},
})
DHT: dr,
DHTClient: dr,
BaseRT: dr,
}, nil
}

return p2pRouterOut{
Router: Router{
Priority: 1000,
Routing: in,
},
}, dr
}

type p2pOnlineRoutingIn struct {
Expand Down
1 change: 0 additions & 1 deletion core/node/libp2p/routingopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package libp2p

import (
"context"

"github.com/ipfs/go-datastore"
host "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-cmds v0.6.0
github.com/ipfs/go-ipfs-config v0.13.0
github.com/ipfs/go-ipfs-config v0.13.1-0.20210512191347-f32fe6873a21
github.com/ipfs/go-ipfs-exchange-interface v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
Expand Down Expand Up @@ -66,7 +66,7 @@ require (
github.com/libp2p/go-libp2p-core v0.8.5
github.com/libp2p/go-libp2p-discovery v0.5.0
github.com/libp2p/go-libp2p-http v0.2.0
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-kad-dht v0.11.2-0.20210512190004-57eeffeacd66
github.com/libp2p/go-libp2p-kbucket v0.4.7
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-mplex v0.4.1
Expand Down
Loading

0 comments on commit 476444c

Please sign in to comment.