Skip to content

Commit

Permalink
namesys: switch to async code
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Aug 28, 2018
1 parent b2c302c commit d25104d
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 262 deletions.
66 changes: 24 additions & 42 deletions namesys/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,33 @@ type onceResult struct {
}

type resolver interface {
// resolveOnce looks up a name once (without recursion).
resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)

resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult
}

// resolve is a helper for implementing Resolver.ResolveN using resolveOnce.
func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) {
depth := options.Depth
for {
p, _, err := r.resolveOnce(ctx, name, options)
if err != nil {
return "", err
}
log.Debugf("resolved %s to %s", name, p.String())
ctx, cancel := context.WithCancel(ctx)
defer cancel()

if strings.HasPrefix(p.String(), "/ipfs/") {
// we've bottomed out with an IPFS path
return p, nil
}
err := ErrResolveFailed
var p path.Path

if depth == 1 {
return p, ErrResolveRecursion
}
resCh := resolveAsync(ctx, r, name, options, prefix)

if !strings.HasPrefix(p.String(), prefix) {
return p, nil
}
name = strings.TrimPrefix(p.String(), prefix)

if depth > 1 {
depth--
for res := range resCh {
p, err = res.Path, res.Err
if err != nil {
break
}
}

return p, err
}

//TODO:
// - better error handling
func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
// - select on writes
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
resCh := r.resolveOnceAsync(ctx, name, options)
depth := options.Depth
outCh := make(chan Result)
Expand All @@ -70,7 +58,7 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
case res, ok := <-resCh:
if !ok {
resCh = nil
continue
break
}

if res.err != nil {
Expand All @@ -79,14 +67,13 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
}
log.Debugf("resolved %s to %s", name, res.value.String())
if strings.HasPrefix(res.value.String(), "/ipfs/") {
outCh <- Result{Err: res.err}
continue
outCh <- Result{Path: res.value}
break
}
p := strings.TrimPrefix(res.value.String(), prefix)

if depth == 1 {
outCh <- Result{Err: ErrResolveRecursion}
continue
outCh <- Result{Path: res.value, Err: ErrResolveRecursion}
break
}

subopts := options
Expand All @@ -102,26 +89,21 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
subCtx, cancelSub = context.WithCancel(ctx)
defer cancelSub()

subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix)
p := strings.TrimPrefix(res.value.String(), prefix)
subCh = resolveAsync(subCtx, r, p, subopts, prefix)
case res, ok := <-subCh:
if !ok {
subCh = nil
continue
}

if res.Err != nil {
outCh <- Result{Err: res.Err}
return
break
}

outCh <- res
case <-ctx.Done():
}
if resCh == nil && subCh == nil {
return
}
}
}()
return outCh
}

func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
return resolveAsyncDo(ctx, r, name, options, prefix)
}
73 changes: 19 additions & 54 deletions namesys/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package namesys
import (
"context"
"errors"
"net"
"strings"
"time"

opts "github.com/ipfs/go-ipfs/namesys/opts"
isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain"
path "gx/ipfs/QmdMPBephdLYNESkruDX2hcDTgFYhoCt4LimWhgnomSdV2/go-path"
"gx/ipfs/QmdMPBephdLYNESkruDX2hcDTgFYhoCt4LimWhgnomSdV2/go-path"
"net"
"strings"
)

type LookupTXTFunc func(name string) (txt []string, err error)
Expand Down Expand Up @@ -43,51 +41,6 @@ type lookupRes struct {
// resolveOnce implements resolver.
// TXT records for a given domain name should contain a b58
// encoded multihash.
func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
segments := strings.SplitN(name, "/", 2)
domain := segments[0]

if !isd.IsDomain(domain) {
return "", 0, errors.New("not a valid domain name")
}
log.Debugf("DNSResolver resolving %s", domain)

rootChan := make(chan lookupRes, 1)
go workDomain(r, domain, rootChan)

subChan := make(chan lookupRes, 1)
go workDomain(r, "_dnslink."+domain, subChan)

var subRes lookupRes
select {
case subRes = <-subChan:
case <-ctx.Done():
return "", 0, ctx.Err()
}

var p path.Path
if subRes.error == nil {
p = subRes.path
} else {
var rootRes lookupRes
select {
case rootRes = <-rootChan:
case <-ctx.Done():
return "", 0, ctx.Err()
}
if rootRes.error == nil {
p = rootRes.path
} else {
return "", 0, ErrResolveFailed
}
}
var err error
if len(segments) > 1 {
p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1])
}
return p, 0, err
}

func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)
segments := strings.SplitN(name, "/", 2)
Expand All @@ -106,28 +59,39 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
subChan := make(chan lookupRes, 1)
go workDomain(r, "_dnslink."+domain, subChan)

appendPath := func(p path.Path) (path.Path, error) {
if len(segments) > 1 {
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1])
}
return p, nil
}

