diff --git a/cmd/swarm/db.go b/cmd/swarm/db.go index 63c954e51a4f..a080be7f57d1 100644 --- a/cmd/swarm/db.go +++ b/cmd/swarm/db.go @@ -112,6 +112,8 @@ func openLDBStore(path string, basekey []byte) (*storage.LDBStore, error) { if _, err := os.Stat(filepath.Join(path, "CURRENT")); err != nil { return nil, fmt.Errorf("invalid chunkdb path: %s", err) } - storeParams := storage.NewLDBStoreParams(path, 10000000, nil, nil) - return storage.NewLDBStore(storeParams) + + storeparams := storage.NewDefaultStoreParams() + ldbparams := storage.NewLDBStoreParams(storeparams, path) + return storage.NewLDBStore(ldbparams) } diff --git a/metrics/timer_test.go b/metrics/timer_test.go index c1f0ff9388fb..8638a2270bbb 100644 --- a/metrics/timer_test.go +++ b/metrics/timer_test.go @@ -47,8 +47,8 @@ func TestTimerStop(t *testing.T) { func TestTimerFunc(t *testing.T) { tm := NewTimer() tm.Time(func() { time.Sleep(50e6) }) - if max := tm.Max(); 35e6 > max || max > 95e6 { - t.Errorf("tm.Max(): 35e6 > %v || %v > 95e6\n", max, max) + if max := tm.Max(); 35e6 > max || max > 145e6 { + t.Errorf("tm.Max(): 35e6 > %v || %v > 145e6\n", max, max) } } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 18f97d81194d..dc8a217821ba 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -93,7 +93,7 @@ func NewStreamerService(ctx *adapters.ServiceContext) (node.Service, error) { db := storage.NewDBAPI(store) delivery := NewDelivery(kad, db) deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewMemStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: defaultSkipCheck, DoRetrieve: false, }) @@ -153,7 +153,7 @@ func newStreamerTester(t *testing.T) (*p2ptest.ProtocolTester, *Registry, *stora db := storage.NewDBAPI(localStore) delivery := NewDelivery(to, db) - streamer := NewRegistry(addr, delivery, db, state.NewMemStore(), &RegistryOptions{ + streamer := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: defaultSkipCheck, }) teardown := func() { diff --git a/swarm/network/stream/intervals/store_test.go b/swarm/network/stream/intervals/store_test.go index 5efb6ae8a6ce..0ab14c065c13 100644 --- a/swarm/network/stream/intervals/store_test.go +++ b/swarm/network/stream/intervals/store_test.go @@ -25,9 +25,9 @@ import ( var ErrNotFound = errors.New("not found") -// TestMemStore tests basic functionality of MemStore. -func TestMemStore(t *testing.T) { - testStore(t, state.NewMemStore()) +// TestInmemoryStore tests basic functionality of InmemoryStore. +func TestInmemoryStore(t *testing.T) { + testStore(t, state.NewInmemoryStore()) } // testStore is a helper function to test various Store implementations. 
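The cmd/swarm/db.go hunk above switches from the old single-call constructor to a two-step parameter setup: build default StoreParams first, then wrap them together with a chunk-database path into LDBStoreParams. A minimal, self-contained sketch of that pattern in isolation, assuming the swarm/storage import path shown in this diff (the wrapper name openStore, the path value and the error handling are illustrative, not part of the patch):

package main

import (
	"log"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

// openStore shows the constructor pattern introduced above: build default
// StoreParams, wrap them with a chunk-database path, then open the LDBStore.
// The function name and error handling are illustrative only.
func openStore(path string) (*storage.LDBStore, error) {
	storeparams := storage.NewDefaultStoreParams()
	ldbparams := storage.NewLDBStoreParams(storeparams, path)
	return storage.NewLDBStore(ldbparams)
}

func main() {
	store, err := openStore("/tmp/chunkdb") // illustrative path
	if err != nil {
		log.Fatal(err)
	}
	defer store.Close()
}

The same reshuffle is what drives the other call-site changes in this patch: callers no longer pass capacity, hasher and basekey to NewLDBStoreParams directly, they hand over a StoreParams value instead.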
diff --git a/swarm/network/stream/intervals_test.go b/swarm/network/stream/intervals_test.go index c65ac27fc416..4dd12191610d 100644 --- a/swarm/network/stream/intervals_test.go +++ b/swarm/network/stream/intervals_test.go @@ -51,7 +51,7 @@ func newIntervalsStreamerService(ctx *adapters.ServiceContext) (node.Service, er db := storage.NewDBAPI(store) delivery := NewDelivery(kad, db) deliveries[id] = delivery - r := NewRegistry(addr, delivery, db, state.NewMemStore(), &RegistryOptions{ + r := NewRegistry(addr, delivery, db, state.NewInmemoryStore(), &RegistryOptions{ SkipCheck: defaultSkipCheck, }) diff --git a/swarm/pss/client/client_test.go b/swarm/pss/client/client_test.go index cc33b47231a3..02fbb76cc96d 100644 --- a/swarm/pss/client/client_test.go +++ b/swarm/pss/client/client_test.go @@ -210,7 +210,7 @@ func setupNetwork(numnodes int) (clients []*rpc.Client, err error) { } func newServices() adapters.Services { - stateStore := state.NewMemStore() + stateStore := state.NewInmemoryStore() kademlias := make(map[discover.NodeID]*network.Kademlia) kademlia := func(id discover.NodeID) *network.Kademlia { if k, ok := kademlias[id]; ok { diff --git a/swarm/pss/pss_test.go b/swarm/pss/pss_test.go index 29961ee3ae5a..14cbd660c428 100644 --- a/swarm/pss/pss_test.go +++ b/swarm/pss/pss_test.go @@ -1326,7 +1326,7 @@ func setupNetwork(numnodes int, allowRaw bool) (clients []*rpc.Client, err error } func newServices(allowRaw bool) adapters.Services { - stateStore := state.NewMemStore() + stateStore := state.NewInmemoryStore() kademlias := make(map[discover.NodeID]*network.Kademlia) kademlia := func(id discover.NodeID) *network.Kademlia { if k, ok := kademlias[id]; ok { diff --git a/swarm/state/memstore.go b/swarm/state/inmemorystore.go similarity index 79% rename from swarm/state/memstore.go rename to swarm/state/inmemorystore.go index 140697bdd09d..1ca25404a19e 100644 --- a/swarm/state/memstore.go +++ b/swarm/state/inmemorystore.go @@ -22,23 +22,23 @@ import ( "sync" ) -// MemStore is the reference implementation of Store interface that is supposed +// InmemoryStore is the reference implementation of Store interface that is supposed // to be used in tests. -type MemStore struct { +type InmemoryStore struct { db map[string][]byte mu sync.RWMutex } -// NewMemStore returns a new instance of MemStore. -func NewMemStore() *MemStore { - return &MemStore{ +// NewInmemoryStore returns a new instance of InmemoryStore. +func NewInmemoryStore() *InmemoryStore { + return &InmemoryStore{ db: make(map[string][]byte), } } // Get retrieves a value stored for a specific key. If there is no value found, // ErrNotFound is returned. -func (s *MemStore) Get(key string, i interface{}) (err error) { +func (s *InmemoryStore) Get(key string, i interface{}) (err error) { s.mu.RLock() defer s.mu.RUnlock() @@ -56,7 +56,7 @@ func (s *MemStore) Get(key string, i interface{}) (err error) { } // Put stores a value for a specific key. -func (s *MemStore) Put(key string, i interface{}) (err error) { +func (s *InmemoryStore) Put(key string, i interface{}) (err error) { s.mu.Lock() defer s.mu.Unlock() bytes := []byte{} @@ -77,7 +77,7 @@ func (s *MemStore) Put(key string, i interface{}) (err error) { } // Delete removes value stored under a specific key. -func (s *MemStore) Delete(key string) (err error) { +func (s *InmemoryStore) Delete(key string) (err error) { s.mu.Lock() defer s.mu.Unlock() @@ -89,6 +89,6 @@ func (s *MemStore) Delete(key string) (err error) { } // Close does not do anything. 
-func (s *MemStore) Close() error { +func (s *InmemoryStore) Close() error { return nil } diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index 0ad114409562..41155e8d7f7a 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -34,7 +34,7 @@ import ( ) var ( - loglevel = flag.Int("loglevel", 2, "verbosity of logs") + loglevel = flag.Int("loglevel", 3, "verbosity of logs") ) func init() { diff --git a/swarm/storage/dpa.go b/swarm/storage/dpa.go index 753edbe85367..0a5bd95c6e53 100644 --- a/swarm/storage/dpa.go +++ b/swarm/storage/dpa.go @@ -34,8 +34,9 @@ implementation for storage or retrieval. */ const ( - singletonSwarmDbCapacity = 50000 - singletonSwarmCacheCapacity = 500 + defaultLDBCapacity = 5000000 // capacity for LevelDB, by default 5*10^6*4096 bytes == 20GB + defaultCacheCapacity = 500 // capacity for in-memory chunks' cache + defaultChunkRequestsCacheCapacity = 5000000 // capacity for container holding outgoing requests for chunks. should be set to LevelDB capacity ) var ( diff --git a/swarm/storage/dpa_test.go b/swarm/storage/dpa_test.go index 5bb7257c15cb..245c4430a5e8 100644 --- a/swarm/storage/dpa_test.go +++ b/swarm/storage/dpa_test.go @@ -39,8 +39,7 @@ func testDpaRandom(toEncrypt bool, t *testing.T) { defer tdb.close() db := tdb.LDBStore db.setCapacity(50000) - storeParams := NewStoreParams(defaultCacheCapacity, nil, nil) - memStore := NewMemStore(storeParams, db) + memStore := NewMemStore(NewDefaultStoreParams(), db) localStore := &LocalStore{ memStore: memStore, DbStore: db, @@ -72,7 +71,7 @@ func testDpaRandom(toEncrypt bool, t *testing.T) { } ioutil.WriteFile("/tmp/slice.bzz.16M", slice, 0666) ioutil.WriteFile("/tmp/result.bzz.16M", resultSlice, 0666) - localStore.memStore = NewMemStore(storeParams, db) + localStore.memStore = NewMemStore(NewDefaultStoreParams(), db) resultReader, isEncrypted = dpa.Retrieve(key) if isEncrypted != toEncrypt { t.Fatalf("isEncrypted expected %v got %v", toEncrypt, isEncrypted) @@ -104,9 +103,7 @@ func testDPA_capacity(toEncrypt bool, t *testing.T) { } defer tdb.close() db := tdb.LDBStore - storeParams := NewStoreParams(0, nil, nil) - storeParams.CacheCapacity = 10000000 - memStore := NewMemStore(storeParams, db) + memStore := NewMemStore(NewDefaultStoreParams(), db) localStore := &LocalStore{ memStore: memStore, DbStore: db, diff --git a/swarm/storage/ldbstore.go b/swarm/storage/ldbstore.go index 597642629b90..42fa731aafcd 100644 --- a/swarm/storage/ldbstore.go +++ b/swarm/storage/ldbstore.go @@ -30,6 +30,7 @@ import ( "fmt" "io" "io/ioutil" + "sort" "sync" "github.com/ethereum/go-ethereum/log" @@ -47,15 +48,8 @@ var ( ) const ( - defaultDbCapacity = 5000000 - defaultRadius = 0 // not yet used - - gcArraySize = 10000 gcArrayFreeRatio = 0.1 - - // key prefixes for leveldb storage - kpIndex = 0 - kpData = 1 + maxGCitems = 5000 // max number of items to be gc'd per call to collectGarbage() ) var ( @@ -73,6 +67,7 @@ type gcItem struct { idx uint64 value uint64 idxKey []byte + po uint8 } type LDBStoreParams struct { @@ -82,16 +77,7 @@ type LDBStoreParams struct { } // NewLDBStoreParams constructs LDBStoreParams with the specified values. 
-// if 0 is set for capacity or nil for hash, and basekey is specified, default values for these params will be used -// path has no default value -func NewLDBStoreParams(path string, capacity uint64, hash SwarmHasher, basekey []byte) *LDBStoreParams { - if hash == nil { - hash = MakeHashFunc(SHA3Hash) - } - if capacity == 0 { - capacity = singletonSwarmDbCapacity - } - storeparams := NewStoreParams(capacity, hash, basekey) +func NewLDBStoreParams(storeparams *StoreParams, path string) *LDBStoreParams { return &LDBStoreParams{ StoreParams: storeparams, Path: path, @@ -103,11 +89,11 @@ type LDBStore struct { db *LDBDatabase // this should be stored in db, accessed transactionally - entryCnt, accessCnt, dataIdx, capacity uint64 - bucketCnt []uint64 - - gcPos, gcStartPos []byte - gcArray []*gcItem + entryCnt uint64 // number of items in the LevelDB + accessCnt uint64 // ever-accumulating number increased every time we read/access an entry + dataIdx uint64 // similar to entryCnt, but we only increment it + capacity uint64 + bucketCnt []uint64 hashfunc SwarmHasher po func(Key) uint8 @@ -149,10 +135,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { s.po = params.Po s.setCapacity(params.DbCapacity) - s.gcStartPos = make([]byte, 1) - s.gcStartPos[0] = kpIndex - s.gcArray = make([]*gcItem, gcArraySize) - s.bucketCnt = make([]uint64, 0x100) for i := 0; i < 0x100; i++ { k := make([]byte, 2) @@ -172,10 +154,6 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) { s.dataIdx = BytesToU64(data) s.dataIdx++ - s.gcPos, _ = s.db.Get(keyGCPos) - if s.gcPos == nil { - s.gcPos = s.gcStartPos - } return s, nil } @@ -205,21 +183,15 @@ func BytesToU64(data []byte) uint64 { if len(data) < 8 { return 0 } - //return binary.LittleEndian.Uint64(data) return binary.BigEndian.Uint64(data) } func U64ToBytes(val uint64) []byte { data := make([]byte, 8) - //binary.LittleEndian.PutUint64(data, val) binary.BigEndian.PutUint64(data, val) return data } -func getIndexGCValue(index *dpaDBIndex) uint64 { - return index.Access -} - func (s *LDBStore) updateIndexAccess(index *dpaDBIndex) { index.Access = s.accessCnt } @@ -261,7 +233,6 @@ func encodeData(chunk *Chunk) []byte { func decodeIndex(data []byte, index *dpaDBIndex) error { dec := rlp.NewStream(bytes.NewReader(data), 0) return dec.Decode(index) - } func decodeData(data []byte, chunk *Chunk) { @@ -274,98 +245,49 @@ func decodeOldData(data []byte, chunk *Chunk) { chunk.Size = int64(binary.BigEndian.Uint64(data[0:8])) } -func gcListPartition(list []*gcItem, left int, right int, pivotIndex int) int { - pivotValue := list[pivotIndex].value - dd := list[pivotIndex] - list[pivotIndex] = list[right] - list[right] = dd - storeIndex := left - for i := left; i < right; i++ { - if list[i].value < pivotValue { - dd = list[storeIndex] - list[storeIndex] = list[i] - list[i] = dd - storeIndex++ - } - } - dd = list[storeIndex] - list[storeIndex] = list[right] - list[right] = dd - return storeIndex -} - -func gcListSelect(list []*gcItem, left int, right int, n int) int { - if left == right { - return left - } - pivotIndex := (left + right) / 2 - pivotIndex = gcListPartition(list, left, right, pivotIndex) - if n == pivotIndex { - return n - } else { - if n < pivotIndex { - return gcListSelect(list, left, pivotIndex-1, n) - } else { - return gcListSelect(list, pivotIndex+1, right, n) - } - } -} - func (s *LDBStore) collectGarbage(ratio float32) { it := s.db.NewIterator() - it.Seek(s.gcPos) - if it.Valid() { - s.gcPos = it.Key() - } else { - s.gcPos = nil 
- } - gcnt := 0 + defer it.Release() - for (gcnt < gcArraySize) && (uint64(gcnt) < s.entryCnt) { + garbage := []*gcItem{} + gcnt := 0 - if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) { - it.Seek(s.gcStartPos) - if it.Valid() { - s.gcPos = it.Key() - } else { - s.gcPos = nil - } - } + for ok := it.Seek([]byte{keyIndex}); ok && (gcnt < maxGCitems) && (uint64(gcnt) < s.entryCnt); ok = it.Next() { + itkey := it.Key() - if (s.gcPos == nil) || (s.gcPos[0] != kpIndex) { + if (itkey == nil) || (itkey[0] != keyIndex) { break } - gci := new(gcItem) - gci.idxKey = s.gcPos + // it.Key() contents change on next call to it.Next(), so we must copy it + key := make([]byte, len(it.Key())) + copy(key, it.Key()) + + val := it.Value() + var index dpaDBIndex - decodeIndex(it.Value(), &index) - gci.idx = index.Idx - // the smaller, the more likely to be gc'd - gci.value = getIndexGCValue(&index) - s.gcArray[gcnt] = gci - gcnt++ - it.Next() - if it.Valid() { - s.gcPos = it.Key() - } else { - s.gcPos = nil - } - } - it.Release() - cutidx := gcListSelect(s.gcArray, 0, gcnt-1, int(float32(gcnt)*ratio)) - cutval := s.gcArray[cutidx].value + hash := key[1:] + decodeIndex(val, &index) + po := s.po(hash) - // actual gc - for i := 0; i < gcnt; i++ { - if s.gcArray[i].value <= cutval { - gcCounter.Inc(1) - s.delete(s.gcArray[i].idx, s.gcArray[i].idxKey, s.po(Key(s.gcPos[1:]))) + gci := &gcItem{ + idxKey: key, + idx: index.Idx, + value: index.Access, // the smaller, the more likely to be gc'd. see sort comparator below. + po: po, } + + garbage = append(garbage, gci) + gcnt++ } - s.db.Put(keyGCPos, s.gcPos) + sort.Slice(garbage[:gcnt], func(i, j int) bool { return garbage[i].value < garbage[j].value }) + + cutoff := int(float32(gcnt) * ratio) + for i := 0; i < cutoff; i++ { + s.delete(garbage[i].idx, garbage[i].idxKey, garbage[i].po) + } } // Export writes all chunks from the store to a tar archive, returning the @@ -377,9 +299,9 @@ func (s *LDBStore) Export(out io.Writer) (int64, error) { it := s.db.NewIterator() defer it.Release() var count int64 - for ok := it.Seek([]byte{kpIndex}); ok; ok = it.Next() { + for ok := it.Seek([]byte{keyIndex}); ok; ok = it.Next() { key := it.Key() - if (key == nil) || (key[0] != kpIndex) { + if (key == nil) || (key[0] != keyIndex) { break } @@ -460,13 +382,13 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) { func (s *LDBStore) Cleanup() { //Iterates over the database and checks that there are no faulty chunks it := s.db.NewIterator() - startPosition := []byte{kpIndex} + startPosition := []byte{keyIndex} it.Seek(startPosition) var key []byte var errorsFound, total int for it.Valid() { key = it.Key() - if (key == nil) || (key[0] != kpIndex) { + if (key == nil) || (key[0] != keyIndex) { break } total++ @@ -632,17 +554,17 @@ func (s *LDBStore) writeBatches() { c := s.batchC s.batchC = make(chan bool) s.batch = new(leveldb.Batch) - s.lock.Unlock() err := s.writeBatch(b, e, d, a) // TODO: set this error on the batch, then tell the chunk if err != nil { - log.Error(fmt.Sprintf("DbStore: spawn batch write (%d chunks): %v", b.Len(), err)) + log.Error(fmt.Sprintf("spawn batch write (%d entries): %v", b.Len(), err)) } close(c) - if e >= s.capacity { - log.Trace(fmt.Sprintf("DbStore: collecting garbage...(%d chunks)", e)) + for e > s.capacity { s.collectGarbage(gcArrayFreeRatio) + e = s.entryCnt } + s.lock.Unlock() } log.Trace(fmt.Sprintf("DbStore: quit batch write loop")) } @@ -656,7 +578,7 @@ func (s *LDBStore) writeBatch(b *leveldb.Batch, entryCnt, dataIdx, accessCnt uin if err := 
s.db.Write(b); err != nil { return fmt.Errorf("unable to write batch: %v", err) } - log.Trace(fmt.Sprintf("DbStore: batch write (%d chunks) complete", l)) + log.Trace(fmt.Sprintf("batch write (%d entries)", l)) return nil } diff --git a/swarm/storage/ldbstore_test.go b/swarm/storage/ldbstore_test.go index 005d4dca30a6..3702bef0fa5f 100644 --- a/swarm/storage/ldbstore_test.go +++ b/swarm/storage/ldbstore_test.go @@ -23,10 +23,13 @@ import ( "os" "sync" "testing" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/swarm/storage/mock/mem" + + ldberrors "github.com/syndtr/goleveldb/leveldb/errors" ) type testDbStore struct { @@ -41,7 +44,8 @@ func newTestDbStore(mock bool, trusted bool) (*testDbStore, error) { } var db *LDBStore - params := NewLDBStoreParams(dir, defaultDbCapacity, nil, nil) + storeparams := NewDefaultStoreParams() + params := NewLDBStoreParams(storeparams, dir) params.Po = testPoFunc if mock { @@ -271,3 +275,245 @@ func BenchmarkMockDbStoreGet_1_5k(b *testing.B) { func BenchmarkMockDbStoreGet_8_5k(b *testing.B) { benchmarkDbStoreGet(5000, 8, 4096, true, b) } + +// TestLDBStoreWithoutCollectGarbage tests that we can put a number of random chunks in the LevelDB store, and +// retrieve them, provided we don't hit the garbage collection +func TestLDBStoreWithoutCollectGarbage(t *testing.T) { + chunkSize := uint64(4096) + capacity := 50 + n := 10 + + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + defer cleanup() + + chunks := []*Chunk{} + for i := 0; i < n; i++ { + c := NewRandomChunk(chunkSize) + chunks = append(chunks, c) + log.Trace("generate random chunk", "idx", i, "chunk", c) + } + + for i := 0; i < n; i++ { + go ldb.Put(chunks[i]) + } + + // wait for all chunks to be stored + for i := 0; i < n; i++ { + <-chunks[i].dbStoredC + } + + log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + + for i := 0; i < n; i++ { + ret, err := ldb.Get(chunks[i].Key) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(ret.SData, chunks[i].SData) { + t.Fatal("expected to get the same data back, but got smth else") + } + + log.Info("got back chunk", "chunk", ret) + } + + if ldb.entryCnt != uint64(n+1) { + t.Fatalf("expected entryCnt to be equal to %v, but got %v", n+1, ldb.entryCnt) + } + + if ldb.accessCnt != uint64(2*n+1) { + t.Fatalf("expected accessCnt to be equal to %v, but got %v", n+1, ldb.accessCnt) + } +} + +// TestLDBStoreCollectGarbage tests that we can put more chunks than LevelDB's capacity, and +// retrieve only some of them, because garbage collection must have cleared some of them +func TestLDBStoreCollectGarbage(t *testing.T) { + chunkSize := uint64(4096) + capacity := 500 + n := 2000 + + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + defer cleanup() + + chunks := []*Chunk{} + for i := 0; i < n; i++ { + c := NewRandomChunk(chunkSize) + chunks = append(chunks, c) + log.Trace("generate random chunk", "idx", i, "chunk", c) + } + + for i := 0; i < n; i++ { + ldb.Put(chunks[i]) + } + + // wait for all chunks to be stored + for i := 0; i < n; i++ { + <-chunks[i].dbStoredC + } + + log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + + // wait for garbage collection to kick in on the responsible actor + time.Sleep(5 * time.Second) + + var missing int + for i := 0; i < n; i++ { + ret, err := ldb.Get(chunks[i].Key) + if err == ErrChunkNotFound || err == ldberrors.ErrNotFound { + missing++ + continue + } + if err != nil { + 
t.Fatal(err) + } + + if !bytes.Equal(ret.SData, chunks[i].SData) { + t.Fatal("expected to get the same data back, but got smth else") + } + + log.Trace("got back chunk", "chunk", ret) + } + + if missing < n-capacity { + t.Fatalf("gc failure: expected to miss %v chunks, but only %v are actually missing", n-capacity, missing) + } + + log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) +} + +// TestLDBStoreAddRemove tests that we can put and then delete a given chunk +func TestLDBStoreAddRemove(t *testing.T) { + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(200) + defer cleanup() + + n := 100 + + chunks := []*Chunk{} + for i := 0; i < n; i++ { + c := NewRandomChunk(chunkSize) + chunks = append(chunks, c) + log.Trace("generate random chunk", "idx", i, "chunk", c) + } + + for i := 0; i < n; i++ { + go ldb.Put(chunks[i]) + } + + // wait for all chunks to be stored before continuing + for i := 0; i < n; i++ { + <-chunks[i].dbStoredC + } + + for i := 0; i < n; i++ { + // delete all even index chunks + if i%2 == 0 { + + key := chunks[i].Key + ikey := getIndexKey(key) + + var indx dpaDBIndex + ldb.tryAccessIdx(ikey, &indx) + + ldb.delete(indx.Idx, ikey, ldb.po(key)) + } + } + + log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + + for i := 0; i < n; i++ { + ret, err := ldb.Get(chunks[i].Key) + + if i%2 == 0 { + // expect even chunks to be missing + if err == nil || ret != nil { + t.Fatal("expected chunk to be missing, but got no error") + } + } else { + // expect odd chunks to be retrieved successfully + if err != nil { + t.Fatalf("expected no error, but got %s", err) + } + + if !bytes.Equal(ret.SData, chunks[i].SData) { + t.Fatal("expected to get the same data back, but got smth else") + } + } + } +} + +// TestLDBStoreRemoveThenCollectGarbage tests that we can delete chunks and that we can trigger garbage collection +func TestLDBStoreRemoveThenCollectGarbage(t *testing.T) { + capacity := 10 + + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + + n := 7 + + chunks := []*Chunk{} + for i := 0; i < capacity; i++ { + c := NewRandomChunk(chunkSize) + chunks = append(chunks, c) + log.Trace("generate random chunk", "idx", i, "chunk", c) + } + + for i := 0; i < n; i++ { + ldb.Put(chunks[i]) + } + + // wait for all chunks to be stored before continuing + for i := 0; i < n; i++ { + <-chunks[i].dbStoredC + } + + // delete all chunks + for i := 0; i < n; i++ { + key := chunks[i].Key + ikey := getIndexKey(key) + + var indx dpaDBIndex + ldb.tryAccessIdx(ikey, &indx) + + ldb.delete(indx.Idx, ikey, ldb.po(key)) + } + + log.Info("ldbstore", "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt) + + cleanup() + + ldb, cleanup = newLDBStore(t) + ldb.setCapacity(uint64(capacity)) + + n = 10 + + for i := 0; i < n; i++ { + ldb.Put(chunks[i]) + } + + // wait for all chunks to be stored before continuing + for i := 0; i < n; i++ { + <-chunks[i].dbStoredC + } + + // expect for first chunk to be missing, because it has the smallest access value + idx := 0 + ret, err := ldb.Get(chunks[idx].Key) + if err == nil || ret != nil { + t.Fatal("expected first chunk to be missing, but got no error") + } + + // expect for last chunk to be present, as it has the largest access value + idx = 9 + ret, err = ldb.Get(chunks[idx].Key) + if err != nil { + t.Fatalf("expected no error, but got %s", err) + } + + if !bytes.Equal(ret.SData, chunks[idx].SData) { + t.Fatal("expected to get the same data back, but got smth else") + } +} diff --git 
a/swarm/storage/localstore.go b/swarm/storage/localstore.go index 34c05859fad8..4ab87c3e93d0 100644 --- a/swarm/storage/localstore.go +++ b/swarm/storage/localstore.go @@ -39,7 +39,7 @@ type LocalStoreParams struct { func NewDefaultLocalStoreParams() *LocalStoreParams { return &LocalStoreParams{ - StoreParams: NewStoreParams(0, nil, nil), + StoreParams: NewDefaultStoreParams(), } } @@ -62,7 +62,7 @@ type LocalStore struct { // This constructor uses MemStore and DbStore as components func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalStore, error) { - ldbparams := NewLDBStoreParams(params.ChunkDbPath, params.DbCapacity, params.Hash, params.BaseKey) + ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) dbStore, err := NewMockDbStore(ldbparams, mockStore) if err != nil { return nil, err @@ -75,7 +75,7 @@ func NewLocalStore(params *LocalStoreParams, mockStore *mock.NodeStore) (*LocalS } func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { - ldbparams := NewLDBStoreParams(params.ChunkDbPath, params.DbCapacity, params.Hash, params.BaseKey) + ldbparams := NewLDBStoreParams(params.StoreParams, params.ChunkDbPath) dbStore, err := NewLDBStore(ldbparams) if err != nil { return nil, err @@ -88,10 +88,6 @@ func NewTestLocalStoreForAddr(params *LocalStoreParams) (*LocalStore, error) { return localStore, nil } -func (self *LocalStore) CacheCounter() uint64 { - return uint64(self.memStore.Counter()) -} - func (self *LocalStore) Put(chunk *Chunk) { valid := true for _, v := range self.Validators { diff --git a/swarm/storage/memstore.go b/swarm/storage/memstore.go index 529b9dfa247e..7266bc92bed7 100644 --- a/swarm/storage/memstore.go +++ b/swarm/storage/memstore.go @@ -1,4 +1,4 @@ -// Copyright 2016 The go-ethereum Authors +// Copyright 2018 The go-ethereum Authors // This file is part of the go-ethereum library. 
// // The go-ethereum library is free software: you can redistribute it and/or modify @@ -19,356 +19,129 @@ package storage import ( - "fmt" "sync" - "time" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/metrics" -) - -//metrics variables -var ( - memstorePutCounter = metrics.NewRegisteredCounter("storage.db.memstore.put.count", nil) - memstoreRemoveCounter = metrics.NewRegisteredCounter("storage.db.memstore.rm.count", nil) -) - -const ( - memTreeLW = 2 // log2(subtree count) of the subtrees - memTreeFLW = 14 // log2(subtree count) of the root layer - dbForceUpdateAccessCnt = 1000 - defaultCacheCapacity = 5000 + lru "github.com/hashicorp/golang-lru" ) type MemStore struct { - memtree *memTree - entryCnt, capacity uint // stored entries - accessCnt uint64 // access counter; oldest is thrown away when full - dbAccessCnt uint64 - ldbStore *LDBStore - lock sync.Mutex -} - -/* -a hash prefix subtree containing subtrees or one storage entry (but never both) - -- access[0] stores the smallest (oldest) access count value in this subtree -- if it contains more subtrees and its subtree count is at least 4, access[1:2] - stores the smallest access count in the first and second halves of subtrees - (so that access[0] = min(access[1], access[2]) -- likewise, if subtree count is at least 8, - access[1] = min(access[3], access[4]) - access[2] = min(access[5], access[6]) - (access[] is a binary tree inside the multi-bit leveled hash tree) -*/ - -func NewMemStore(params *StoreParams, d *LDBStore) (m *MemStore) { - - capacity := params.CacheCapacity - m = &MemStore{} - m.memtree = newMemTree(memTreeFLW, nil, 0) - m.ldbStore = d - m.setCapacity(capacity) - return -} - -type memTree struct { - subtree []*memTree - parent *memTree - parentIdx uint - - bits uint // log2(subtree count) - width uint // subtree count - - entry *Chunk // if subtrees are present, entry should be nil - lastDBaccess uint64 - access []uint64 -} - -func newMemTree(b uint, parent *memTree, pidx uint) (node *memTree) { - node = new(memTree) - node.bits = b - node.width = 1 << b - node.subtree = make([]*memTree, node.width) - node.access = make([]uint64, node.width-1) - node.parent = parent - node.parentIdx = pidx - if parent != nil { - parent.subtree[pidx] = node - } - - return node + cache *lru.Cache + requests *lru.Cache + mu sync.RWMutex + disabled bool } -func (node *memTree) updateAccess(a uint64) { - aidx := uint(0) - var aa uint64 - oa := node.access[0] - for node.access[aidx] == oa { - node.access[aidx] = a - if aidx > 0 { - aa = node.access[((aidx-1)^1)+1] - aidx = (aidx - 1) >> 1 - } else { - pidx := node.parentIdx - node = node.parent - if node == nil { - return - } - nn := node.subtree[pidx^1] - if nn != nil { - aa = nn.access[0] - } else { - aa = 0 - } - aidx = (node.width + pidx - 2) >> 1 - } - - if (aa != 0) && (aa < a) { - a = aa +//NewMemStore is instantiating a MemStore cache. We are keeping a record of all outgoing requests for chunks, that +//should later be delivered by peer nodes, in the `requests` LRU cache. We are also keeping all frequently requested +//chunks in the `cache` LRU cache. +// +//`requests` LRU cache capacity should ideally never be reached, this is why for the time being it should be initialised +//with the same value as the LDBStore capacity. 
+func NewMemStore(params *StoreParams, _ *LDBStore) (m *MemStore) { + if params.CacheCapacity == 0 { + return &MemStore{ + disabled: true, } } -} - -func (s *MemStore) setCapacity(c uint) { - s.lock.Lock() - defer s.lock.Unlock() - for c < s.entryCnt { - s.removeOldest() + onEvicted := func(key interface{}, value interface{}) { + v := value.(*Chunk) + <-v.dbStoredC } - s.capacity = c -} - -func (s *MemStore) Counter() uint { - return s.entryCnt -} - -// entry (not its copy) is going to be in MemStore -func (s *MemStore) Put(entry *Chunk) { - log.Trace("memstore.put", "key", entry.Key) - if s.capacity == 0 { - return + c, err := lru.NewWithEvict(int(params.CacheCapacity), onEvicted) + if err != nil { + panic(err) } - s.lock.Lock() - defer s.lock.Unlock() - - if s.entryCnt >= s.capacity { - s.removeOldest() + requestEvicted := func(key interface{}, value interface{}) { + log.Error("evict called on outgoing request") } - - s.accessCnt++ - - memstorePutCounter.Inc(1) - - node := s.memtree - bitpos := uint(0) - for node.entry == nil { - l := entry.Key.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - bitpos += node.bits - node = st - break - } - bitpos += node.bits - node = st + r, err := lru.NewWithEvict(int(params.ChunkRequestsCacheCapacity), requestEvicted) + if err != nil { + panic(err) } - if node.entry != nil { - - if node.entry.Key.isEqual(entry.Key) { - node.updateAccess(s.accessCnt) - if entry.SData == nil { - entry.Size = node.entry.Size - entry.SData = node.entry.SData - } - if entry.ReqC == nil { - entry.ReqC = node.entry.ReqC - } - entry.C = node.entry.C - node.entry = entry - return - } - - for node.entry != nil { - - l := node.entry.Key.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - } - st.entry = node.entry - node.entry = nil - st.updateAccess(node.access[0]) - - l = entry.Key.bits(bitpos, node.bits) - st = node.subtree[l] - if st == nil { - st = newMemTree(memTreeLW, node, l) - } - bitpos += node.bits - node = st - - } + return &MemStore{ + cache: c, + requests: r, } - - node.entry = entry - node.lastDBaccess = s.dbAccessCnt - node.updateAccess(s.accessCnt) - s.entryCnt++ } -func (s *MemStore) Get(hash Key) (chunk *Chunk, err error) { - log.Trace("memstore.get", "key", hash) - s.lock.Lock() - defer s.lock.Unlock() - - node := s.memtree - bitpos := uint(0) - for node.entry == nil { - l := hash.bits(bitpos, node.bits) - st := node.subtree[l] - if st == nil { - log.Trace("memstore.get ErrChunkNotFound", "key", hash) - return nil, ErrChunkNotFound - } - bitpos += node.bits - node = st +func (m *MemStore) Get(key Key) (*Chunk, error) { + if m.disabled { + return nil, ErrChunkNotFound } - if node.entry.Key.isEqual(hash) { - s.accessCnt++ - node.updateAccess(s.accessCnt) - chunk = node.entry - if s.dbAccessCnt-node.lastDBaccess > dbForceUpdateAccessCnt { - s.dbAccessCnt++ - node.lastDBaccess = s.dbAccessCnt - if s.ldbStore != nil { - s.ldbStore.updateAccessCnt(hash) - } - } - } else { - err = ErrChunkNotFound + m.mu.RLock() + defer m.mu.RUnlock() + + r, ok := m.requests.Get(string(key)) + // it is a request + if ok { + return r.(*Chunk), nil } - log.Trace("memstore.get return", "key", hash, "chunk", chunk, "err", err) - return + // it is not a request + c, ok := m.cache.Get(string(key)) + if !ok { + return nil, ErrChunkNotFound + } + return c.(*Chunk), nil } -func (s *MemStore) removeOldest() { - defer metrics.GetOrRegisterResettingTimer("memstore.purge", 
metrics.DefaultRegistry).UpdateSince(time.Now()) - - node := s.memtree - for node.entry == nil { +func (m *MemStore) Put(c *Chunk) { + if m.disabled { + return + } - aidx := uint(0) - av := node.access[aidx] + m.mu.Lock() + defer m.mu.Unlock() - for aidx < node.width/2-1 { - if av == node.access[aidx*2+1] { - node.access[aidx] = node.access[aidx*2+2] - aidx = aidx*2 + 1 - } else if av == node.access[aidx*2+2] { - node.access[aidx] = node.access[aidx*2+1] - aidx = aidx*2 + 2 - } else { - panic(nil) - } - } - pidx := aidx*2 + 2 - node.width - if (node.subtree[pidx] != nil) && (av == node.subtree[pidx].access[0]) { - if node.subtree[pidx+1] != nil { - node.access[aidx] = node.subtree[pidx+1].access[0] - } else { - node.access[aidx] = 0 - } - } else if (node.subtree[pidx+1] != nil) && (av == node.subtree[pidx+1].access[0]) { - if node.subtree[pidx] != nil { - node.access[aidx] = node.subtree[pidx].access[0] - } else { - node.access[aidx] = 0 + // it is a request + if c.ReqC != nil { + select { + case <-c.ReqC: + if c.GetErrored() != nil { + m.requests.Remove(string(c.Key)) + return } - pidx++ - } else { - panic(nil) + m.cache.Add(string(c.Key), c) + m.requests.Remove(string(c.Key)) + default: + m.requests.Add(string(c.Key), c) } - - //fmt.Println(pidx) - node = node.subtree[pidx] - - } - - if node.entry.ReqC == nil { - log.Trace(fmt.Sprintf("Memstore Clean: Waiting for chunk %v to be saved", node.entry.Key.Log())) - <-node.entry.dbStoredC - log.Trace(fmt.Sprintf("Memstore Clean: Chunk %v saved to DBStore. Ready to clear from mem.", node.entry.Key.Log())) - - memstoreRemoveCounter.Inc(1) - node.entry = nil - s.entryCnt-- - } else { return } - node.access[0] = 0 + // it is not a request + m.cache.Add(string(c.Key), c) + m.requests.Remove(string(c.Key)) +} - //--- +func (m *MemStore) setCapacity(n int) { + if n <= 0 { + m.disabled = true + } else { + onEvicted := func(key interface{}, value interface{}) { + v := value.(*Chunk) + <-v.dbStoredC + } + c, err := lru.NewWithEvict(n, onEvicted) + if err != nil { + panic(err) + } - aidx := uint(0) - for { - aa := node.access[aidx] - if aidx > 0 { - aidx = (aidx - 1) >> 1 - } else { - pidx := node.parentIdx - node = node.parent - if node == nil { - return - } - aidx = (node.width + pidx - 2) >> 1 + r, err := lru.New(defaultChunkRequestsCacheCapacity) + if err != nil { + panic(err) } - if (aa != 0) && ((aa < node.access[aidx]) || (node.access[aidx] == 0)) { - node.access[aidx] = aa + + m = &MemStore{ + cache: c, + requests: r, } } } -// type MemStore struct { -// m map[string]*Chunk -// mu sync.RWMutex -// } - -// func NewMemStore(d *DbStore, capacity uint) (m *MemStore) { -// return &MemStore{ -// m: make(map[string]*Chunk), -// } -// } - -// func (m *MemStore) Get(key Key) (*Chunk, error) { -// m.mu.RLock() -// defer m.mu.RUnlock() -// c, ok := m.m[string(key[:])] -// if !ok { -// return nil, ErrNotFound -// } -// if !bytes.Equal(c.Key, key) { -// panic(fmt.Errorf("MemStore.Get: chunk key %s != req key %s", c.Key.Hex(), key.Hex())) -// } -// return c, nil -// } - -// func (m *MemStore) Put(c *Chunk) { -// m.mu.Lock() -// defer m.mu.Unlock() -// m.m[string(c.Key[:])] = c -// } - -// func (m *MemStore) setCapacity(n int) { - -// } - -// Close memstore func (s *MemStore) Close() {} diff --git a/swarm/storage/memstore_test.go b/swarm/storage/memstore_test.go index 7164df39d73f..5dc3ae5f8602 100644 --- a/swarm/storage/memstore_test.go +++ b/swarm/storage/memstore_test.go @@ -16,10 +16,19 @@ package storage -import "testing" +import ( + "crypto/rand" + 
"encoding/binary" + "io/ioutil" + "os" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/log" +) func newTestMemStore() *MemStore { - storeparams := NewStoreParams(defaultCacheCapacity, nil, nil) + storeparams := NewDefaultStoreParams() return NewMemStore(storeparams, nil) } @@ -96,3 +105,144 @@ func BenchmarkMemStoreGet_1_5k(b *testing.B) { func BenchmarkMemStoreGet_8_5k(b *testing.B) { benchmarkMemStoreGet(5000, 8, 4096, b) } + +func newLDBStore(t *testing.T) (*LDBStore, func()) { + dir, err := ioutil.TempDir("", "bzz-storage-test") + if err != nil { + t.Fatal(err) + } + log.Trace("memstore.tempdir", "dir", dir) + + ldbparams := NewLDBStoreParams(NewDefaultStoreParams(), dir) + db, err := NewLDBStore(ldbparams) + if err != nil { + t.Fatal(err) + } + + cleanup := func() { + db.Close() + err := os.RemoveAll(dir) + if err != nil { + t.Fatal(err) + } + } + + return db, cleanup +} + +func TestMemStoreAndLDBStore(t *testing.T) { + ldb, cleanup := newLDBStore(t) + ldb.setCapacity(4000) + defer cleanup() + + cacheCap := 200 + requestsCap := 200 + memStore := NewMemStore(NewStoreParams(4000, 200, 200, nil, nil), nil) + + tests := []struct { + n int // number of chunks to push to memStore + chunkSize uint64 // size of chunk (by default in Swarm - 4096) + request bool // whether or not to set the ReqC channel on the random chunks + }{ + { + n: 1, + chunkSize: 4096, + request: false, + }, + { + n: 201, + chunkSize: 4096, + request: false, + }, + { + n: 501, + chunkSize: 4096, + request: false, + }, + { + n: 3100, + chunkSize: 4096, + request: false, + }, + { + n: 100, + chunkSize: 4096, + request: true, + }, + } + + for i, tt := range tests { + log.Info("running test", "idx", i, "tt", tt) + var chunks []*Chunk + + for i := 0; i < tt.n; i++ { + var c *Chunk + if tt.request { + c = NewRandomRequestChunk(tt.chunkSize) + } else { + c = NewRandomChunk(tt.chunkSize) + } + + chunks = append(chunks, c) + } + + for i := 0; i < tt.n; i++ { + go ldb.Put(chunks[i]) + memStore.Put(chunks[i]) + + if got := memStore.cache.Len(); got > cacheCap { + t.Fatalf("expected to get cache capacity less than %v, but got %v", cacheCap, got) + } + + if got := memStore.requests.Len(); got > requestsCap { + t.Fatalf("expected to get requests capacity less than %v, but got %v", requestsCap, got) + } + } + + for i := 0; i < tt.n; i++ { + _, err := memStore.Get(chunks[i].Key) + if err != nil { + if err == ErrChunkNotFound { + _, err := ldb.Get(chunks[i].Key) + if err != nil { + t.Fatalf("couldn't get chunk %v from ldb, got error: %v", i, err) + } + } else { + t.Fatalf("got error from memstore: %v", err) + } + } + } + + // wait for all chunks to be stored before ending the test are cleaning up + for i := 0; i < tt.n; i++ { + <-chunks[i].dbStoredC + } + } +} + +func NewRandomChunk(chunkSize uint64) *Chunk { + c := &Chunk{ + Key: make([]byte, 32), + ReqC: nil, + SData: make([]byte, chunkSize+8), // SData should be chunkSize + 8 bytes reserved for length + dbStoredC: make(chan bool), + dbStoredMu: &sync.Mutex{}, + } + + rand.Read(c.SData) + + binary.LittleEndian.PutUint64(c.SData[:8], chunkSize) + + hasher := MakeHashFunc(SHA3Hash)() + hasher.Write(c.SData) + copy(c.Key, hasher.Sum(nil)) + + return c +} + +func NewRandomRequestChunk(chunkSize uint64) *Chunk { + c := NewRandomChunk(chunkSize) + c.ReqC = make(chan bool) + + return c +} diff --git a/swarm/storage/mock/mem/mem_test.go b/swarm/storage/mock/mem/mem_test.go index b93471c446ba..adcefaabb41d 100644 --- a/swarm/storage/mock/mem/mem_test.go +++ 
b/swarm/storage/mock/mem/mem_test.go @@ -22,9 +22,9 @@ import ( "github.com/ethereum/go-ethereum/swarm/storage/mock/test" ) -// TestDBStore is running test for a GlobalStore +// TestGlobalStore is running test for a GlobalStore // using test.MockStore function. -func TestMemStore(t *testing.T) { +func TestGlobalStore(t *testing.T) { test.MockStore(t, NewGlobalStore(), 100) } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index cb561ee68ae9..aa2973e0de11 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -259,27 +259,30 @@ func (self *LazyTestSectionReader) Size(chan bool) (int64, error) { } type StoreParams struct { - Hash SwarmHasher `toml:"-"` - DbCapacity uint64 - CacheCapacity uint - BaseKey []byte + Hash SwarmHasher `toml:"-"` + DbCapacity uint64 + CacheCapacity uint + ChunkRequestsCacheCapacity uint + BaseKey []byte } -func NewStoreParams(capacity uint64, hash SwarmHasher, basekey []byte) *StoreParams { +func NewDefaultStoreParams() *StoreParams { + return NewStoreParams(defaultLDBCapacity, defaultCacheCapacity, defaultChunkRequestsCacheCapacity, nil, nil) +} + +func NewStoreParams(ldbCap uint64, cacheCap uint, requestsCap uint, hash SwarmHasher, basekey []byte) *StoreParams { if basekey == nil { basekey = make([]byte, 32) } if hash == nil { hash = MakeHashFunc("SHA3") } - if capacity == 0 { - capacity = defaultDbCapacity - } return &StoreParams{ - Hash: hash, - DbCapacity: capacity, - CacheCapacity: defaultCacheCapacity, - BaseKey: basekey, + Hash: hash, + DbCapacity: ldbCap, + CacheCapacity: cacheCap, + ChunkRequestsCacheCapacity: requestsCap, + BaseKey: basekey, } } diff --git a/swarm/swarm.go b/swarm/swarm.go index f087aba32b47..a1c96127f540 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -410,7 +410,6 @@ func (self *Swarm) periodicallyUpdateGauges() { } func (self *Swarm) updateGauges() { - cacheSizeGauge.Update(int64(self.lstore.CacheCounter())) uptimeGauge.Update(time.Since(startTime).Nanoseconds()) }
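The reworked StoreParams in swarm/storage/types.go above now takes the three capacities explicitly, with NewDefaultStoreParams falling back to the defaults declared in dpa.go. A minimal sketch of both construction paths, assuming the swarm/storage import path used throughout this diff (the custom capacity values 4000/200/200 only echo the ones used in TestMemStoreAndLDBStore and are illustrative):

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/swarm/storage"
)

func main() {
	// Defaults from dpa.go above: 5,000,000 LevelDB entries, a 500-chunk
	// in-memory cache and a 5,000,000-entry outgoing-request cache.
	defaults := storage.NewDefaultStoreParams()
	fmt.Println(defaults.DbCapacity, defaults.CacheCapacity, defaults.ChunkRequestsCacheCapacity)

	// Explicit capacities; a nil hasher and basekey fall back to SHA3 and a
	// zero-filled 32-byte base key, as in the NewStoreParams body above.
	custom := storage.NewStoreParams(4000, 200, 200, nil, nil)
	fmt.Println(custom.DbCapacity, custom.CacheCapacity, custom.ChunkRequestsCacheCapacity)
}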