Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
chunk, shed, storage: chunk.Store GetMulti method (#1691)
Browse files Browse the repository at this point in the history
  • Loading branch information
janos authored Aug 26, 2019
1 parent 06a923e commit ca56afd
Show file tree
Hide file tree
Showing 11 changed files with 375 additions and 29 deletions.
1 change: 1 addition & 0 deletions chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (d *Descriptor) String() string {

type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
GetMulti(ctx context.Context, mode ModeGet, addrs ...Address) (ch []Chunk, err error)
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
Expand Down
4 changes: 4 additions & 0 deletions network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
return nil, errors.New("roundRobinStore doesn't support Get")
}

func (rrs *roundRobinStore) GetMulti(_ context.Context, _ chunk.ModeGet, _ ...storage.Address) ([]storage.Chunk, error) {
return nil, errors.New("roundRobinStore doesn't support GetMulti")
}

func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
Expand Down
31 changes: 31 additions & 0 deletions shed/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,37 @@ func (f Index) Get(keyFields Item) (out Item, err error) {
return out.Merge(keyFields), nil
}

// Fill populates fields on provided items that are part of the
// encoded value by getting them based on information passed in their
// fields. Every item must have all fields needed for encoding the
// key set. The passed slice items will be changed so that they
// contain data from the index values. No new slice is allocated.
// This function uses a single leveldb snapshot.
func (f Index) Fill(items []Item) (err error) {
snapshot, err := f.db.ldb.GetSnapshot()
if err != nil {
return err
}
defer snapshot.Release()

for i, item := range items {
key, err := f.encodeKeyFunc(item)
if err != nil {
return err
}
value, err := snapshot.Get(key, nil)
if err != nil {
return err
}
v, err := f.decodeValueFunc(item, value)
if err != nil {
return err
}
items[i] = v.Merge(item)
}
return nil
}

// Has accepts key fields represented as Item to check
// if there this Item's encoded key is stored in
// the index.
Expand Down
64 changes: 63 additions & 1 deletion shed/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ var retrievalIndexFuncs = IndexFuncs{
},
}

// TestIndex validates put, get, has and delete functions of the Index implementation.
// TestIndex validates put, get, fill, has and delete functions of the Index implementation.
func TestIndex(t *testing.T) {
db, cleanupFunc := newTestDB(t)
defer cleanupFunc()
Expand Down Expand Up @@ -283,6 +283,68 @@ func TestIndex(t *testing.T) {
t.Fatalf("got error %v, want %v", err, wantErr)
}
})

