Skip to content

Commit

Permalink
Implement SearchValue/GetValue (#942)
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra authored Sep 28, 2023
1 parent 6a4249c commit 03adce6
Show file tree
Hide file tree
Showing 24 changed files with 1,543 additions and 178 deletions.
12 changes: 12 additions & 0 deletions v2/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ type Backend interface {
// wasn't found or another error if any occurred. key won't contain the
// namespace prefix.
Fetch(ctx context.Context, key string) (any, error)

// Validate validates the given values and returns the index of the "best"
// value or an error and -1 if all values are invalid. If the method is used
// with a single value, it will return 0 and no error if it is valid or an
// error and -1 if it is invalid. For multiple values, it will select the
// "best" value based on user-defined logic and return its index in the
// original values list. If we receive a request for /ipns/$binary_id, the
// key parameter will be set to $binary_id. Decisions about which value is
// the "best" from the given list must be stable. So if there are multiple
// equally good values, the implementation must always return the same
// index - for example, always the first good or last good value.
Validate(ctx context.Context, key string, values ...any) (int, error)
}

// NewBackendIPNS initializes a new backend for the "ipns" namespace that can
Expand Down
54 changes: 51 additions & 3 deletions v2/backend_provider.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package dht

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"path"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -226,13 +226,50 @@ func (p *ProvidersBackend) Fetch(ctx context.Context, key string) (any, error) {
out.addProvider(addrInfo, rec.expiry)
}

if len(out.providers) > 0 {
if len(out.providers) == 0 {
return nil, ds.ErrNotFound
} else {
p.cache.Add(qKey.String(), *out)
}

return out, nil
}

// Validate verifies that the given values are of type [peer.AddrInfo]. Then it
// decides based on the number of attached multi addresses which value is
// "better" than the other. If there is a tie, Validate will return the index
// of the earliest occurrence.
func (p *ProvidersBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
// short circuit if it's just a single value
if len(values) == 1 {
_, ok := values[0].(peer.AddrInfo)
if !ok {
return -1, fmt.Errorf("invalid type %T", values[0])
}
return 0, nil
}

bestIdx := -1
for i, value := range values {
addrInfo, ok := value.(peer.AddrInfo)
if !ok {
continue
}

if bestIdx == -1 {
bestIdx = i
} else if len(values[bestIdx].(peer.AddrInfo).Addrs) < len(addrInfo.Addrs) {
bestIdx = i
}
}

if bestIdx == -1 {
return -1, fmt.Errorf("no value of correct type")
}

return bestIdx, nil
}

// Close is here to implement the [io.Closer] interface. This will get called
// when the [DHT] "shuts down"/closes.
func (p *ProvidersBackend) Close() error {
Expand Down Expand Up @@ -431,5 +468,16 @@ func newDatastoreKey(namespace string, binStrs ...string) ds.Key {
for i, bin := range binStrs {
elems[i+1] = base32.RawStdEncoding.EncodeToString([]byte(bin))
}
return ds.NewKey("/" + path.Join(elems...))

return ds.NewKey("/" + strings.Join(elems, "/"))
}

// newRoutingKey uses the given namespace and binary string key and constructs
// a new string of the format: /$namespace/$binStr
func newRoutingKey(namespace string, binStr string) string {
buf := make([]byte, 0, 2+len(namespace)+len(binStr))
buffer := bytes.NewBuffer(buf)
buffer.WriteString("/" + namespace + "/")
buffer.Write([]byte(binStr))
return buffer.String()
}
63 changes: 63 additions & 0 deletions v2/backend_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,13 @@ import (
"github.com/benbjohnson/clock"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slog"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
)

var devnull = slog.New(slog.NewTextHandler(io.Discard, nil))
Expand Down Expand Up @@ -115,3 +119,62 @@ func TestProvidersBackend_GarbageCollection_lifecycle_thread_safe(t *testing.T)
assert.Nil(t, b.gcCancel)
assert.Nil(t, b.gcDone)
}

func TestProvidersBackend_Validate(t *testing.T) {
ctx := kadtest.CtxShort(t)

b := newBackendProvider(t, nil)

pid := newPeerID(t)
peer1 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 0)}
peer2 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 1)}
peer3 := peer.AddrInfo{ID: pid, Addrs: make([]multiaddr.Multiaddr, 2)}

t.Run("no values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key")
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil, nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("single valid value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1)
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})

t.Run("increasing better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer2, peer3)
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("mixed better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer3, peer2)
assert.NoError(t, err)
assert.Equal(t, 1, idx)
})

