Skip to content

Commit

Permalink
Merge pull request ethereum#513 from ethersphere/resource-netstore-single-timeout
Browse files Browse the repository at this point in the history

swarm/storage: Add timeout to netstore private get
  • Loading branch information
nolash authored May 10, 2018
2 parents 7ec3776 + bb1287f commit 02423b0
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 38 deletions.
9 changes: 6 additions & 3 deletions swarm/storage/netstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (self *NetStore) Get(key Key) (chunk *Chunk, err error) {
defer limiter.Stop()

for {
chunk, err := self.get(key)
chunk, err := self.get(key, 0)
if err != ErrChunkNotFound {
// break retry only if the error is nil
// or other error than ErrChunkNotFound
Expand Down Expand Up @@ -121,7 +121,10 @@ func (self *NetStore) Get(key Key) (chunk *Chunk, err error) {
}
}

func (self *NetStore) get(key Key) (chunk *Chunk, err error) {
func (self *NetStore) get(key Key, timeout time.Duration) (chunk *Chunk, err error) {
if timeout == 0 {
timeout = searchTimeout
}
if self.retrieve == nil {
chunk, err = self.localStore.Get(key)
if err == nil {
Expand All @@ -148,7 +151,7 @@ func (self *NetStore) get(key Key) (chunk *Chunk, err error) {
}
}

t := time.NewTicker(searchTimeout)
t := time.NewTicker(timeout)
defer t.Stop()

select {
Expand Down
32 changes: 17 additions & 15 deletions swarm/storage/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
)

const (
signatureLength = 65
indexSize = 18
DbDirName = "resource"
chunkSize = 4096 // temporary until we implement DPA in the resourcehandler
defaultStoreTimeout = 4000 * time.Millisecond
hasherCount = 8
resourceHash = SHA3Hash
signatureLength = 65
indexSize = 18
DbDirName = "resource"
chunkSize = 4096 // temporary until we implement DPA in the resourcehandler
defaultStoreTimeout = 4000 * time.Millisecond
hasherCount = 8
resourceHash = SHA3Hash
defaultRetrieveTimeout = 100 * time.Millisecond
)

type blockEstimator struct {
Expand Down Expand Up @@ -163,7 +164,7 @@ type headerGetter interface {
//
// TODO: Include modtime in chunk data + signature
type ResourceHandler struct {
chunkStore ChunkStore
chunkStore *NetStore
HashSize int
signer ResourceSigner
ethClient headerGetter
Expand Down Expand Up @@ -215,7 +216,7 @@ func NewResourceHandler(params *ResourceHandlerParams) (*ResourceHandler, error)
}

// Sets the store backend for resource updates
func (self *ResourceHandler) SetStore(store ChunkStore) {
func (self *ResourceHandler) SetStore(store *NetStore) {
self.chunkStore = store
}

Expand Down Expand Up @@ -496,7 +497,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3
return nil, NewResourceError(ErrPeriodDepth, fmt.Sprintf("Lookup exceeded max period hops (%d)", maxLookup.Max))
}
key := self.resourceHash(period, version, rsrc.nameHash)
chunk, err := self.chunkStore.Get(key)
chunk, err := self.chunkStore.get(key, defaultRetrieveTimeout)
if err == nil {
if specificversion {
return self.updateResourceIndex(rsrc, chunk)
Expand All @@ -506,7 +507,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3
for {
newversion := version + 1
key := self.resourceHash(period, newversion, rsrc.nameHash)
newchunk, err := self.chunkStore.Get(key)
newchunk, err := self.chunkStore.get(key, defaultRetrieveTimeout)
if err != nil {
return self.updateResourceIndex(rsrc, chunk)
}
Expand Down Expand Up @@ -542,7 +543,7 @@ func (self *ResourceHandler) loadResource(nameHash common.Hash, name string, ref
rsrc.nameHash = nameHash

// get the root info chunk and update the cached value
chunk, err := self.chunkStore.Get(Key(rsrc.nameHash[:]))
chunk, err := self.chunkStore.get(Key(rsrc.nameHash[:]), defaultRetrieveTimeout)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -953,15 +954,16 @@ func NewTestResourceHandler(datadir string, params *ResourceHandlerParams) (*Res
path := filepath.Join(datadir, DbDirName)
rh, err := NewResourceHandler(params)
if err != nil {
return nil, err
return nil, fmt.Errorf("resource handler create fail: %v", err)
}
localstoreparams := NewDefaultLocalStoreParams()
localstoreparams.Init(path)
localStore, err := NewLocalStore(localstoreparams, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err)
}
localStore.Validators = append(localStore.Validators, rh)
rh.SetStore(localStore)
dpaStore := NewNetStore(localStore, nil)
rh.SetStore(dpaStore)
return rh, nil
}
9 changes: 6 additions & 3 deletions swarm/storage/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestResourceHandler(t *testing.T) {

// check that the new resource is stored correctly
namehash := ens.EnsNode(safeName)
chunk, err := rh.chunkStore.(*LocalStore).memStore.Get(Key(namehash[:]))
chunk, err := rh.chunkStore.localStore.memStore.Get(Key(namehash[:]))
if err != nil {
t.Fatal(err)
} else if len(chunk.SData) < 16 {
Expand Down Expand Up @@ -232,6 +232,8 @@ func TestResourceHandler(t *testing.T) {
Signer: nil,
EthClient: rh.ethClient,
}

rh.chunkStore.localStore.Close()
rh2, err := NewTestResourceHandler(datadir, rhparams)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -448,6 +450,7 @@ func TestResourceMultihash(t *testing.T) {
EnsClient: rh.ensClient,
}
// test with signed data
rh.chunkStore.localStore.Close()
rh2, err := NewTestResourceHandler(datadir, rhparams)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -579,7 +582,7 @@ func setupTest(backend headerGetter, ensBackend *ens.ENS, signer ResourceSigner)
EnsClient: ensBackend,
}
rh, err = NewTestResourceHandler(datadir, rhparams)
return rh, datadir, cleanF, nil
return rh, datadir, cleanF, err
}

// Set up simulated ENS backend for use with ENSResourceHandler tests
Expand Down Expand Up @@ -636,7 +639,7 @@ func newTestSigner() (*GenericResourceSigner, error) {
}

func getUpdateDirect(rh *ResourceHandler, key Key) ([]byte, error) {
chunk, err := rh.chunkStore.(*LocalStore).memStore.Get(key)
chunk, err := rh.chunkStore.localStore.memStore.Get(key)
if err != nil {
return nil, err
}
Expand Down
34 changes: 17 additions & 17 deletions swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,22 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
return
}

db := storage.NewDBAPI(self.lstore)
delivery := stream.NewDelivery(to, db)

self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{
DoSync: config.SyncEnabled,
DoRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay,
})

// set up DPA, the cloud storage local access layer
dpaChunkStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve)
log.Debug(fmt.Sprintf("-> Local Access to Swarm"))
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
self.dpa = storage.NewDPA(dpaChunkStore, self.config.DPAParams)
log.Debug(fmt.Sprintf("-> Content Store API"))

if ensresolver == nil {
log.Warn("No ENS API specified, resource updates will NOT validate resource update chunks")
}
Expand Down Expand Up @@ -202,7 +218,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
if err != nil {
return nil, err
}
resourceHandler.SetStore(self.lstore)
resourceHandler.SetStore(dpaChunkStore)

var validators []storage.ChunkValidator
validators = append(validators, storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash)))
Expand All @@ -214,22 +230,6 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api.
// setup local store
log.Debug(fmt.Sprintf("Set up local storage"))

db := storage.NewDBAPI(self.lstore)
delivery := stream.NewDelivery(to, db)

self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{
DoSync: config.SyncEnabled,
DoRetrieve: true,
SyncUpdateDelay: config.SyncUpdateDelay,
})

// set up DPA, the cloud storage local access layer
dpaChunkStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve)
log.Debug(fmt.Sprintf("-> Local Access to Swarm"))
// Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage
self.dpa = storage.NewDPA(dpaChunkStore, self.config.DPAParams)
log.Debug(fmt.Sprintf("-> Content Store API"))

self.bzz = network.NewBzz(bzzconfig, to, stateStore, stream.Spec, self.streamer.Run)

// Pss = postal service over swarm (devp2p over bzz)
Expand Down

0 comments on commit 02423b0

Please sign in to comment.