Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coreapi: DHT API #4804

Merged
merged 13 commits into from
Sep 11, 2018
5 changes: 5 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
func (api *CoreAPI) Pin() coreiface.PinAPI {
return (*PinAPI)(api)
}

// Dht returns the DhtAPI interface implementation backed by the go-ipfs node
func (api *CoreAPI) Dht() coreiface.DhtAPI {
return (*DhtAPI)(api)
}
132 changes: 132 additions & 0 deletions core/coreapi/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package coreapi

import (
"context"
"errors"
"fmt"

coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline"
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing"
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore"
)

type DhtAPI CoreAPI

func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p))
if err != nil {
return pstore.PeerInfo{}, err
}

return pi, nil
}

func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) {
settings, err := caopts.DhtFindProvidersOptions(opts...)
if err != nil {
return nil, err
}

rp, err := api.core().ResolvePath(ctx, p)
if err != nil {
return nil, err
}

numProviders := settings.NumProviders
if numProviders < 1 {
return nil, fmt.Errorf("number of providers must be greater than 0")
}

pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
return pchan, nil
}

func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
settings, err := caopts.DhtProvideOptions(opts...)
if err != nil {
return err
}

if api.node.Routing == nil {
return errors.New("cannot provide in offline mode")
}

rp, err := api.core().ResolvePath(ctx, path)
if err != nil {
return err
}

c := rp.Cid()

has, err := api.node.Blockstore.Has(c)
if err != nil {
return err
}

if !has {
return fmt.Errorf("block %s not found locally, cannot provide", c)
}

if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []*cid.Cid{c})
} else {
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
}
if err != nil {
return err
}

return nil
}

func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
for _, c := range cids {
err := r.Provide(ctx, c, true)
if err != nil {
return err
}
}
return nil
}

func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error {
provided := cidutil.NewStreamingSet()

errCh := make(chan error)
go func() {
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
for _, c := range cids {
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
if err != nil {
errCh <- err
}
}
}()

for {
select {
case k := <-provided.New:
err := r.Provide(ctx, k, true)
if err != nil {
return err
}
case err := <-errCh:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}

func (api *DhtAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}
109 changes: 109 additions & 0 deletions core/coreapi/dht_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package coreapi_test

import (
"context"
"io"
"io/ioutil"
"testing"

"github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format"
)

func TestDhtFindPeer(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}

pi, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity))
if err != nil {
t.Fatal(err)
}

if pi.Addrs[0].String() != "/ip4/127.0.0.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}

pi, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity))
if err != nil {
t.Fatal(err)
}

if pi.Addrs[0].String() != "/ip4/127.0.2.1/tcp/4001" {
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
}
}

func TestDhtFindProviders(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}

p, err := addTestObject(ctx, apis[0])
if err != nil {
t.Fatal(err)
}

out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}

provider := <-out

if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}

func TestDhtProvide(t *testing.T) {
ctx := context.Background()
nds, apis, err := makeAPISwarm(ctx, true, 5)
if err != nil {
t.Fatal(err)
}

// TODO: replace once there is local add on unixfs or somewhere
data, err := ioutil.ReadAll(&io.LimitedReader{R: rnd, N: 4092})
if err != nil {
t.Fatal(err)
}

b := blocks.NewBlock(data)
nds[0].Blockstore.Put(b)
p := iface.IpfsPath(b.Cid())

out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}

provider := <-out

if provider.ID.String() != "<peer.ID >" {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}

err = apis[0].Dht().Provide(ctx, p)
if err != nil {
t.Fatal(err)
}

out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
if err != nil {
t.Fatal(err)
}

provider = <-out

if provider.ID.String() != nds[0].Identity.String() {
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
}
}
3 changes: 3 additions & 0 deletions core/coreapi/interface/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type CoreAPI interface {
// ObjectAPI returns an implementation of Object API
Object() ObjectAPI

// Dht returns an implementation of Dht API
Dht() DhtAPI

// ResolvePath resolves the path using Unixfs resolver
ResolvePath(context.Context, Path) (ResolvedPath, error)

Expand Down
26 changes: 26 additions & 0 deletions core/coreapi/interface/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package iface

import (
"context"

"github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
)

// DhtAPI specifies the interface to the DHT
// Note: This API will likely get deprecated in near future, see
// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context.
type DhtAPI interface {
// FindPeer queries the DHT for all of the multiaddresses associated with a
// Peer ID
FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)

// FindProviders finds peers in the DHT who can provide a specific value
// given a key.
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error)

// Provide announces to the network that you are providing given values
Provide(context.Context, Path, ...options.DhtProvideOption) error
}
62 changes: 62 additions & 0 deletions core/coreapi/interface/options/dht.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package options

type DhtProvideSettings struct {
Recursive bool
}

type DhtFindProvidersSettings struct {
NumProviders int
}

type DhtProvideOption func(*DhtProvideSettings) error
type DhtFindProvidersOption func(*DhtFindProvidersSettings) error

func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) {
options := &DhtProvideSettings{
Recursive: false,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) {
options := &DhtFindProvidersSettings{
NumProviders: 20,
}

for _, opt := range opts {
err := opt(options)
if err != nil {
return nil, err
}
}
return options, nil
}

type dhtOpts struct{}

var Dht dhtOpts

// Recursive is an option for Dht.Provide which specifies whether to provide
// the given path recursively
func (dhtOpts) Recursive(recursive bool) DhtProvideOption {
return func(settings *DhtProvideSettings) error {
settings.Recursive = recursive
return nil
}
}

// NumProviders is an option for Dht.FindProviders which specifies the
// number of peers to look for. Default is 20
func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption {
return func(settings *DhtFindProvidersSettings) error {
settings.NumProviders = numProviders
return nil
}
}
12 changes: 8 additions & 4 deletions core/coreapi/name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path,

func TestBasicPublishResolve(t *testing.T) {
ctx := context.Background()
n, api, err := makeAPIIdent(ctx, true)
nds, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}
n := nds[0]
api := apis[0]

p, err := addTestObject(ctx, api)
if err != nil {
Expand Down Expand Up @@ -60,11 +62,12 @@ func TestBasicPublishResolve(t *testing.T) {

func TestBasicPublishResolveKey(t *testing.T) {
ctx := context.Background()
_, api, err := makeAPIIdent(ctx, true)
_, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}
api := apis[0]

k, err := api.Key().Generate(ctx, "foo")
if err != nil {
Expand Down Expand Up @@ -107,12 +110,13 @@ func TestBasicPublishResolveTimeout(t *testing.T) {
t.Skip("ValidTime doesn't appear to work at this time resolution")

ctx := context.Background()
n, api, err := makeAPIIdent(ctx, true)
nds, apis, err := makeAPISwarm(ctx, true, 2)
if err != nil {
t.Fatal(err)
return
}

n := nds[0]
api := apis[0]
p, err := addTestObject(ctx, api)
if err != nil {
t.Fatal(err)
Expand Down
Loading