Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ipfs name resolve --stream #5404

Merged
merged 15 commits into from
Oct 18, 2018
26 changes: 24 additions & 2 deletions core/commands/name/ipns.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
nocacheOptionName = "nocache"
dhtRecordCountOptionName = "dht-record-count"
dhtTimeoutOptionName = "dht-timeout"
streamOptionName = "stream"
)

var IpnsCmd = &cmds.Command{
Expand Down Expand Up @@ -77,6 +78,7 @@ Resolve the value of a dnslink:
cmdkit.BoolOption(nocacheOptionName, "n", "Do not use cached entries."),
cmdkit.UintOption(dhtRecordCountOptionName, "dhtrc", "Number of records to request for DHT resolution."),
cmdkit.StringOption(dhtTimeoutOptionName, "dhtt", "Max time to collect values during DHT resolution eg \"30s\". Pass 0 for no timeout."),
cmdkit.BoolOption(streamOptionName, "s", "Stream entries as they are found."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env)
Expand All @@ -101,6 +103,7 @@ Resolve the value of a dnslink:
recursive, _ := req.Options[recursiveOptionName].(bool)
rc, rcok := req.Options[dhtRecordCountOptionName].(int)
dhtt, dhttok := req.Options[dhtTimeoutOptionName].(string)
stream, _ := req.Options[streamOptionName].(bool)

opts := []options.NameResolveOption{
options.Name.Local(local),
Expand Down Expand Up @@ -128,12 +131,31 @@ Resolve the value of a dnslink:
name = "/ipns/" + name
}

output, err := api.Name().Resolve(req.Context, name, opts...)
if !stream {
output, err := api.Name().Resolve(req.Context, name, opts...)
if err != nil {
return err
}

return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())})
}

output, err := api.Name().Search(req.Context, name, opts...)
if err != nil {
return err
}

return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())})
for v := range output {
if v.Err != nil {
return err
}
if err := res.Emit(&ResolvedPath{path.FromString(v.Path.String())}); err != nil {
return err
}

}

return nil
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
Expand Down
3 changes: 2 additions & 1 deletion core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,8 +523,8 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
n.RecordValidator,
)
n.Routing = rhelpers.Tiered{
// Always check pubsub first.
Routers: []routing.IpfsRouting{
// Always check pubsub first.
&rhelpers.Compose{
ValueStore: &rhelpers.LimitedValueStore{
ValueStore: n.PSRouter,
Expand All @@ -533,6 +533,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
},
n.Routing,
},
Validator: n.RecordValidator,
}
}

Expand Down
15 changes: 15 additions & 0 deletions core/coreapi/interface/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package iface

import (
"context"
"errors"

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
)

var ErrResolveFailed = errors.New("could not resolve name")

// IpnsEntry specifies the interface to IpnsEntries
type IpnsEntry interface {
// Name returns IpnsEntry name
Expand All @@ -14,6 +17,11 @@ type IpnsEntry interface {
Value() Path
}

type IpnsResult struct {
Path
Err error
}

// NameAPI specifies the interface to IPNS.
//
// IPNS is a PKI namespace, where names are the hashes of public keys, and the
Expand All @@ -28,4 +36,11 @@ type NameAPI interface {

// Resolve attempts to resolve the newest version of the specified name
Resolve(ctx context.Context, name string, opts ...options.NameResolveOption) (Path, error)

// Search is a version of Resolve which outputs paths as they are discovered,
// reducing the time to first entry
//
// Note: by default, all paths read from the channel are considered unsafe,
// except the latest (last path in channel read buffer).
Search(ctx context.Context, name string, opts ...options.NameResolveOption) (<-chan IpnsResult, error)
}
40 changes: 34 additions & 6 deletions core/coreapi/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/namesys"
ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"

"gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
"gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline"
"gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
)

type NameAPI CoreAPI
Expand Down Expand Up @@ -89,9 +89,7 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt
}, nil
}

// Resolve attempts to resolve the newest version of the specified name and
// returns its path.
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) {
func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan coreiface.IpnsResult, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document this function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are docs for this on the interface (like all other coreapi functions). We may want to point people there, but I'd this in a separate PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just do this now. Separate PRs for documentation tend to not happen.

options, err := caopts.NameResolveOptions(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -125,12 +123,42 @@ func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.Nam
name = "/ipns/" + name
}

output, err := resolver.Resolve(ctx, name, options.ResolveOpts...)
out := make(chan coreiface.IpnsResult)
go func() {
defer close(out)
for res := range resolver.ResolveAsync(ctx, name, options.ResolveOpts...) {
p, _ := coreiface.ParsePath(res.Path.String())
magik6k marked this conversation as resolved.
Show resolved Hide resolved

select {
case out <- coreiface.IpnsResult{Path: p, Err: res.Err}:
case <-ctx.Done():
return
}
}
}()

return out, nil
}

