From 66653733598ff9ed606f71111d5562c79f5caa6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Sat, 31 Mar 2018 02:06:56 +0200 Subject: [PATCH] coreapi/dht: Review, cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- core/coreapi/dht.go | 145 +++------------------------------- core/coreapi/interface/dht.go | 4 +- 2 files changed, 13 insertions(+), 136 deletions(-) diff --git a/core/coreapi/dht.go b/core/coreapi/dht.go index a27628b4229d..a9d0812093c7 100644 --- a/core/coreapi/dht.go +++ b/core/coreapi/dht.go @@ -10,10 +10,7 @@ import ( dag "github.com/ipfs/go-ipfs/merkledag" routing "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing" - notif "gx/ipfs/QmTiWLZ6Fo5j4KcTVutZJ5KWRRJrbxzmxA4td8NfEdrPh7/go-libp2p-routing/notifications" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" - ipdht "gx/ipfs/QmY1y2M1aCcVhy8UuTbZJBvuFbegZm47f9cDAdgxiehQfx/go-libp2p-kad-dht" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" ipld "gx/ipfs/Qme5bWv7wtjUNGsK2BNGVUFPKiuxWrsqrtvYwCLRw8YFES/go-ipld-format" @@ -23,73 +20,16 @@ var ErrNotDHT = errors.New("routing service is not a DHT") type DhtAPI CoreAPI -func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (<-chan ma.Multiaddr, error) { - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT - } - - outChan := make(chan ma.Multiaddr) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - go func() { - defer close(outChan) - - sendAddrs := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - for _, addr := range response.Addrs { - select { - case outChan <- addr: - case <-ctx.Done(): - return ctx.Err() - } - } - } - return nil - } - - for event := range events { - if event.Type == notif.FinalPeer { - err := sendAddrs(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - pi, err := dht.FindPeer(ctx, peer.ID(p)) - if err != nil { - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.QueryError, - Extra: err.Error(), - }) - return - } - - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.FinalPeer, - Responses: []*pstore.PeerInfo{&pi}, - }) - }() - - return outChan, nil +func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { + return api.node.Routing.FindPeer(ctx, p) } -func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.ID, error) { +func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) { settings, err := caopts.DhtFindProvidersOptions(opts...) if err != nil { return nil, err } - dht, ok := api.node.Routing.(*ipdht.IpfsDHT) - if !ok { - return nil, ErrNotDHT - } - p, err = api.core().ResolvePath(ctx, p) if err != nil { return nil, err @@ -99,50 +39,10 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ... numProviders := settings.NumProviders if numProviders < 1 { - return nil, fmt.Errorf("number of providers must be greater than 0") + return nil, fmt.Errorf("number of providers to find must be greater than 0") } - outChan := make(chan peer.ID) - events := make(chan *notif.QueryEvent) - ctx = notif.RegisterForQueryEvents(ctx, events) - - pchan := dht.FindProvidersAsync(ctx, c, numProviders) - go func() { - defer close(outChan) - - sendProviders := func(responses []*pstore.PeerInfo) error { - for _, response := range responses { - select { - case outChan <- response.ID: - case <-ctx.Done(): - return ctx.Err() - } - } - return nil - } - - for event := range events { - if event.Type == notif.Provider { - err := sendProviders(event.Responses) - if err != nil { - return - } - } - } - }() - - go func() { - defer close(events) - for p := range pchan { - np := p - notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ - Type: notif.Provider, - Responses: []*pstore.PeerInfo{&np}, - }) - } - }() - - return outChan, nil + return api.node.Routing.FindProvidersAsync(ctx, c, numProviders), nil } func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error { @@ -155,43 +55,19 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao return errors.New("cannot provide in offline mode") } - if len(api.node.PeerHost.Network().Conns()) == 0 { - return errors.New("cannot provide, no connected peers") - } - - c := path.Cid() - - has, err := api.node.Blockstore.Has(c) + has, err := api.node.Blockstore.Has(path.Cid()) if err != nil { return err } if !has { - return fmt.Errorf("block %s not found locally, cannot provide", c) + return fmt.Errorf("block %s not found locally, cannot provide", path.Cid()) } - //TODO: either remove or use - //outChan := make(chan interface{}) - - //events := make(chan *notif.QueryEvent) - //ctx = notif.RegisterForQueryEvents(ctx, events) - - /*go func() { - defer close(outChan) - for range events { - select { - case <-ctx.Done(): - return - default: - } - } - }()*/ - - //defer close(events) if settings.Recursive { - err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{c}) + err = provideKeysRec(ctx, api.node.Routing, api.node.DAG, []*cid.Cid{path.Cid()}) } else { - err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c}) + err = provideKeys(ctx, api.node.Routing, []*cid.Cid{path.Cid()}) } if err != nil { return err @@ -211,10 +87,11 @@ func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) er } func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv ipld.DAGService, cids []*cid.Cid) error { - provided := cid.NewSet() + provided := cid.NewSet() //TODO: Use a bloom filter for _, c := range cids { kset := cid.NewSet() + //TODO: After https://github.com/ipfs/go-ipfs/pull/4333 is merged, use n.Provider for this err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, kset.Visit) if err != nil { return err diff --git a/core/coreapi/interface/dht.go b/core/coreapi/interface/dht.go index c49903bee763..60cde426a1a6 100644 --- a/core/coreapi/interface/dht.go +++ b/core/coreapi/interface/dht.go @@ -5,7 +5,7 @@ import ( options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr" + pstore "gx/ipfs/QmXauCuJzmzapetmC6W4TuDJLL1yFFrVzSHoWv8YdbmnxH/go-libp2p-peerstore" peer "gx/ipfs/QmZoWKhxUmZ2seW4BzX6fJkNR8hh9PsGModr7q171yq2SS/go-libp2p-peer" ) @@ -13,7 +13,7 @@ import ( type DhtAPI interface { // FindPeer queries the DHT for all of the multiaddresses associated with a // Peer ID - FindPeer(context.Context, peer.ID) (<-chan ma.Multiaddr, error) + FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error) // FindProviders finds peers in the DHT who can provide a specific value // given a key.