t.Run("fill", func(t *testing.T) {
want := []Item{
{
Address: []byte("put-hash-1"),
Data: []byte("DATA123"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-32"),
Data: []byte("DATA124"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-42"),
Data: []byte("DATA125"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
{
Address: []byte("put-hash-71"),
Data: []byte("DATA126"),
StoreTimestamp: time.Now().UTC().UnixNano(),
},
}

for _, item := range want {
err := index.Put(item)
if err != nil {
t.Fatal(err)
}
}
items := make([]Item, len(want))
for i, w := range want {
items[i] = Item{
Address: w.Address,
}
}
err = index.Fill(items)
if err != nil {
t.Fatal(err)
}
for i := range items {
checkItem(t, items[i], want[i])
}

t.Run("not found", func(t *testing.T) {
items := make([]Item, len(want))
for i, w := range want {
items[i] = Item{
Address: w.Address,
}
}
items = append(items, Item{
Address: []byte("put-hash-missing"),
})
want := leveldb.ErrNotFound
err := index.Fill(items)
if err != want {
t.Errorf("got error %v, want %v", err, want)
}
})
})
}

// TestIndex_Iterate validates index Iterate
Expand Down
13 changes: 13 additions & 0 deletions storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,19 @@ func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Ch
return chunk, nil
}

func (m *MapChunkStore) GetMulti(_ context.Context, _ chunk.ModeGet, refs ...Address) (chunks []Chunk, err error) {
m.mu.RLock()
defer m.mu.RUnlock()
for _, ref := range refs {
chunk := m.chunks[ref.Hex()]
if chunk == nil {
return nil, ErrChunkNotFound
}
chunks = append(chunks, chunk)
}
return chunks, nil
}

// Need to implement Has from SyncChunkStore
func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) {
m.mu.RLock()
Expand Down
9 changes: 9 additions & 0 deletions storage/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,15 @@ func generateTestRandomChunks(count int) []chunk.Chunk {
return chunks
}

// chunkAddresses return chunk addresses of provided chunks.
func chunkAddresses(chunks []chunk.Chunk) []chunk.Address {
addrs := make([]chunk.Address, len(chunks))
for i, ch := range chunks {
addrs[i] = ch.Address()
}
return addrs
}

// TestGenerateTestRandomChunk validates that
// generateTestRandomChunk returns random data by comparing
// two generated chunks.
Expand Down
65 changes: 37 additions & 28 deletions storage/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,34 +67,7 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

err := db.updateGC(out)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
db.updateGCItems(out)

case chunk.ModeGetPin:
pinnedItem, err := db.pinIndex.Get(item)
Expand All @@ -112,6 +85,42 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
return out, nil
}

// updateGCItems is called when ModeGetRequest is used
// for Get or GetMulti to update access time and gc indexes
// for all returned chunks.
func (db *DB) updateGCItems(items ...shed.Item) {
if db.updateGCSem != nil {
// wait before creating new goroutines
// if updateGCSem buffer id full
db.updateGCSem <- struct{}{}
}
db.updateGCWG.Add(1)
go func() {
defer db.updateGCWG.Done()
if db.updateGCSem != nil {
// free a spot in updateGCSem buffer
// for a new goroutine
defer func() { <-db.updateGCSem }()
}

metricName := "localstore.updateGC"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

for _, item := range items {
err := db.updateGC(item)
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
log.Error("localstore update gc", "err", err)
}
}
// if gc update hook is defined, call it
if testHookUpdateGC != nil {
testHookUpdateGC()
}
}()
}

// updateGC updates garbage collection index for
// a single item. Provided item is expected to have
// only Address and Data fields with non zero values,
Expand Down
91 changes: 91 additions & 0 deletions storage/localstore/mode_get_multi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2019 The Swarm Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package localstore

import (
"context"
"fmt"
"time"

"github.com/ethereum/go-ethereum/metrics"
"github.com/ethersphere/swarm/chunk"
"github.com/ethersphere/swarm/shed"
"github.com/syndtr/goleveldb/leveldb"
)

// GetMulti returns chunks from the database. If one of the chunks is not found
// chunk.ErrChunkNotFound will be returned. All required indexes will be updated
// required by the Getter Mode. GetMulti is required to implement chunk.Store
// interface.
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
metricName := fmt.Sprintf("localstore.GetMulti.%s", mode)

metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
defer totalTimeMetric(metricName, time.Now())

defer func() {
if err != nil {
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
}
}()

out, err := db.getMulti(mode, addrs...)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, chunk.ErrChunkNotFound
}
return nil, err
}
chunks = make([]chunk.Chunk, len(out))
for i, ch := range out {
chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter)
}
return chunks, nil
}

// getMulti returns Items from the retrieval index
// and updates other indexes.
func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) {
out = make([]shed.Item, len(addrs))
for i, addr := range addrs {
out[i].Address = addr
}

err = db.retrievalDataIndex.Fill(out)
if err != nil {
return nil, err
}

switch mode {
// update the access timestamp and gc index
case chunk.ModeGetRequest:
db.updateGCItems(out...)

case chunk.ModeGetPin:
err := db.pinIndex.Fill(out)
if err != nil {
return nil, err
}

// no updates to indexes
case chunk.ModeGetSync:
case chunk.ModeGetLookup:
default:
return out, ErrInvalidMode
}
return out, nil
}
Loading

0 comments on commit ca56afd

Please sign in to comment.