diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index e9ec439518a..bb7afd61a51 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI { func (api *CoreAPI) Pin() coreiface.PinAPI { return (*PinAPI)(api) } + +// Dht returns the DhtAPI interface implementation backed by the go-ipfs node +func (api *CoreAPI) Dht() coreiface.DhtAPI { + return (*DhtAPI)(api) +} diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go new file mode 100644 index 00000000000..0585fdb860b --- /dev/null +++ b/core/coreapi/dht.go @@ -0,0 +1,132 @@ +package coreapi + +import ( + "context" + "errors" + "fmt" + + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" + offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline" + cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil" + blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice" + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing" + cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" + blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore" +) + +type DhtAPI CoreAPI + +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { + pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p)) + if err != nil { + return pstore.PeerInfo{}, err + } + + return pi, nil +} + +func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) { + settings, err := caopts.DhtFindProvidersOptions(opts...) + if err != nil { + return nil, err + } + + rp, err := api.core().ResolvePath(ctx, p) + if err != nil { + return nil, err + } + + numProviders := settings.NumProviders + if numProviders < 1 { + return nil, fmt.Errorf("number of providers must be greater than 0") + } + + pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders) + return pchan, nil +} + +func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error { + settings, err := caopts.DhtProvideOptions(opts...) + if err != nil { + return err + } + + if api.node.Routing == nil { + return errors.New("cannot provide in offline mode") + } + + rp, err := api.core().ResolvePath(ctx, path) + if err != nil { + return err + } + + c := rp.Cid() + + has, err := api.node.Blockstore.Has(c) + if err != nil { + return err + } + + if !has { + return fmt.Errorf("block %s not found locally, cannot provide", c) + } + + if settings.Recursive { + err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []*cid.Cid{c}) + } else { + err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c}) + } + if err != nil { + return err + } + + return nil +} + +func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error { + for _, c := range cids { + err := r.Provide(ctx, c, true) + if err != nil { + return err + } + } + return nil +} + +func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error { + provided := cidutil.NewStreamingSet() + + errCh := make(chan error) + go func() { + dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs))) + for _, c := range cids { + err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx)) + if err != nil { + errCh <- err + } + } + }() + + for { + select { + case k := <-provided.New: + err := r.Provide(ctx, k, true) + if err != nil { + return err + } + case err := <-errCh: + return err + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (api *DhtAPI) core() coreiface.CoreAPI { + return (*CoreAPI)(api) +} diff --git a/core/coreapi/dht_test.go b/core/coreapi/dht_test.go new file mode 100644 index 00000000000..759835b567f --- /dev/null +++ b/core/coreapi/dht_test.go @@ -0,0 +1,109 @@ +package coreapi_test + +import ( + "context" + "io" + "io/ioutil" + "testing" + + "github.com/ipfs/go-ipfs/core/coreapi/interface" + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format" +) + +func TestDhtFindPeer(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 5) + if err != nil { + t.Fatal(err) + } + + pi, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity)) + if err != nil { + t.Fatal(err) + } + + if pi.Addrs[0].String() != "/ip4/127.0.0.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String()) + } + + pi, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity)) + if err != nil { + t.Fatal(err) + } + + if pi.Addrs[0].String() != "/ip4/127.0.2.1/tcp/4001" { + t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String()) + } +} + +func TestDhtFindProviders(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 5) + if err != nil { + t.Fatal(err) + } + + p, err := addTestObject(ctx, apis[0]) + if err != nil { + t.Fatal(err) + } + + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider := <-out + + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) + } +} + +func TestDhtProvide(t *testing.T) { + ctx := context.Background() + nds, apis, err := makeAPISwarm(ctx, true, 5) + if err != nil { + t.Fatal(err) + } + + // TODO: replace once there is local add on unixfs or somewhere + data, err := ioutil.ReadAll(&io.LimitedReader{R: rnd, N: 4092}) + if err != nil { + t.Fatal(err) + } + + b := blocks.NewBlock(data) + nds[0].Blockstore.Put(b) + p := iface.IpfsPath(b.Cid()) + + out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider := <-out + + if provider.ID.String() != "" { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) + } + + err = apis[0].Dht().Provide(ctx, p) + if err != nil { + t.Fatal(err) + } + + out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1)) + if err != nil { + t.Fatal(err) + } + + provider = <-out + + if provider.ID.String() != nds[0].Identity.String() { + t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String()) + } +} diff --git a/core/coreapi/interface/coreapi.go b/core/coreapi/interface/coreapi.go index 696eefbaf14..9811b75be85 100644 --- a/core/coreapi/interface/coreapi.go +++ b/core/coreapi/interface/coreapi.go @@ -31,6 +31,9 @@ type CoreAPI interface { // ObjectAPI returns an implementation of Object API Object() ObjectAPI + // Dht returns an implementation of Dht API + Dht() DhtAPI + // ResolvePath resolves the path using Unixfs resolver ResolvePath(context.Context, Path) (ResolvedPath, error) diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go new file mode 100644 index 00000000000..7b8119e4494 --- /dev/null +++ b/core/coreapi/interface/dht.go @@ -0,0 +1,26 @@ +package iface + +import ( + "context" + + "github.com/ipfs/go-ipfs/core/coreapi/interface/options" + + peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" +) + +// DhtAPI specifies the interface to the DHT +// Note: This API will likely get deprecated in near future, see +// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context. +type DhtAPI interface { + // FindPeer queries the DHT for all of the multiaddresses associated with a + // Peer ID + FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error) + + // FindProviders finds peers in the DHT who can provide a specific value + // given a key. + FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) + + // Provide announces to the network that you are providing given values + Provide(context.Context, Path, ...options.DhtProvideOption) error +} diff --git a/core/coreapi/interface/options/dht.go b/core/coreapi/interface/options/dht.go new file mode 100644 index 00000000000..e13e1602006 --- /dev/null +++ b/core/coreapi/interface/options/dht.go @@ -0,0 +1,62 @@ +package options + +type DhtProvideSettings struct { + Recursive bool +} + +type DhtFindProvidersSettings struct { + NumProviders int +} + +type DhtProvideOption func(*DhtProvideSettings) error +type DhtFindProvidersOption func(*DhtFindProvidersSettings) error + +func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) { + options := &DhtProvideSettings{ + Recursive: false, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + return options, nil +} + +func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) { + options := &DhtFindProvidersSettings{ + NumProviders: 20, + } + + for _, opt := range opts { + err := opt(options) + if err != nil { + return nil, err + } + } + return options, nil +} + +type dhtOpts struct{} + +var Dht dhtOpts + +// Recursive is an option for Dht.Provide which specifies whether to provide +// the given path recursively +func (dhtOpts) Recursive(recursive bool) DhtProvideOption { + return func(settings *DhtProvideSettings) error { + settings.Recursive = recursive + return nil + } +} + +// NumProviders is an option for Dht.FindProviders which specifies the +// number of peers to look for. Default is 20 +func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption { + return func(settings *DhtFindProvidersSettings) error { + settings.NumProviders = numProviders + return nil + } +} diff --git a/core/coreapi/name_test.go b/core/coreapi/name_test.go index 076a06b4311..c0460b1571c 100644 --- a/core/coreapi/name_test.go +++ b/core/coreapi/name_test.go @@ -21,11 +21,13 @@ func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path, func TestBasicPublishResolve(t *testing.T) { ctx := context.Background() - n, api, err := makeAPIIdent(ctx, true) + nds, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } + n := nds[0] + api := apis[0] p, err := addTestObject(ctx, api) if err != nil { @@ -60,11 +62,12 @@ func TestBasicPublishResolve(t *testing.T) { func TestBasicPublishResolveKey(t *testing.T) { ctx := context.Background() - _, api, err := makeAPIIdent(ctx, true) + _, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } + api := apis[0] k, err := api.Key().Generate(ctx, "foo") if err != nil { @@ -107,12 +110,13 @@ func TestBasicPublishResolveTimeout(t *testing.T) { t.Skip("ValidTime doesn't appear to work at this time resolution") ctx := context.Background() - n, api, err := makeAPIIdent(ctx, true) + nds, apis, err := makeAPISwarm(ctx, true, 2) if err != nil { t.Fatal(err) return } - + n := nds[0] + api := apis[0] p, err := addTestObject(ctx, api) if err != nil { t.Fatal(err) diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index 9d703566856..ccee30f1349 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/base64" + "fmt" "io" "math" "strings" @@ -14,6 +15,7 @@ import ( coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" coreunix "github.com/ipfs/go-ipfs/core/coreunix" + mock "github.com/ipfs/go-ipfs/core/mock" keystore "github.com/ipfs/go-ipfs/keystore" repo "github.com/ipfs/go-ipfs/repo" @@ -22,8 +24,10 @@ import ( peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" datastore "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore" syncds "gx/ipfs/QmSpg1CvpXQQow5ernt1gNBXaXV6yxyNqi7XoeerWfzB5w/go-datastore/sync" + mocknet "gx/ipfs/QmUEqyXr97aUbNmQADHYNknjwjjdVpJXEt1UZXmSG81EV4/go-libp2p/p2p/net/mock" unixfs "gx/ipfs/QmWAfTyD6KEBm7bzqNRBPvqKrZCDtn5PGbs9V1DKfnVK59/go-unixfs" config "gx/ipfs/QmYVqYJTVjetcf1guieEgWpK1PZtHPytP624vKzTF1P3r2/go-ipfs-config" + pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore" cbor "gx/ipfs/QmepvyyduWnXHm1G7ybmGbJfQQHTAo36DjP2nvF7H7ZXjE/go-ipld-cbor" ) @@ -36,51 +40,89 @@ var helloStr = "hello, world!" // `echo -n | ipfs add` var emptyFile = "/ipfs/QmbFMke1KXqnYyBBWxB74N4c5SBnJMVAiMNRcGu6x1AwQH" -func makeAPIIdent(ctx context.Context, fullIdentity bool) (*core.IpfsNode, coreiface.CoreAPI, error) { - var ident config.Identity - if fullIdentity { - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) - if err != nil { - return nil, nil, err +func makeAPISwarm(ctx context.Context, fullIdentity bool, n int) ([]*core.IpfsNode, []coreiface.CoreAPI, error) { + mn := mocknet.New(ctx) + + nodes := make([]*core.IpfsNode, n) + apis := make([]coreiface.CoreAPI, n) + + for i := 0; i < n; i++ { + var ident config.Identity + if fullIdentity { + sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512) + if err != nil { + return nil, nil, err + } + + id, err := peer.IDFromPublicKey(pk) + if err != nil { + return nil, nil, err + } + + kbytes, err := sk.Bytes() + if err != nil { + return nil, nil, err + } + + ident = config.Identity{ + PeerID: id.Pretty(), + PrivKey: base64.StdEncoding.EncodeToString(kbytes), + } + } else { + ident = config.Identity{ + PeerID: testPeerID, + } } - id, err := peer.IDFromPublicKey(pk) - if err != nil { - return nil, nil, err + c := config.Config{} + c.Addresses.Swarm = []string{fmt.Sprintf("/ip4/127.0.%d.1/tcp/4001", i)} + c.Identity = ident + + r := &repo.Mock{ + C: c, + D: syncds.MutexWrap(datastore.NewMapDatastore()), + K: keystore.NewMemKeystore(), } - kbytes, err := sk.Bytes() + node, err := core.NewNode(ctx, &core.BuildCfg{ + Repo: r, + Host: mock.MockHostOption(mn), + Online: fullIdentity, + }) if err != nil { return nil, nil, err } + nodes[i] = node + apis[i] = coreapi.NewCoreAPI(node) + } - ident = config.Identity{ - PeerID: id.Pretty(), - PrivKey: base64.StdEncoding.EncodeToString(kbytes), - } - } else { - ident = config.Identity{ - PeerID: testPeerID, - } + err := mn.LinkAll() + if err != nil { + return nil, nil, err } - r := &repo.Mock{ - C: config.Config{ - Identity: ident, + bsinf := core.BootstrapConfigWithPeers( + []pstore.PeerInfo{ + nodes[0].Peerstore.PeerInfo(nodes[0].Identity), }, - D: syncds.MutexWrap(datastore.NewMapDatastore()), - K: keystore.NewMemKeystore(), + ) + + for _, n := range nodes[1:] { + if err := n.Bootstrap(bsinf); err != nil { + return nil, nil, err + } } - node, err := core.NewNode(ctx, &core.BuildCfg{Repo: r}) + + return nodes, apis, nil +} + +func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) { + nd, api, err := makeAPISwarm(ctx, false, 1) if err != nil { return nil, nil, err } - api := coreapi.NewCoreAPI(node) - return node, api, nil -} -func makeAPI(ctx context.Context) (*core.IpfsNode, coreiface.CoreAPI, error) { - return makeAPIIdent(ctx, false) + return nd[0], api[0], nil } func TestAdd(t *testing.T) { diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index df688a22907..c3ff84b86b4 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -6,6 +6,7 @@ import ( pin "github.com/ipfs/go-ipfs/pin" merkledag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag" + cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil" ipld "gx/ipfs/QmX5CsuHyVZeTLxgRSYkgLSDQKb9UjE8xnhQzCEJWWWFsC/go-ipld-format" cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid" blocks "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore" @@ -29,7 +30,7 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) outCh := make(chan *cid.Cid) go func() { defer close(outCh) - for c := range set.new { + for c := range set.New { select { case <-ctx.Done(): return @@ -43,21 +44,23 @@ func NewPinnedProvider(pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) } } -func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*streamingSet, error) { - set := newStreamingSet() +func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRoots bool) (*cidutil.StreamingSet, error) { + set := cidutil.NewStreamingSet() go func() { - defer close(set.new) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + defer close(set.New) for _, key := range pinning.DirectKeys() { - set.add(key) + set.Visitor(ctx)(key) } for _, key := range pinning.RecursiveKeys() { - set.add(key) + set.Visitor(ctx)(key) if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add) + err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.Visitor(ctx)) if err != nil { log.Errorf("reprovide indirect pins: %s", err) return @@ -68,27 +71,3 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo return set, nil } - -type streamingSet struct { - set *cid.Set - new chan *cid.Cid -} - -// NewSet initializes and returns a new Set. -func newStreamingSet() *streamingSet { - return &streamingSet{ - set: cid.NewSet(), - new: make(chan *cid.Cid), - } -} - -// add adds a Cid to the set only if it is -// not in it already. -func (s *streamingSet) add(c *cid.Cid) bool { - if s.set.Visit(c) { - s.new <- c - return true - } - - return false -}