go func() {
defer close(out)
for {
select {
case subRes, ok := <-subChan:
if !ok {
subChan = nil
break
}
if subRes.error == nil {
p, err := appendPath(subRes.path)
select {
case out <- onceResult{value: subRes.path}:
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
return
}
case rootRes, ok := <-rootChan:
if !ok {
subChan = nil
rootChan = nil
break
}
if rootRes.error == nil {
p, err := appendPath(rootRes.path)
select {
case out <- onceResult{value: rootRes.path}:
case out <- onceResult{value: p, err: err}:
case <-ctx.Done():
}
}
Expand All @@ -144,8 +108,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
}

func workDomain(r *DNSResolver, name string, res chan lookupRes) {
txt, err := r.lookupTXT(name)
defer close(res)

txt, err := r.lookupTXT(name)
if err != nil {
// Error is != nil
res <- lookupRes{"", err}
Expand Down
11 changes: 5 additions & 6 deletions namesys/ipns_resolver_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,13 @@ func TestResolverValidation(t *testing.T) {
}

// Resolve entry
resp, _, err := resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts())
resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
if err != nil {
t.Fatal(err)
}
if resp != path.Path(p) {
t.Fatalf("Mismatch between published path %s and resolved path %s", p, resp)
}

// Create expired entry
expiredEntry, err := ipns.Create(priv, p, 1, ts.Add(-1*time.Hour))
if err != nil {
Expand All @@ -77,7 +76,7 @@ func TestResolverValidation(t *testing.T) {
}

// Record should fail validation because entry is expired
_, _, err = resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts())
_, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
if err == nil {
t.Fatal("ValidateIpnsRecord should have returned error")
}
Expand All @@ -99,7 +98,7 @@ func TestResolverValidation(t *testing.T) {

// Record should fail validation because public key defined by
// ipns path doesn't match record signature
_, _, err = resolver.resolveOnce(ctx, id2.Pretty(), opts.DefaultResolveOpts())
_, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
if err == nil {
t.Fatal("ValidateIpnsRecord should have failed signature verification")
}
Expand All @@ -117,7 +116,7 @@ func TestResolverValidation(t *testing.T) {

// Record should fail validation because public key is not available
// in peer store or on network
_, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts())
_, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
if err == nil {
t.Fatal("ValidateIpnsRecord should have failed because public key was not found")
}
Expand All @@ -132,7 +131,7 @@ func TestResolverValidation(t *testing.T) {
// public key is available in the peer store by looking it up in
// the DHT, which causes the DHT to fetch it and cache it in the
// peer store
_, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts())
_, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
if err != nil {
t.Fatal(err)
}
Expand Down
42 changes: 0 additions & 42 deletions namesys/namesys.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,48 +82,6 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R
}

// resolveOnce implements resolver.
func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
if !strings.HasPrefix(name, "/ipns/") {
name = "/ipns/" + name
}
segments := strings.SplitN(name, "/", 4)
if len(segments) < 3 || segments[0] != "" {
log.Debugf("invalid name syntax for %s", name)
return "", 0, ErrResolveFailed
}

key := segments[2]

p, ok := ns.cacheGet(key)
var err error
if !ok {
// Resolver selection:
// 1. if it is a multihash resolve through "ipns".
// 2. if it is a domain name, resolve through "dns"
// 3. otherwise resolve through the "proquint" resolver
var res resolver
if _, err := mh.FromB58String(key); err == nil {
res = ns.ipnsResolver
} else if isd.IsDomain(key) {
res = ns.dnsResolver
} else {
res = ns.proquintResolver
}

var ttl time.Duration
p, ttl, err = res.resolveOnce(ctx, key, options)
if err != nil {
return "", 0, ErrResolveFailed
}
ns.cacheSet(key, p, ttl)
}

if len(segments) > 3 {
p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
}
return p, 0, err
}

func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
out := make(chan onceResult, 1)

Expand Down
21 changes: 9 additions & 12 deletions namesys/namesys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package namesys
import (
"context"
"fmt"
"testing"
"time"

opts "github.com/ipfs/go-ipfs/namesys/opts"
"gx/ipfs/QmQjEpRiwVvtowhq69dAtB4jhioPVFXiCcWZm9Sfgn7eqc/go-unixfs"
path "gx/ipfs/QmdMPBephdLYNESkruDX2hcDTgFYhoCt4LimWhgnomSdV2/go-path"
"gx/ipfs/QmdMPBephdLYNESkruDX2hcDTgFYhoCt4LimWhgnomSdV2/go-path"
"testing"

ipns "gx/ipfs/QmNqBhXpBKa5jcjoUZHfxDgAFxtqK3rDA5jtW811GBvVob/go-ipns"
"gx/ipfs/QmNqBhXpBKa5jcjoUZHfxDgAFxtqK3rDA5jtW811GBvVob/go-ipns"
ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
"gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
ds "gx/ipfs/QmVG5gxteQNEMhrS8prJSmU2C9rebtFuTd3SYZ5kE3YZ5k/go-datastore"
dssync "gx/ipfs/QmVG5gxteQNEMhrS8prJSmU2C9rebtFuTd3SYZ5kE3YZ5k/go-datastore/sync"
offroute "gx/ipfs/Qmd45r5jHr1PKMNQqifnbZy1ZQwHdtXUDJFamUEvUJE544/go-ipfs-routing/offline"
Expand All @@ -38,13 +36,12 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex
}
}

func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts opts.ResolveOpts) (path.Path, time.Duration, error) {
p, err := path.ParsePath(r.entries[name])
return p, 0, err
}

func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
panic("stub")
p, err := path.ParsePath(r.entries[name])
out := make(chan onceResult, 1)
out <- onceResult{value: p, err: err}
close(out)
return out
}

func mockResolverOne() *mockResolver {
Expand Down
Loading

0 comments on commit d25104d

Please sign in to comment.