Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
chunk, storage: add HasMulti to chunk.Store (#1686)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos authored Aug 26, 2019
1 parent ca56afd commit f527723
Show file tree
Hide file tree
Showing 8 changed files with 187 additions and 1 deletion.
1 change: 1 addition & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ type Store interface {
GetMulti(ctx context.Context, mode ModeGet, addrs ...Address) (ch []Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
HasMulti(ctx context.Context, addrs ...Address) (yes []bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
SubscribePull(ctx context.Context, bin uint8, since, until uint64) (c <-chan Descriptor, stop func())
Expand Down
22 changes: 22 additions & 0 deletions shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,28 @@ func (f Index) Has(keyFields Item) (bool, error) {
return f.db.Has(key)
}

// HasMulti accepts multiple multiple key fields represented as Item to check if
// there this Item's encoded key is stored in the index for each of them.
func (f Index) HasMulti(items ...Item) ([]bool, error) {
have := make([]bool, len(items))
snapshot, err := f.db.ldb.GetSnapshot()
if err != nil {
return nil, err
}
defer snapshot.Release()
for i, keyFields := range items {
key, err := f.encodeKeyFunc(keyFields)
if err != nil {
return nil, err
}
have[i], err = snapshot.Has(key, nil)
if err != nil {
return nil, err
}
}
return have, nil
}

// Put accepts Item to encode information from it
// and save it to the database.
func (f Index) Put(i Item) (err error) {
Expand Down
82 changes: 82 additions & 0 deletions shed/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1022,3 +1022,85 @@ func TestIncByteSlice(t *testing.T) {
}
}
}

// TestIndex_HasMulti validates that HasMulti returns a correct
// slice of booleans for provided Items.
func TestIndex_HasMulti(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()

index, err := db.NewIndex("retrieval", retrievalIndexFuncs)
if err != nil {
t.Fatal(err)
}

items := []Item{
{
Address: []byte("hash-01"),
Data: []byte("data94"),
},
{
Address: []byte("hash-03"),
Data: []byte("data33"),
},
{
Address: []byte("hash-05"),
Data: []byte("data55"),
},
{
Address: []byte("hash-02"),
Data: []byte("data21"),
},
{
Address: []byte("hash-06"),
Data: []byte("data8"),
},
}
missingItem := Item{
Address: []byte("hash-10"),
Data: []byte("data0"),
}

batch := new(leveldb.Batch)
for _, i := range items {
index.PutInBatch(batch, i)
}
err = db.WriteBatch(batch)
if err != nil {
t.Fatal(err)
}

got, err := index.HasMulti(items[0])
if err != nil {
t.Fatal(err)
}
if !got[0] {
t.Error("first item not found")
}

got, err = index.HasMulti(missingItem)
if err != nil {
t.Fatal(err)
}
if got[0] {
t.Error("missing item found")
}

got, err = index.HasMulti(items...)
if err != nil {
t.Fatal(err)
}
want := []bool{true, true, true, true, true}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}

got, err = index.HasMulti(append(items, missingItem)...)
if err != nil {
t.Fatal(err)
}
want = []bool{true, true, true, true, true, false}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
}
11 changes: 11 additions & 0 deletions storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err err
return has, nil
}

func (m *MapChunkStore) HasMulti(ctx context.Context, refs ...Address) (have []bool, err error) {
m.mu.RLock()
defer m.mu.RUnlock()

have = make([]bool, len(refs))
for i, ref := range refs {
_, have[i] = m.chunks[ref.Hex()]
}
return have, nil
}

func (m *MapChunkStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {
return nil
}
Expand Down
12 changes: 12 additions & 0 deletions storage/localstore/localstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,18 @@ func addressToItem(addr chunk.Address) shed.Item {
}
}

// addressesToItems constructs a slice of Items with only
// addresses set on them.
func addressesToItems(addrs ...chunk.Address) []shed.Item {
items := make([]shed.Item, len(addrs))
for i, addr := range addrs {
items[i] = shed.Item{
Address: addr,
}
}
return items
}

// now is a helper function that returns a current unix timestamp
// in UTC timezone.
// It is set in the init function for usage in production, and
Expand Down
15 changes: 15 additions & 0 deletions storage/localstore/mode_has.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,18 @@ func (db *DB) Has(ctx context.Context, addr chunk.Address) (bool, error) {
}
return has, err
}

// HasMulti returns a slice of booleans which represent if the provided chunks
// are stored in database.
func (db *DB) HasMulti(ctx context.Context, addrs ...chunk.Address) ([]bool, error) {
metricName := "localstore.HasMulti"

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

have, err := db.retrievalDataIndex.HasMulti(addressesToItems(addrs...)...)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
return have, err
}
40 changes: 39 additions & 1 deletion storage/localstore/mode_has_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ package localstore

import (
"context"
"fmt"
"math/rand"
"testing"
"time"

"github.com/ethersphere/swarm/chunk"
)

// TestHas validates that Hasser is returning true for
// TestHas validates that Has method is returning true for
// the stored chunk and false for one that is not stored.
func TestHas(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
Expand Down Expand Up @@ -54,3 +57,38 @@ func TestHas(t *testing.T) {
t.Error("unexpected chunk is found")
}
}

// TestHasMulti validates that HasMulti method is returning correct boolean
// slice for stored chunks.
func TestHasMulti(t *testing.T) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for _, tc := range multiChunkTestCases {
t.Run(tc.name, func(t *testing.T) {
db, cleanupFunc := newTestDB(t, nil)
defer cleanupFunc()

chunks := generateTestRandomChunks(tc.count)
want := make([]bool, tc.count)

for i, ch := range chunks {
if r.Intn(2) == 0 {
// randomly exclude half of the chunks
continue
}
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
want[i] = true
}

got, err := db.HasMulti(context.Background(), chunkAddresses(chunks)...)
if err != nil {
t.Fatal(err)
}
if fmt.Sprint(got) != fmt.Sprint(want) {
t.Errorf("got %v, want %v", got, want)
}
})
}
}
5 changes: 5 additions & 0 deletions storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ func (f *FakeChunkStore) Has(_ context.Context, ref Address) (bool, error) {
panic("FakeChunkStore doesn't support Has")
}

// HasMulti doesn't do anything it is just here to implement ChunkStore
func (f *FakeChunkStore) HasMulti(_ context.Context, refs ...Address) ([]bool, error) {
panic("FakeChunkStore doesn't support HasMulti")
}

// Get doesn't store anything it is just here to implement ChunkStore
func (f *FakeChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
panic("FakeChunkStore doesn't support Get")
Expand Down

0 comments on commit f527723

Please sign in to comment.