t.Run("mixed invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, nil, peer2, nil)
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("identically good values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", peer1, peer1)
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})
}
51 changes: 51 additions & 0 deletions v2/backend_record.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,57 @@ func (r *RecordBackend) Fetch(ctx context.Context, key string) (any, error) {
return rec, nil
}

func (r *RecordBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
k := newRoutingKey(r.namespace, key)

// short circuit if it's just a single value
if len(values) == 1 {
data, ok := values[0].([]byte)
if !ok {
return -1, fmt.Errorf("value not byte slice")
}

if err := r.validator.Validate(k, data); err != nil {
return -1, err
}

return 0, nil
}

// In case there are invalid values in the slice, we still want to return
// the index in the original list of values. The Select method below will
// return the index of the "best" value in the slice of valid values. This
// slice can have a different length and therefore that method will return
// an index that doesn't match the values slice that's passed into this
// method. origIdx stores the original index
origIdx := map[int]int{}
validValues := [][]byte{}
for i, value := range values {
data, ok := value.([]byte)
if !ok {
continue
}

if err := r.validator.Validate(k, data); err != nil {
continue
}

origIdx[len(validValues)] = i
validValues = append(validValues, data)
}

if len(validValues) == 0 {
return -1, fmt.Errorf("no valid values")
}

sel, err := r.validator.Select(k, validValues)
if err != nil {
return -1, err
}

return origIdx[sel], nil
}

// shouldReplaceExistingRecord returns true if the given record should replace any
// existing one in the local datastore. It queries the datastore, unmarshalls
// the record, validates it, and compares it to the incoming record. If the
Expand Down
114 changes: 114 additions & 0 deletions v2/backend_record_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package dht

import (
"fmt"
"strconv"
"strings"
"testing"

record "github.com/libp2p/go-libp2p-record"
"github.com/stretchr/testify/assert"

"github.com/libp2p/go-libp2p-kad-dht/v2/internal/kadtest"
)

// testValidator is a validator that considers all values valid that have a
// "valid-" prefix. Then the suffix will determine which value is better. For
// example, "valid-2" is better than "valid-1".
type testValidator struct{}

var _ record.Validator = (*testValidator)(nil)

func (t testValidator) Validate(key string, value []byte) error {
if strings.HasPrefix(string(value), "valid-") {
return nil
}
return fmt.Errorf("invalid value")
}

func (t testValidator) Select(key string, values [][]byte) (int, error) {
idx := -1
best := -1
for i, val := range values {
if !strings.HasPrefix(string(val), "valid-") {
continue
}
newBest, err := strconv.Atoi(string(val)[6:])
if err != nil {
continue
}
if newBest > best {
idx = i
best = newBest
}
}

if idx == -1 {
return idx, fmt.Errorf("no valid value")
}

return idx, nil
}

func TestRecordBackend_Validate(t *testing.T) {
ctx := kadtest.CtxShort(t)

b := &RecordBackend{
namespace: "test",
validator: &testValidator{},
}

t.Run("no values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key")
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("nil values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", nil, nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("single valid value", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"))
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})

t.Run("increasing better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-1"), []byte("valid-2"))
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("mixed better values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-2"), []byte("valid-1"))
assert.NoError(t, err)
assert.Equal(t, 1, idx)
})

t.Run("mixed invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("invalid"), []byte("valid-2"), []byte("invalid"))
assert.NoError(t, err)
assert.Equal(t, 2, idx)
})

t.Run("only invalid values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("invalid"), nil)
assert.Error(t, err)
assert.Equal(t, -1, idx)
})

t.Run("identically good values", func(t *testing.T) {
idx, err := b.Validate(ctx, "some-key", []byte("valid-0"), []byte("valid-0"))
assert.NoError(t, err)
assert.Equal(t, 0, idx)
})
}
15 changes: 15 additions & 0 deletions v2/backend_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ func (t *tracedBackend) Fetch(ctx context.Context, key string) (any, error) {
return result, err
}

func (t *tracedBackend) Validate(ctx context.Context, key string, values ...any) (int, error) {
ctx, span := t.tracer.Start(ctx, "Validate", t.traceAttributes(key))
defer span.End()

idx, err := t.backend.Validate(ctx, key, values...)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
} else {
span.SetAttributes(attribute.Int("idx", idx))
}

return idx, err
}

// traceAttributes is a helper to build the trace attributes.
func (t *tracedBackend) traceAttributes(key string) trace.SpanStartEventOption {
return trace.WithAttributes(attribute.String("namespace", t.namespace), attribute.String("key", key))
Expand Down
Loading

0 comments on commit 03adce6

Please sign in to comment.