// Resolve attempts to resolve the newest version of the specified name and
// returns its path.
func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) {
results, err := api.Search(ctx, name, opts...)
if err != nil {
return nil, err
}

return coreiface.ParsePath(output.String())
err = coreiface.ErrResolveFailed
var p coreiface.Path

for res := range results {
p, err = res.Path, res.Err
if err != nil {
break
}
}

return p, err
}

func keylookup(n *core.IpfsNode, k string) (crypto.PrivKey, error) {
Expand Down
10 changes: 9 additions & 1 deletion core/corehttp/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type mockNamesys map[string]path.Path
func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.ResolveOpt) (value path.Path, err error) {
cfg := nsopts.DefaultResolveOpts()
for _, o := range opts {
o(cfg)
o(&cfg)
}
depth := cfg.Depth
if depth == nsopts.UnlimitedDepth {
Expand All @@ -57,6 +57,14 @@ func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.Re
return value, nil
}

func (m mockNamesys) ResolveAsync(ctx context.Context, name string, opts ...nsopts.ResolveOpt) <-chan namesys.Result {
out := make(chan namesys.Result, 1)
v, err := m.Resolve(ctx, name, opts...)
out <- namesys.Result{Path: v, Err: err}
close(out)
return out
}

func (m mockNamesys) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error {
return errors.New("not implemented for mockNamesys")
}
Expand Down
127 changes: 96 additions & 31 deletions namesys/base.go
Original file line number Diff line number Diff line change
@@ -1,56 +1,121 @@
package namesys

import (
"context"
"strings"
"time"

context "context"

opts "github.com/ipfs/go-ipfs/namesys/opts"

path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path"
)

type onceResult struct {
value path.Path
ttl time.Duration
err error
}

type resolver interface {
// resolveOnce looks up a name once (without recursion).
resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)
resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult
}

// resolve is a helper for implementing Resolver.ResolveN using resolveOnce.
func resolve(ctx context.Context, r resolver, name string, options *opts.ResolveOpts, prefixes ...string) (path.Path, error) {
depth := options.Depth
for {
p, _, err := r.resolveOnce(ctx, name, options)
func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts) (path.Path, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

err := ErrResolveFailed
var p path.Path

resCh := resolveAsync(ctx, r, name, options)

for res := range resCh {
p, err = res.Path, res.Err
if err != nil {
return "", err
break
}
log.Debugf("resolved %s to %s", name, p.String())
}

if strings.HasPrefix(p.String(), "/ipfs/") {
// we've bottomed out with an IPFS path
return p, nil
}
return p, err
}

if depth == 1 {
return p, ErrResolveRecursion
}
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result {
resCh := r.resolveOnceAsync(ctx, name, options)
depth := options.Depth
outCh := make(chan Result, 1)

matched := false
for _, prefix := range prefixes {
if strings.HasPrefix(p.String(), prefix) {
matched = true
if len(prefixes) == 1 {
name = strings.TrimPrefix(p.String(), prefix)
}
break
go func() {
defer close(outCh)
var subCh <-chan Result
var cancelSub context.CancelFunc
defer func() {
if cancelSub != nil {
cancelSub()
}
}
}()

if !matched {
return p, nil
}
for {
select {
case res, ok := <-resCh:
if !ok {
resCh = nil
break
}

if res.err != nil {
emitResult(ctx, outCh, Result{Err: res.err})
return
}
log.Debugf("resolved %s to %s", name, res.value.String())
if !strings.HasPrefix(res.value.String(), ipnsPrefix) {
emitResult(ctx, outCh, Result{Path: res.value})
break
}

if depth == 1 {
emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion})
break
}

subopts := options
if subopts.Depth > 1 {
subopts.Depth--
}

if depth > 1 {
depth--
var subCtx context.Context
if cancelSub != nil {
// Cancel previous recursive resolve since it won't be used anyways
cancelSub()
}
subCtx, cancelSub = context.WithCancel(ctx)
_ = cancelSub

p := strings.TrimPrefix(res.value.String(), ipnsPrefix)
subCh = resolveAsync(subCtx, r, p, subopts)
case res, ok := <-subCh:
if !ok {
subCh = nil
break
}

// We don't bother returning here in case of context timeout as there is
// no good reason to do that, and we may still be able to emit a result
emitResult(ctx, outCh, res)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return
}
if resCh == nil && subCh == nil {
return
}
}
}()
return outCh
}

func emitResult(ctx context.Context, outCh chan<- Result, r Result) {
select {
case outCh <- r:
case <-ctx.Done():
}
}
Loading