diff --git a/core/commands/dht.go b/core/commands/dht.go index 07136a72379..c481c776a99 100644 --- a/core/commands/dht.go +++ b/core/commands/dht.go @@ -45,6 +45,12 @@ const ( dhtVerboseOptionName = "verbose" ) +// kademlia extends the routing interface with a command to get the peers closest to the target +type kademlia interface { + routing.Routing + GetClosestPeers(ctx context.Context, key string) ([]peer.ID, error) +} + var queryDhtCmd = &cmds.Command{ Helptext: cmds.HelpText{ Tagline: "Find the closest Peer IDs to a given Peer ID by querying the DHT.", @@ -63,7 +69,7 @@ var queryDhtCmd = &cmds.Command{ return err } - if nd.DHT == nil { + if nd.DHTClient == nil { return ErrNotDHT } @@ -73,40 +79,46 @@ var queryDhtCmd = &cmds.Command{ } ctx, cancel := context.WithCancel(req.Context) + defer cancel() ctx, events := routing.RegisterForQueryEvents(ctx) - dht := nd.DHT.WAN - if !nd.DHT.WANActive() { - dht = nd.DHT.LAN + client := nd.DHTClient + if client == nd.DHT { + client = nd.DHT.WAN + if !nd.DHT.WANActive() { + client = nd.DHT.LAN + } } - errCh := make(chan error, 1) - go func() { - defer close(errCh) - defer cancel() - closestPeers, err := dht.GetClosestPeers(ctx, string(id)) - if closestPeers != nil { - for p := range closestPeers { + if d, ok := client.(kademlia); !ok { + return fmt.Errorf("dht client does not support GetClosestPeers") + } else { + errCh := make(chan error, 1) + go func() { + defer close(errCh) + defer cancel() + closestPeers, err := d.GetClosestPeers(ctx, string(id)) + for _, p := range closestPeers { routing.PublishQueryEvent(ctx, &routing.QueryEvent{ ID: p, Type: routing.FinalPeer, }) } - } - if err != nil { - errCh <- err - return - } - }() + if err != nil { + errCh <- err + return + } + }() - for e := range events { - if err := res.Emit(e); err != nil { - return err + for e := range events { + if err := res.Emit(e); err != nil { + return err + } } - } - return <-errCh + return <-errCh + } }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *routing.QueryEvent) error { diff --git a/core/commands/stat_dht.go b/core/commands/stat_dht.go index a8b5323c507..1dc9a2c3cf9 100644 --- a/core/commands/stat_dht.go +++ b/core/commands/stat_dht.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/network" pstore "github.com/libp2p/go-libp2p-core/peerstore" dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" kbucket "github.com/libp2p/go-libp2p-kbucket" ) @@ -43,7 +44,8 @@ This interface is not stable and may change from release to release. `, }, Arguments: []cmds.Argument{ - cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wan or lan). Defaults to both."), + cmds.StringArg("dht", false, true, "The DHT whose table should be listed (wanserver, lanserver, wan, lan). "+ + "wan and lan refer to client routing tables. When using the experimental DHT client only WAN is supported. Defaults to wan and lan."), }, Options: []cmds.Option{}, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { @@ -56,6 +58,7 @@ This interface is not stable and may change from release to release. return ErrNotOnline } + // TODO: Replace with a check for DHT client or server if nd.DHT == nil { return ErrNotDHT } @@ -67,12 +70,62 @@ This interface is not stable and may change from release to release. dhts = []string{"wan", "lan"} } + dhttypeloop: for _, name := range dhts { var dht *dht.IpfsDHT + + var separateClient bool + if nd.DHTClient != nd.DHT { + separateClient = true + } + switch name { case "wan": + if separateClient { + client, ok := nd.DHTClient.(*fullrt.FullRT) + if !ok { + return cmds.Errorf(cmds.ErrClient, "could not generate stats for the WAN DHT client type") + } + peerMap := client.Stat() + buckets := make([]dhtBucket, 1) + b := &dhtBucket{} + for _, p := range peerMap { + info := dhtPeerInfo{ID: p.String()} + + if ver, err := nd.Peerstore.Get(p, "AgentVersion"); err == nil { + info.AgentVersion, _ = ver.(string) + } else if err == pstore.ErrNotFound { + // ignore + } else { + // this is a bug, usually. + log.Errorw( + "failed to get agent version from peerstore", + "error", err, + ) + } + + info.Connected = nd.PeerHost.Network().Connectedness(p) == network.Connected + b.Peers = append(b.Peers, info) + } + buckets[0] = *b + + if err := res.Emit(dhtStat{ + Name: name, + Buckets: buckets, + }); err != nil { + return err + } + continue dhttypeloop + } + fallthrough + case "wanserver": dht = nd.DHT.WAN case "lan": + if separateClient { + return cmds.Errorf(cmds.ErrClient, "no LAN client found") + } + fallthrough + case "lanserver": dht = nd.DHT.LAN default: return cmds.Errorf(cmds.ErrClient, "unknown dht type: %s", name) diff --git a/core/core.go b/core/core.go index 75fc95ff7d8..49f1185646b 100644 --- a/core/core.go +++ b/core/core.go @@ -98,8 +98,11 @@ type IpfsNode struct { PubSub *pubsub.PubSub `optional:"true"` PSRouter *psrouter.PubsubValueStore `optional:"true"` - DHT *ddht.DHT `optional:"true"` - P2P *p2p.P2P `optional:"true"` + + DHT *ddht.DHT `optional:"true"` + DHTClient routing.Routing `name:"dhtc" optional:"true"` + + P2P *p2p.P2P `optional:"true"` Process goprocess.Process ctx context.Context diff --git a/core/node/groups.go b/core/node/groups.go index 6863043396c..f55b052c380 100644 --- a/core/node/groups.go +++ b/core/node/groups.go @@ -139,7 +139,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option { fx.Provide(libp2p.Security(!bcfg.DisableEncryptedConnections, cfg.Swarm.Transports)), fx.Provide(libp2p.Routing), - fx.Provide(libp2p.BaseRouting), + fx.Provide(libp2p.BaseRouting(cfg.Experimental.AcceleratedDHTClient)), maybeProvide(libp2p.PubsubRouter, bcfg.getOpt("ipnsps")), maybeProvide(libp2p.BandwidthCounter, !cfg.Swarm.DisableBandwidthMetrics), diff --git a/core/node/libp2p/host.go b/core/node/libp2p/host.go index 4005f0a7e74..04682682b48 100644 --- a/core/node/libp2p/host.go +++ b/core/node/libp2p/host.go @@ -34,7 +34,7 @@ type P2PHostOut struct { fx.Out Host host.Host - Routing BaseIpfsRouting + Routing routing.Routing `name:"initialrouting"` } func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) { diff --git a/core/node/libp2p/routing.go b/core/node/libp2p/routing.go index 14b8fa40c10..bb2256c4808 100644 --- a/core/node/libp2p/routing.go +++ b/core/node/libp2p/routing.go @@ -7,9 +7,12 @@ import ( "github.com/ipfs/go-ipfs/core/node/helpers" + "github.com/ipfs/go-ipfs/repo" host "github.com/libp2p/go-libp2p-core/host" routing "github.com/libp2p/go-libp2p-core/routing" + dht "github.com/libp2p/go-libp2p-kad-dht" ddht "github.com/libp2p/go-libp2p-kad-dht/dual" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" "github.com/libp2p/go-libp2p-pubsub" namesys "github.com/libp2p/go-libp2p-pubsub-router" record "github.com/libp2p/go-libp2p-record" @@ -32,23 +35,86 @@ type p2pRouterOut struct { Router Router `group:"routers"` } -func BaseRouting(lc fx.Lifecycle, in BaseIpfsRouting) (out p2pRouterOut, dr *ddht.DHT) { - if dht, ok := in.(*ddht.DHT); ok { - dr = dht +type processInitialRoutingIn struct { + fx.In + + Router routing.Routing `name:"initialrouting"` + + // For setting up experimental DHT client + Host host.Host + Repo repo.Repo + Validator record.Validator +} + +type processInitialRoutingOut struct { + fx.Out + + Router Router `group:"routers"` + DHT *ddht.DHT + DHTClient routing.Routing `name:"dhtc"` + BaseRT BaseIpfsRouting +} + +func BaseRouting(experimentalDHTClient bool) interface{} { + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, in processInitialRoutingIn) (out processInitialRoutingOut, err error) { + var dr *ddht.DHT + if dht, ok := in.Router.(*ddht.DHT); ok { + dr = dht - lc.Append(fx.Hook{ - OnStop: func(ctx context.Context) error { - return dr.Close() + lc.Append(fx.Hook{ + OnStop: func(ctx context.Context) error { + return dr.Close() + }, + }) + } + + if dr != nil && experimentalDHTClient { + ctx := helpers.LifecycleCtx(mctx, lc) + cfg, err := in.Repo.Config() + if err != nil { + return out, err + } + bspeers, err := cfg.BootstrapPeers() + if err != nil { + return out, err + } + + expClient, err := fullrt.NewFullRT(ctx, in.Host, + dht.DefaultPrefix, + fullrt.DHTOption( + dht.Validator(in.Validator), + dht.Datastore(in.Repo.Datastore()), + dht.BootstrapPeers(bspeers...), + dht.BucketSize(20), + ), + ) + if err != nil { + return out, err + } + + // TODO: Start/Stop exp client + + return processInitialRoutingOut{ + Router: Router{ + Routing: expClient, + Priority: 1000, + }, + DHT: dr, + DHTClient: expClient, + BaseRT: expClient, + }, nil + } + + return processInitialRoutingOut{ + Router: Router{ + Priority: 1000, + Routing: in.Router, }, - }) + DHT: dr, + DHTClient: dr, + BaseRT: dr, + }, nil } - - return p2pRouterOut{ - Router: Router{ - Priority: 1000, - Routing: in, - }, - }, dr } type p2pOnlineRoutingIn struct { diff --git a/core/node/libp2p/routingopt.go b/core/node/libp2p/routingopt.go index 96bd8be4c26..6d29cb6caf1 100644 --- a/core/node/libp2p/routingopt.go +++ b/core/node/libp2p/routingopt.go @@ -2,7 +2,6 @@ package libp2p import ( "context" - "github.com/ipfs/go-datastore" host "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" diff --git a/go.mod b/go.mod index 42c4ddcf7df..d6f01f2a1c1 100644 --- a/go.mod +++ b/go.mod @@ -29,7 +29,7 @@ require ( github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-chunker v0.0.5 github.com/ipfs/go-ipfs-cmds v0.6.0 - github.com/ipfs/go-ipfs-config v0.13.0 + github.com/ipfs/go-ipfs-config v0.13.1-0.20210512191347-f32fe6873a21 github.com/ipfs/go-ipfs-exchange-interface v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 @@ -66,7 +66,7 @@ require ( github.com/libp2p/go-libp2p-core v0.8.5 github.com/libp2p/go-libp2p-discovery v0.5.0 github.com/libp2p/go-libp2p-http v0.2.0 - github.com/libp2p/go-libp2p-kad-dht v0.11.1 + github.com/libp2p/go-libp2p-kad-dht v0.11.2-0.20210512190004-57eeffeacd66 github.com/libp2p/go-libp2p-kbucket v0.4.7 github.com/libp2p/go-libp2p-loggables v0.1.0 github.com/libp2p/go-libp2p-mplex v0.4.1 diff --git a/go.sum b/go.sum index e0a01f40b19..c5cb42c2de7 100644 --- a/go.sum +++ b/go.sum @@ -271,8 +271,9 @@ github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hf github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY= github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= @@ -411,8 +412,8 @@ github.com/ipfs/go-ipfs-chunker v0.0.5 h1:ojCf7HV/m+uS2vhUGWcogIIxiO5ubl5O57Q7Na github.com/ipfs/go-ipfs-chunker v0.0.5/go.mod h1:jhgdF8vxRHycr00k13FM8Y0E+6BoalYeobXmUyTreP8= github.com/ipfs/go-ipfs-cmds v0.6.0 h1:yAxdowQZzoFKjcLI08sXVNnqVj3jnABbf9smrPQmBsw= github.com/ipfs/go-ipfs-cmds v0.6.0/go.mod h1:ZgYiWVnCk43ChwoH8hAmI1IRbuVtq3GSTHwtRB/Kqhk= -github.com/ipfs/go-ipfs-config v0.13.0 h1:ZH3dTmkVR9TTFBIbfWnFNC1JdwHbj8F0ryiaIFo7U/o= -github.com/ipfs/go-ipfs-config v0.13.0/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0= +github.com/ipfs/go-ipfs-config v0.13.1-0.20210512191347-f32fe6873a21 h1:jeXYiyHMuUUUW/Rj/AW/4PlMujI0os+Mvq/wJ2hmoMo= +github.com/ipfs/go-ipfs-config v0.13.1-0.20210512191347-f32fe6873a21/go.mod h1:Ei/FLgHGTdPyqCPK0oPCwGTe8VSnsjJjx7HZqUb6Ry0= github.com/ipfs/go-ipfs-delay v0.0.0-20181109222059-70721b86a9a8/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= github.com/ipfs/go-ipfs-delay v0.0.1 h1:r/UXYyRcddO6thwOnhiznIAiSvxMECGgtv35Xs1IeRQ= github.com/ipfs/go-ipfs-delay v0.0.1/go.mod h1:8SP1YXK1M1kXuc4KJZINY3TQQ03J2rwBG9QfXmbRPrw= @@ -679,8 +680,10 @@ github.com/libp2p/go-libp2p-interface-connmgr v0.0.1/go.mod h1:GarlRLH0LdeWcLnYM github.com/libp2p/go-libp2p-interface-connmgr v0.0.4/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-connmgr v0.0.5/go.mod h1:GarlRLH0LdeWcLnYM/SaBykKFl9U5JFnbBGruAk/D5k= github.com/libp2p/go-libp2p-interface-pnet v0.0.1/go.mod h1:el9jHpQAXK5dnTpKA4yfCNBZXvrzdOU75zz+C6ryp3k= -github.com/libp2p/go-libp2p-kad-dht v0.11.1 h1:FsriVQhOUZpCotWIjyFSjEDNJmUzuMma/RyyTDZanwc= github.com/libp2p/go-libp2p-kad-dht v0.11.1/go.mod h1:5ojtR2acDPqh/jXf5orWy8YGb8bHQDS+qeDcoscL/PI= +github.com/libp2p/go-libp2p-kad-dht v0.11.2-0.20210512190004-57eeffeacd66 h1:bgny9dF7iwddusH2nkqPSbWSehRtbe+AxBox8BTBRrA= +github.com/libp2p/go-libp2p-kad-dht v0.11.2-0.20210512190004-57eeffeacd66/go.mod h1:zdQYru1c7dnluMpZls4i9Fj2TwYXS7YyDkJ1Yahv0w0= +github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio= github.com/libp2p/go-libp2p-kbucket v0.4.7 h1:spZAcgxifvFZHBD8tErvppbnNiKA5uokDu3CV7axu70= github.com/libp2p/go-libp2p-kbucket v0.4.7/go.mod h1:XyVo99AfQH0foSf176k4jY1xUJ2+jUJIZCSDm7r2YKk= github.com/libp2p/go-libp2p-loggables v0.0.1/go.mod h1:lDipDlBNYbpyqyPX/KcoO+eq0sJYEVR2JgOexcivchg= @@ -779,6 +782,8 @@ github.com/libp2p/go-libp2p-transport-upgrader v0.3.0/go.mod h1:i+SKzbRnvXdVbU3D github.com/libp2p/go-libp2p-transport-upgrader v0.4.0/go.mod h1:J4ko0ObtZSmgn5BX5AmegP+dK3CSnU2lMCKsSq/EY0s= github.com/libp2p/go-libp2p-transport-upgrader v0.4.2 h1:4JsnbfJzgZeRS9AWN7B9dPqn/LY/HoQTlO9gtdJTIYM= github.com/libp2p/go-libp2p-transport-upgrader v0.4.2/go.mod h1:NR8ne1VwfreD5VIWIU62Agt/J18ekORFU/j1i2y8zvk= +github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58 h1:GcTNu27BMpOTtMnQqun03+kbtHA1qTxJ/J8cZRRYu2k= +github.com/libp2p/go-libp2p-xor v0.0.0-20200501025846-71e284145d58/go.mod h1:AYjOiqJIdcmI4SXE2ouKQuFrUbE5myv8txWaB2pl4TI= github.com/libp2p/go-libp2p-yamux v0.1.2/go.mod h1:xUoV/RmYkg6BW/qGxA9XJyg+HzXFYkeXbnhjmnYzKp8= github.com/libp2p/go-libp2p-yamux v0.1.3/go.mod h1:VGSQVrqkh6y4nm0189qqxMtvyBft44MOYYPpYKXiVt4= github.com/libp2p/go-libp2p-yamux v0.2.0/go.mod h1:Db2gU+XfLpm6E4rG5uGCFX6uXA8MEXOxFcRoXUODaK8= @@ -1201,6 +1206,7 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= @@ -1305,6 +1311,7 @@ golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf h1:B2n+Zi5QeYRDAEodEu72OS36gmTWjgpXr2+cWcBW90o= golang.org/x/crypto v0.0.0-20210506145944-38f3c27a63bf/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=