diff --git a/core/commands/name/ipns.go b/core/commands/name/ipns.go index d8ad67c5ae8..0782cf6d661 100644 --- a/core/commands/name/ipns.go +++ b/core/commands/name/ipns.go @@ -29,6 +29,7 @@ const ( nocacheOptionName = "nocache" dhtRecordCountOptionName = "dht-record-count" dhtTimeoutOptionName = "dht-timeout" + streamOptionName = "stream" ) var IpnsCmd = &cmds.Command{ @@ -77,6 +78,7 @@ Resolve the value of a dnslink: cmdkit.BoolOption(nocacheOptionName, "n", "Do not use cached entries."), cmdkit.UintOption(dhtRecordCountOptionName, "dhtrc", "Number of records to request for DHT resolution."), cmdkit.StringOption(dhtTimeoutOptionName, "dhtt", "Max time to collect values during DHT resolution eg \"30s\". Pass 0 for no timeout."), + cmdkit.BoolOption(streamOptionName, "s", "Stream entries as they are found."), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { api, err := cmdenv.GetApi(env) @@ -101,6 +103,7 @@ Resolve the value of a dnslink: recursive, _ := req.Options[recursiveOptionName].(bool) rc, rcok := req.Options[dhtRecordCountOptionName].(int) dhtt, dhttok := req.Options[dhtTimeoutOptionName].(string) + stream, _ := req.Options[streamOptionName].(bool) opts := []options.NameResolveOption{ options.Name.Local(local), @@ -128,12 +131,31 @@ Resolve the value of a dnslink: name = "/ipns/" + name } - output, err := api.Name().Resolve(req.Context, name, opts...) + if !stream { + output, err := api.Name().Resolve(req.Context, name, opts...) + if err != nil { + return err + } + + return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())}) + } + + output, err := api.Name().Search(req.Context, name, opts...) if err != nil { return err } - return cmds.EmitOnce(res, &ResolvedPath{path.FromString(output.String())}) + for v := range output { + if v.Err != nil { + return err + } + if err := res.Emit(&ResolvedPath{path.FromString(v.Path.String())}); err != nil { + return err + } + + } + + return nil }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { diff --git a/core/core.go b/core/core.go index f9a7e1114f8..195b1460706 100644 --- a/core/core.go +++ b/core/core.go @@ -523,8 +523,8 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost n.RecordValidator, ) n.Routing = rhelpers.Tiered{ - // Always check pubsub first. Routers: []routing.IpfsRouting{ + // Always check pubsub first. &rhelpers.Compose{ ValueStore: &rhelpers.LimitedValueStore{ ValueStore: n.PSRouter, @@ -533,6 +533,7 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost }, n.Routing, }, + Validator: n.RecordValidator, } } diff --git a/core/coreapi/interface/name.go b/core/coreapi/interface/name.go index a6aad0c3e6f..a02bc078748 100644 --- a/core/coreapi/interface/name.go +++ b/core/coreapi/interface/name.go @@ -2,10 +2,13 @@ package iface import ( "context" + "errors" options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" ) +var ErrResolveFailed = errors.New("could not resolve name") + // IpnsEntry specifies the interface to IpnsEntries type IpnsEntry interface { // Name returns IpnsEntry name @@ -14,6 +17,11 @@ type IpnsEntry interface { Value() Path } +type IpnsResult struct { + Path + Err error +} + // NameAPI specifies the interface to IPNS. // // IPNS is a PKI namespace, where names are the hashes of public keys, and the @@ -28,4 +36,11 @@ type NameAPI interface { // Resolve attempts to resolve the newest version of the specified name Resolve(ctx context.Context, name string, opts ...options.NameResolveOption) (Path, error) + + // Search is a version of Resolve which outputs paths as they are discovered, + // reducing the time to first entry + // + // Note: by default, all paths read from the channel are considered unsafe, + // except the latest (last path in channel read buffer). + Search(ctx context.Context, name string, opts ...options.NameResolveOption) (<-chan IpnsResult, error) } diff --git a/core/coreapi/name.go b/core/coreapi/name.go index 57b276161d0..b5bf3fbd633 100644 --- a/core/coreapi/name.go +++ b/core/coreapi/name.go @@ -12,11 +12,11 @@ import ( caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" "github.com/ipfs/go-ipfs/keystore" "github.com/ipfs/go-ipfs/namesys" - ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline" "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + ipath "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) type NameAPI CoreAPI @@ -89,9 +89,7 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt }, nil } -// Resolve attempts to resolve the newest version of the specified name and -// returns its path. -func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) { +func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.NameResolveOption) (<-chan coreiface.IpnsResult, error) { options, err := caopts.NameResolveOptions(opts...) if err != nil { return nil, err @@ -125,12 +123,42 @@ func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.Nam name = "/ipns/" + name } - output, err := resolver.Resolve(ctx, name, options.ResolveOpts...) + out := make(chan coreiface.IpnsResult) + go func() { + defer close(out) + for res := range resolver.ResolveAsync(ctx, name, options.ResolveOpts...) { + p, _ := coreiface.ParsePath(res.Path.String()) + + select { + case out <- coreiface.IpnsResult{Path: p, Err: res.Err}: + case <-ctx.Done(): + return + } + } + }() + + return out, nil +} + +// Resolve attempts to resolve the newest version of the specified name and +// returns its path. +func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.NameResolveOption) (coreiface.Path, error) { + results, err := api.Search(ctx, name, opts...) if err != nil { return nil, err } - return coreiface.ParsePath(output.String()) + err = coreiface.ErrResolveFailed + var p coreiface.Path + + for res := range results { + p, err = res.Path, res.Err + if err != nil { + break + } + } + + return p, err } func keylookup(n *core.IpfsNode, k string) (crypto.PrivKey, error) { diff --git a/core/corehttp/gateway_test.go b/core/corehttp/gateway_test.go index aebe93d8c57..e7e34af559b 100644 --- a/core/corehttp/gateway_test.go +++ b/core/corehttp/gateway_test.go @@ -35,7 +35,7 @@ type mockNamesys map[string]path.Path func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.ResolveOpt) (value path.Path, err error) { cfg := nsopts.DefaultResolveOpts() for _, o := range opts { - o(cfg) + o(&cfg) } depth := cfg.Depth if depth == nsopts.UnlimitedDepth { @@ -57,6 +57,14 @@ func (m mockNamesys) Resolve(ctx context.Context, name string, opts ...nsopts.Re return value, nil } +func (m mockNamesys) ResolveAsync(ctx context.Context, name string, opts ...nsopts.ResolveOpt) <-chan namesys.Result { + out := make(chan namesys.Result, 1) + v, err := m.Resolve(ctx, name, opts...) + out <- namesys.Result{Path: v, Err: err} + close(out) + return out +} + func (m mockNamesys) Publish(ctx context.Context, name ci.PrivKey, value path.Path) error { return errors.New("not implemented for mockNamesys") } diff --git a/namesys/base.go b/namesys/base.go index a650f094afa..f90e8add164 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -1,56 +1,121 @@ package namesys import ( + "context" "strings" "time" - context "context" - opts "github.com/ipfs/go-ipfs/namesys/opts" + path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) +type onceResult struct { + value path.Path + ttl time.Duration + err error +} + type resolver interface { - // resolveOnce looks up a name once (without recursion). - resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (value path.Path, ttl time.Duration, err error) + resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult } // resolve is a helper for implementing Resolver.ResolveN using resolveOnce. -func resolve(ctx context.Context, r resolver, name string, options *opts.ResolveOpts, prefixes ...string) (path.Path, error) { - depth := options.Depth - for { - p, _, err := r.resolveOnce(ctx, name, options) +func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts) (path.Path, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + err := ErrResolveFailed + var p path.Path + + resCh := resolveAsync(ctx, r, name, options) + + for res := range resCh { + p, err = res.Path, res.Err if err != nil { - return "", err + break } - log.Debugf("resolved %s to %s", name, p.String()) + } - if strings.HasPrefix(p.String(), "/ipfs/") { - // we've bottomed out with an IPFS path - return p, nil - } + return p, err +} - if depth == 1 { - return p, ErrResolveRecursion - } +func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts) <-chan Result { + resCh := r.resolveOnceAsync(ctx, name, options) + depth := options.Depth + outCh := make(chan Result, 1) - matched := false - for _, prefix := range prefixes { - if strings.HasPrefix(p.String(), prefix) { - matched = true - if len(prefixes) == 1 { - name = strings.TrimPrefix(p.String(), prefix) - } - break + go func() { + defer close(outCh) + var subCh <-chan Result + var cancelSub context.CancelFunc + defer func() { + if cancelSub != nil { + cancelSub() } - } + }() - if !matched { - return p, nil - } + for { + select { + case res, ok := <-resCh: + if !ok { + resCh = nil + break + } + + if res.err != nil { + emitResult(ctx, outCh, Result{Err: res.err}) + return + } + log.Debugf("resolved %s to %s", name, res.value.String()) + if !strings.HasPrefix(res.value.String(), ipnsPrefix) { + emitResult(ctx, outCh, Result{Path: res.value}) + break + } + + if depth == 1 { + emitResult(ctx, outCh, Result{Path: res.value, Err: ErrResolveRecursion}) + break + } + + subopts := options + if subopts.Depth > 1 { + subopts.Depth-- + } - if depth > 1 { - depth-- + var subCtx context.Context + if cancelSub != nil { + // Cancel previous recursive resolve since it won't be used anyways + cancelSub() + } + subCtx, cancelSub = context.WithCancel(ctx) + _ = cancelSub + + p := strings.TrimPrefix(res.value.String(), ipnsPrefix) + subCh = resolveAsync(subCtx, r, p, subopts) + case res, ok := <-subCh: + if !ok { + subCh = nil + break + } + + // We don't bother returning here in case of context timeout as there is + // no good reason to do that, and we may still be able to emit a result + emitResult(ctx, outCh, res) + case <-ctx.Done(): + return + } + if resCh == nil && subCh == nil { + return + } } + }() + return outCh +} + +func emitResult(ctx context.Context, outCh chan<- Result, r Result) { + select { + case outCh <- r: + case <-ctx.Done(): } } diff --git a/namesys/dns.go b/namesys/dns.go index 7aa4e24d2fc..bd62b7d220f 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -5,9 +5,9 @@ import ( "errors" "net" "strings" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" + isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) @@ -28,7 +28,12 @@ func NewDNSResolver() *DNSResolver { // Resolve implements Resolver. func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) +} + +// ResolveAsync implements Resolver. +func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } type lookupRes struct { @@ -39,12 +44,15 @@ type lookupRes struct { // resolveOnce implements resolver. // TXT records for a given domain name should contain a b58 // encoded multihash. -func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) segments := strings.SplitN(name, "/", 2) domain := segments[0] if !isd.IsDomain(domain) { - return "", 0, errors.New("not a valid domain name") + out <- onceResult{err: errors.New("not a valid domain name")} + close(out) + return out } log.Debugf("DNSResolver resolving %s", domain) @@ -54,39 +62,52 @@ func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options *opt subChan := make(chan lookupRes, 1) go workDomain(r, "_dnslink."+domain, subChan) - var subRes lookupRes - select { - case subRes = <-subChan: - case <-ctx.Done(): - return "", 0, ctx.Err() + appendPath := func(p path.Path) (path.Path, error) { + if len(segments) > 1 { + return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) + } + return p, nil } - var p path.Path - if subRes.error == nil { - p = subRes.path - } else { - var rootRes lookupRes - select { - case rootRes = <-rootChan: - case <-ctx.Done(): - return "", 0, ctx.Err() - } - if rootRes.error == nil { - p = rootRes.path - } else { - return "", 0, ErrResolveFailed + go func() { + defer close(out) + for { + select { + case subRes, ok := <-subChan: + if !ok { + subChan = nil + break + } + if subRes.error == nil { + p, err := appendPath(subRes.path) + emitOnceResult(ctx, out, onceResult{value: p, err: err}) + return + } + case rootRes, ok := <-rootChan: + if !ok { + rootChan = nil + break + } + if rootRes.error == nil { + p, err := appendPath(rootRes.path) + emitOnceResult(ctx, out, onceResult{value: p, err: err}) + } + case <-ctx.Done(): + return + } + if subChan == nil && rootChan == nil { + return + } } - } - var err error - if len(segments) > 1 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) - } - return p, 0, err + }() + + return out } func workDomain(r *DNSResolver, name string, res chan lookupRes) { - txt, err := r.lookupTXT(name) + defer close(res) + txt, err := r.lookupTXT(name) if err != nil { // Error is != nil res <- lookupRes{"", err} diff --git a/namesys/interface.go b/namesys/interface.go index c8413fbcecd..a1b1308ca84 100644 --- a/namesys/interface.go +++ b/namesys/interface.go @@ -63,6 +63,12 @@ type NameSystem interface { Publisher } +// Result is the return type for Resolver.ResolveAsync. +type Result struct { + Path path.Path + Err error +} + // Resolver is an object capable of resolving names. type Resolver interface { @@ -81,6 +87,11 @@ type Resolver interface { // users will be fine with this default limit, but if you need to // adjust the limit you can specify it as an option. Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (value path.Path, err error) + + // ResolveAsync performs recursive name lookup, like Resolve, but it returns + // entries as they are discovered in the DHT. Each returned result is guaranteed + // to be "better" (which usually means newer) than the previous one. + ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result } // Publisher is an object capable of publishing particular names. diff --git a/namesys/ipns_resolver_validation_test.go b/namesys/ipns_resolver_validation_test.go index 5504bdea1b5..5bcef4e1474 100644 --- a/namesys/ipns_resolver_validation_test.go +++ b/namesys/ipns_resolver_validation_test.go @@ -5,9 +5,10 @@ import ( "testing" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + testutil "gx/ipfs/QmNfQbgBfARAtrYsBguChX6VJ5nbjeoYy1KdC36aaYWqG8/go-testutil" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" @@ -57,14 +58,13 @@ func TestResolverValidation(t *testing.T) { } // Resolve entry - resp, _, err := resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts()) if err != nil { t.Fatal(err) } if resp != path.Path(p) { t.Fatalf("Mismatch between published path %s and resolved path %s", p, resp) } - // Create expired entry expiredEntry, err := ipns.Create(priv, p, 1, ts.Add(-1*time.Hour)) if err != nil { @@ -78,7 +78,7 @@ func TestResolverValidation(t *testing.T) { } // Record should fail validation because entry is expired - _, _, err = resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have returned error") } @@ -100,7 +100,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key defined by // ipns path doesn't match record signature - _, _, err = resolver.resolveOnce(ctx, id2.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have failed signature verification") } @@ -118,7 +118,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key is not available // in peer store or on network - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts()) if err == nil { t.Fatal("ValidateIpnsRecord should have failed because public key was not found") } @@ -133,7 +133,7 @@ func TestResolverValidation(t *testing.T) { // public key is available in the peer store by looking it up in // the DHT, which causes the DHT to fetch it and cache it in the // peer store - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts()) if err != nil { t.Fatal(err) } diff --git a/namesys/namesys.go b/namesys/namesys.go index fd3853371e5..aa37a93fe9f 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -5,9 +5,10 @@ import ( "strings" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" @@ -61,50 +62,104 @@ func (ns *mpns) Resolve(ctx context.Context, name string, options ...opts.Resolv return path.ParsePath("/ipfs/" + name) } - return resolve(ctx, ns, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, ns, name, opts.ProcessOpts(options)) +} + +func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + res := make(chan Result, 1) + if strings.HasPrefix(name, "/ipfs/") { + p, err := path.ParsePath(name) + res <- Result{p, err} + return res + } + + if !strings.HasPrefix(name, "/") { + p, err := path.ParsePath("/ipfs/" + name) + res <- Result{p, err} + return res + } + + return resolveAsync(ctx, ns, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. -func (ns *mpns) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { - if !strings.HasPrefix(name, "/ipns/") { - name = "/ipns/" + name +func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + + if !strings.HasPrefix(name, ipnsPrefix) { + name = ipnsPrefix + name } segments := strings.SplitN(name, "/", 4) if len(segments) < 3 || segments[0] != "" { log.Debugf("invalid name syntax for %s", name) - return "", 0, ErrResolveFailed + out <- onceResult{err: ErrResolveFailed} + close(out) + return out } key := segments[2] - p, ok := ns.cacheGet(key) - var err error - if !ok { - // Resolver selection: - // 1. if it is a multihash resolve through "ipns". - // 2. if it is a domain name, resolve through "dns" - // 3. otherwise resolve through the "proquint" resolver - var res resolver - if _, err := mh.FromB58String(key); err == nil { - res = ns.ipnsResolver - } else if isd.IsDomain(key) { - res = ns.dnsResolver - } else { - res = ns.proquintResolver - } + if p, ok := ns.cacheGet(key); ok { + out <- onceResult{value: p} + close(out) + return out + } - var ttl time.Duration - p, ttl, err = res.resolveOnce(ctx, key, options) - if err != nil { - return "", 0, ErrResolveFailed - } - ns.cacheSet(key, p, ttl) + // Resolver selection: + // 1. if it is a multihash resolve through "ipns". + // 2. if it is a domain name, resolve through "dns" + // 3. otherwise resolve through the "proquint" resolver + + var res resolver + if _, err := mh.FromB58String(key); err == nil { + res = ns.ipnsResolver + } else if isd.IsDomain(key) { + res = ns.dnsResolver + } else { + res = ns.proquintResolver } - if len(segments) > 3 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + resCh := res.resolveOnceAsync(ctx, key, options) + var best onceResult + go func() { + defer close(out) + for { + select { + case res, ok := <-resCh: + if !ok { + if best != (onceResult{}) { + ns.cacheSet(key, best.value, best.ttl) + } + return + } + if res.err == nil { + best = res + } + p := res.value + + // Attach rest of the path + if len(segments) > 3 { + p, err := path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) + if err != nil { + emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: err}) + } + } + + emitOnceResult(ctx, out, onceResult{value: p, ttl: res.ttl, err: res.err}) + case <-ctx.Done(): + return + } + } + }() + + return out +} + +func emitOnceResult(ctx context.Context, outCh chan<- onceResult, r onceResult) { + select { + case outCh <- r: + case <-ctx.Done(): } - return p, 0, err } // Publish implements Publisher diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 8be28a7d35c..e088b293361 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -4,19 +4,18 @@ import ( "context" "fmt" "testing" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" - "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs" - path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" offroute "gx/ipfs/QmQ9PR61a8rwEFuFNs7JMA1QtQC9yZnBwoDn51JWXDbaTd/go-ipfs-routing/offline" + "gx/ipfs/QmWE6Ftsk98cG2MTVgH4wJT8VP2nL9TuBkYTrz9GSqcsh5/go-unixfs" pstoremem "gx/ipfs/QmWtCpWB39Rzc2xTB75MKorsxNpo3TyecTEN24CJ3KVohE/go-libp2p-peerstore/pstoremem" ipns "gx/ipfs/QmX72XT6sSQRkNHKcAFLM2VqB3B4bWPetgWnHY8LgsUVeT/go-ipns" ds "gx/ipfs/QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D/go-datastore" dssync "gx/ipfs/QmaRb5yNXKonhbkpNxNawoydk4N6es6b4fPj19sjEKsh5D/go-datastore/sync" peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" ) type mockResolver struct { @@ -38,9 +37,12 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex } } -func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { p, err := path.ParsePath(r.entries[name]) - return p, 0, err + out := make(chan onceResult, 1) + out <- onceResult{value: p, err: err} + close(out) + return out } func mockResolverOne() *mockResolver { diff --git a/namesys/opts/opts.go b/namesys/opts/opts.go index 6690cf77942..ee2bd5ac2a4 100644 --- a/namesys/opts/opts.go +++ b/namesys/opts/opts.go @@ -31,8 +31,8 @@ type ResolveOpts struct { // DefaultResolveOpts returns the default options for resolving // an IPNS path -func DefaultResolveOpts() *ResolveOpts { - return &ResolveOpts{ +func DefaultResolveOpts() ResolveOpts { + return ResolveOpts{ Depth: DefaultDepthLimit, DhtRecordCount: 16, DhtTimeout: time.Minute, @@ -65,10 +65,10 @@ func DhtTimeout(timeout time.Duration) ResolveOpt { } // ProcessOpts converts an array of ResolveOpt into a ResolveOpts object -func ProcessOpts(opts []ResolveOpt) *ResolveOpts { +func ProcessOpts(opts []ResolveOpt) ResolveOpts { rsopts := DefaultResolveOpts() for _, option := range opts { - option(rsopts) + option(&rsopts) } return rsopts } diff --git a/namesys/proquint.go b/namesys/proquint.go index c382d8a26b7..89457931370 100644 --- a/namesys/proquint.go +++ b/namesys/proquint.go @@ -1,29 +1,33 @@ package namesys import ( + "context" "errors" - "time" - context "context" - - opts "github.com/ipfs/go-ipfs/namesys/opts" proquint "gx/ipfs/QmYnf27kzqR2cxt6LFZdrAFJuQd6785fTkBvMuEj9EeRxM/proquint" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + + opts "github.com/ipfs/go-ipfs/namesys/opts" ) type ProquintResolver struct{} // Resolve implements Resolver. func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. Decodes the proquint string. -func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) + defer close(out) + ok, err := proquint.IsProquint(name) if err != nil || !ok { - return "", 0, errors.New("not a valid proquint string") + out <- onceResult{err: errors.New("not a valid proquint string")} + return out } // Return a 0 TTL as caching this result is pointless. - return path.FromString(string(proquint.Decode(name))), 0, nil + out <- onceResult{value: path.FromString(string(proquint.Decode(name)))} + return out } diff --git a/namesys/routing.go b/namesys/routing.go index 13c554b720e..76aa8603471 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -5,9 +5,10 @@ import ( "strings" "time" - opts "github.com/ipfs/go-ipfs/namesys/opts" path "gx/ipfs/QmdrpbDgeYH3VxkCciQCJY5LkDYdXtig6unDzQmMxFtWEw/go-path" + opts "github.com/ipfs/go-ipfs/namesys/opts" + cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" routing "gx/ipfs/QmPmFeQ5oY5G6M7aBWggi5phxEPXwsQntE1DFcUzETULdp/go-libp2p-routing" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" @@ -39,27 +40,34 @@ func NewIpnsResolver(route routing.ValueStore) *IpnsResolver { // Resolve implements Resolver. func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts.ResolveOpt) (path.Path, error) { - return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") + return resolve(ctx, r, name, opts.ProcessOpts(options)) +} + +// ResolveAsync implements Resolver. +func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { + return resolveAsync(ctx, r, name, opts.ProcessOpts(options)) } // resolveOnce implements resolver. Uses the IPFS routing system to // resolve SFS-like names. -func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) log.Debugf("RoutingResolver resolving %s", name) + cancel := func() {} if options.DhtTimeout != 0 { // Resolution must complete within the timeout - var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) - defer cancel() } name = strings.TrimPrefix(name, "/ipns/") pid, err := peer.IDB58Decode(name) if err != nil { - // name should be a multihash. if it isn't, error out here. - log.Debugf("RoutingResolver: IPNS address not a valid peer ID: [%s]\n", name) - return "", 0, err + log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) + out <- onceResult{err: err} + close(out) + cancel() + return out } // Name should be the hash of a public key retrievable from ipfs. @@ -70,59 +78,86 @@ func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options *op _, err = routing.GetPublicKey(r.routing, ctx, pid) if err != nil { log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) - return "", 0, err + out <- onceResult{err: err} + close(out) + cancel() + return out } // Use the routing system to get the name. // Note that the DHT will call the ipns validator when retrieving // the value, which in turn verifies the ipns record signature ipnsKey := ipns.RecordKey(pid) - val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) - if err != nil { - log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) - return "", 0, err - } - entry := new(pb.IpnsEntry) - err = proto.Unmarshal(val, entry) + vals, err := r.routing.SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) if err != nil { - log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) - return "", 0, err - } - - var p path.Path - // check for old style record: - if valh, err := mh.Cast(entry.GetValue()); err == nil { - // Its an old style multihash record - log.Debugf("encountered CIDv0 ipns entry: %s", valh) - p = path.FromCid(cid.NewCidV0(valh)) - } else { - // Not a multihash, probably a new record - p, err = path.ParsePath(string(entry.GetValue())) - if err != nil { - return "", 0, err - } + log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) + out <- onceResult{err: err} + close(out) + cancel() + return out } - ttl := DefaultResolverCacheTTL - if entry.Ttl != nil { - ttl = time.Duration(*entry.Ttl) - } - switch eol, err := ipns.GetEOL(entry); err { - case ipns.ErrUnrecognizedValidity: - // No EOL. - case nil: - ttEol := eol.Sub(time.Now()) - if ttEol < 0 { - // It *was* valid when we first resolved it. - ttl = 0 - } else if ttEol < ttl { - ttl = ttEol + go func() { + defer cancel() + defer close(out) + for { + select { + case val, ok := <-vals: + if !ok { + return + } + + entry := new(pb.IpnsEntry) + err = proto.Unmarshal(val, entry) + if err != nil { + log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) + emitOnceResult(ctx, out, onceResult{err: err}) + return + } + + var p path.Path + // check for old style record: + if valh, err := mh.Cast(entry.GetValue()); err == nil { + // Its an old style multihash record + log.Debugf("encountered CIDv0 ipns entry: %s", valh) + p = path.FromCid(cid.NewCidV0(valh)) + } else { + // Not a multihash, probably a new style record + p, err = path.ParsePath(string(entry.GetValue())) + if err != nil { + emitOnceResult(ctx, out, onceResult{err: err}) + return + } + } + + ttl := DefaultResolverCacheTTL + if entry.Ttl != nil { + ttl = time.Duration(*entry.Ttl) + } + switch eol, err := ipns.GetEOL(entry); err { + case ipns.ErrUnrecognizedValidity: + // No EOL. + case nil: + ttEol := eol.Sub(time.Now()) + if ttEol < 0 { + // It *was* valid when we first resolved it. + ttl = 0 + } else if ttEol < ttl { + ttl = ttEol + } + default: + log.Errorf("encountered error when parsing EOL: %s", err) + emitOnceResult(ctx, out, onceResult{err: err}) + return + } + + emitOnceResult(ctx, out, onceResult{value: p, ttl: ttl}) + case <-ctx.Done(): + return + } } - default: - log.Errorf("encountered error when parsing EOL: %s", err) - return "", 0, err - } + }() - return p, ttl, nil + return out } diff --git a/test/sharness/t0100-name.sh b/test/sharness/t0100-name.sh index 019a5f94776..7a23fd1968d 100755 --- a/test/sharness/t0100-name.sh +++ b/test/sharness/t0100-name.sh @@ -114,6 +114,27 @@ test_expect_success "publish an explicit node ID as key name looks good" ' test_cmp expected_node_id_publish actual_node_id_publish ' +# test IPNS + IPLD +test_expect_success "'ipfs dag put' succeeds" ' + HELLO_HASH="$(echo "\"hello world\"" | ipfs dag put)" && + OBJECT_HASH="$(echo "{\"thing\": {\"/\": \"${HELLO_HASH}\" }}" | ipfs dag put)" +' +test_expect_success "'ipfs name publish --allow-offline /ipld/...' succeeds" ' + PEERID=`ipfs id --format=""` && + test_check_peerid "${PEERID}" && + ipfs name publish --allow-offline "/ipld/$OBJECT_HASH/thing" >publish_out +' +test_expect_success "publish a path looks good" ' + echo "Published to ${PEERID}: /ipld/$OBJECT_HASH/thing" >expected3 && + test_cmp expected3 publish_out +' +test_expect_success "'ipfs name resolve' succeeds" ' + ipfs name resolve "$PEERID" >output +' +test_expect_success "resolve output looks good" ' + printf "/ipld/%s/thing\n" "$OBJECT_HASH" >expected4 && + test_cmp expected4 output +' # test publishing nothing diff --git a/test/sharness/t0183-namesys-pubsub.sh b/test/sharness/t0183-namesys-pubsub.sh index eb3c27921f2..467c9c1e9a0 100755 --- a/test/sharness/t0183-namesys-pubsub.sh +++ b/test/sharness/t0183-namesys-pubsub.sh @@ -28,8 +28,8 @@ test_expect_success 'check namesys pubsub state' ' # These commands are *expected* to fail. We haven't published anything yet. test_expect_success 'subscribe nodes to the publisher topic' ' - ipfsi 1 name resolve /ipns/$PEERID_0; - ipfsi 2 name resolve /ipns/$PEERID_0; + ipfsi 1 name resolve /ipns/$PEERID_0 --timeout=1s; + ipfsi 2 name resolve /ipns/$PEERID_0 --timeout=1s; true '