From 3ab9af74bd49d6b19d24686ed67ee91c071fb39a Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 7 May 2018 20:32:47 +0200 Subject: [PATCH 1/3] swarm/storage: Add timeout to netstore private get --- swarm/storage/netstore.go | 9 ++++--- swarm/storage/resource.go | 44 +++++++++++++++++++++------------- swarm/storage/resource_test.go | 9 ++++--- 3 files changed, 40 insertions(+), 22 deletions(-) diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index b92860229e10..d619e9b3036d 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -84,7 +84,7 @@ func (self *NetStore) Get(key Key) (chunk *Chunk, err error) { defer limiter.Stop() for { - chunk, err := self.get(key) + chunk, err := self.get(key, 0) if err != ErrChunkNotFound { // break retry only if the error is nil // or other error then ErrChunkNotFound @@ -121,7 +121,10 @@ func (self *NetStore) Get(key Key) (chunk *Chunk, err error) { } } -func (self *NetStore) get(key Key) (chunk *Chunk, err error) { +func (self *NetStore) get(key Key, timeout time.Duration) (chunk *Chunk, err error) { + if timeout == 0 { + timeout = searchTimeout + } if self.retrieve == nil { chunk, err = self.localStore.Get(key) if err == nil { @@ -148,7 +151,7 @@ func (self *NetStore) get(key Key) (chunk *Chunk, err error) { } } - t := time.NewTicker(searchTimeout) + t := time.NewTicker(timeout) defer t.Stop() select { diff --git a/swarm/storage/resource.go b/swarm/storage/resource.go index 6ceeb4189cdc..ba6dbac40165 100644 --- a/swarm/storage/resource.go +++ b/swarm/storage/resource.go @@ -21,13 +21,14 @@ import ( ) const ( - signatureLength = 65 - indexSize = 18 - DbDirName = "resource" - chunkSize = 4096 // temporary until we implement DPA in the resourcehandler - defaultStoreTimeout = 4000 * time.Millisecond - hasherCount = 8 - resourceHash = SHA3Hash + signatureLength = 65 + indexSize = 18 + DbDirName = "resource" + chunkSize = 4096 // temporary until we implement DPA in the resourcehandler + defaultStoreTimeout = 4000 * time.Millisecond + hasherCount = 8 + resourceHash = SHA3Hash + defaultRetrieveTimeout = 100 * time.Millisecond ) type blockEstimator struct { @@ -49,6 +50,14 @@ func (b *blockEstimator) HeaderByNumber(context.Context, string, *big.Int) (*typ }, nil } +type resourceChunkStore struct { + *NetStore +} + +func (self *resourceChunkStore) GetSingle(key Key, timeout time.Duration) (*Chunk, error) { + return self.get(key, timeout) +} + type ResourceError struct { code int err string @@ -163,7 +172,7 @@ type headerGetter interface { // // TODO: Include modtime in chunk data + signature type ResourceHandler struct { - chunkStore ChunkStore + chunkStore *resourceChunkStore HashSize int signer ResourceSigner ethClient headerGetter @@ -215,8 +224,10 @@ func NewResourceHandler(params *ResourceHandlerParams) (*ResourceHandler, error) } // Sets the store backend for resource updates -func (self *ResourceHandler) SetStore(store ChunkStore) { - self.chunkStore = store +func (self *ResourceHandler) SetStore(store *NetStore) { + self.chunkStore = &resourceChunkStore{ + NetStore: store, + } } // Chunk Validation method (matches ChunkValidatorFunc signature) @@ -496,7 +507,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3 return nil, NewResourceError(ErrPeriodDepth, fmt.Sprintf("Lookup exceeded max period hops (%d)", maxLookup.Max)) } key := self.resourceHash(period, version, rsrc.nameHash) - chunk, err := self.chunkStore.Get(key) + chunk, err := self.chunkStore.GetSingle(key, defaultRetrieveTimeout) if err == nil { if specificversion { return self.updateResourceIndex(rsrc, chunk) @@ -506,7 +517,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3 for { newversion := version + 1 key := self.resourceHash(period, newversion, rsrc.nameHash) - newchunk, err := self.chunkStore.Get(key) + newchunk, err := self.chunkStore.GetSingle(key, defaultRetrieveTimeout) if err != nil { return self.updateResourceIndex(rsrc, chunk) } @@ -542,7 +553,7 @@ func (self *ResourceHandler) loadResource(nameHash common.Hash, name string, ref rsrc.nameHash = nameHash // get the root info chunk and update the cached value - chunk, err := self.chunkStore.Get(Key(rsrc.nameHash[:])) + chunk, err := self.chunkStore.GetSingle(Key(rsrc.nameHash[:]), defaultRetrieveTimeout) if err != nil { return nil, err } @@ -926,15 +937,16 @@ func NewTestResourceHandler(datadir string, params *ResourceHandlerParams) (*Res path := filepath.Join(datadir, DbDirName) rh, err := NewResourceHandler(params) if err != nil { - return nil, err + return nil, fmt.Errorf("resource handler create fail: %v", err) } localstoreparams := NewDefaultLocalStoreParams() localstoreparams.Init(path) localStore, err := NewLocalStore(localstoreparams, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("localstore create fail, path %s: %v", path, err) } localStore.Validators = append(localStore.Validators, rh) - rh.SetStore(localStore) + dpaStore := NewNetStore(localStore, nil) + rh.SetStore(dpaStore) return rh, nil } diff --git a/swarm/storage/resource_test.go b/swarm/storage/resource_test.go index f19243a10f90..e8cfb95653b5 100644 --- a/swarm/storage/resource_test.go +++ b/swarm/storage/resource_test.go @@ -163,7 +163,7 @@ func TestResourceHandler(t *testing.T) { // check that the new resource is stored correctly namehash := ens.EnsNode(safeName) - chunk, err := rh.chunkStore.(*LocalStore).memStore.Get(Key(namehash[:])) + chunk, err := rh.chunkStore.NetStore.localStore.memStore.Get(Key(namehash[:])) if err != nil { t.Fatal(err) } else if len(chunk.SData) < 16 { @@ -232,6 +232,8 @@ func TestResourceHandler(t *testing.T) { Signer: nil, EthClient: rh.ethClient, } + + rh.chunkStore.NetStore.localStore.Close() rh2, err := NewTestResourceHandler(datadir, rhparams) if err != nil { t.Fatal(err) @@ -448,6 +450,7 @@ func TestResourceMultihash(t *testing.T) { EnsClient: rh.ensClient, } // test with signed data + rh.chunkStore.NetStore.localStore.Close() rh2, err := NewTestResourceHandler(datadir, rhparams) if err != nil { t.Fatal(err) @@ -579,7 +582,7 @@ func setupTest(backend headerGetter, ensBackend *ens.ENS, signer ResourceSigner) EnsClient: ensBackend, } rh, err = NewTestResourceHandler(datadir, rhparams) - return rh, datadir, cleanF, nil + return rh, datadir, cleanF, err } // Set up simulated ENS backend for use with ENSResourceHandler tests @@ -636,7 +639,7 @@ func newTestSigner() (*GenericResourceSigner, error) { } func getUpdateDirect(rh *ResourceHandler, key Key) ([]byte, error) { - chunk, err := rh.chunkStore.(*LocalStore).memStore.Get(key) + chunk, err := rh.chunkStore.NetStore.localStore.memStore.Get(key) if err != nil { return nil, err } From f4c0bde272c5f2bbc07afd709704f076ed8e0fe1 Mon Sep 17 00:00:00 2001 From: lash Date: Mon, 7 May 2018 20:39:31 +0200 Subject: [PATCH 2/3] swarm/storage: Skip redundant abstraction --- swarm/storage/resource.go | 20 +++++--------------- swarm/storage/resource_test.go | 8 ++++---- 2 files changed, 9 insertions(+), 19 deletions(-) diff --git a/swarm/storage/resource.go b/swarm/storage/resource.go index ba6dbac40165..0fef3015367e 100644 --- a/swarm/storage/resource.go +++ b/swarm/storage/resource.go @@ -50,14 +50,6 @@ func (b *blockEstimator) HeaderByNumber(context.Context, string, *big.Int) (*typ }, nil } -type resourceChunkStore struct { - *NetStore -} - -func (self *resourceChunkStore) GetSingle(key Key, timeout time.Duration) (*Chunk, error) { - return self.get(key, timeout) -} - type ResourceError struct { code int err string @@ -172,7 +164,7 @@ type headerGetter interface { // // TODO: Include modtime in chunk data + signature type ResourceHandler struct { - chunkStore *resourceChunkStore + chunkStore *NetStore HashSize int signer ResourceSigner ethClient headerGetter @@ -225,9 +217,7 @@ func NewResourceHandler(params *ResourceHandlerParams) (*ResourceHandler, error) // Sets the store backend for resource updates func (self *ResourceHandler) SetStore(store *NetStore) { - self.chunkStore = &resourceChunkStore{ - NetStore: store, - } + self.chunkStore = store } // Chunk Validation method (matches ChunkValidatorFunc signature) @@ -507,7 +497,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3 return nil, NewResourceError(ErrPeriodDepth, fmt.Sprintf("Lookup exceeded max period hops (%d)", maxLookup.Max)) } key := self.resourceHash(period, version, rsrc.nameHash) - chunk, err := self.chunkStore.GetSingle(key, defaultRetrieveTimeout) + chunk, err := self.chunkStore.get(key, defaultRetrieveTimeout) if err == nil { if specificversion { return self.updateResourceIndex(rsrc, chunk) @@ -517,7 +507,7 @@ func (self *ResourceHandler) lookup(rsrc *resource, period uint32, version uint3 for { newversion := version + 1 key := self.resourceHash(period, newversion, rsrc.nameHash) - newchunk, err := self.chunkStore.GetSingle(key, defaultRetrieveTimeout) + newchunk, err := self.chunkStore.get(key, defaultRetrieveTimeout) if err != nil { return self.updateResourceIndex(rsrc, chunk) } @@ -553,7 +543,7 @@ func (self *ResourceHandler) loadResource(nameHash common.Hash, name string, ref rsrc.nameHash = nameHash // get the root info chunk and update the cached value - chunk, err := self.chunkStore.GetSingle(Key(rsrc.nameHash[:]), defaultRetrieveTimeout) + chunk, err := self.chunkStore.get(Key(rsrc.nameHash[:]), defaultRetrieveTimeout) if err != nil { return nil, err } diff --git a/swarm/storage/resource_test.go b/swarm/storage/resource_test.go index e8cfb95653b5..f0c1ca664428 100644 --- a/swarm/storage/resource_test.go +++ b/swarm/storage/resource_test.go @@ -163,7 +163,7 @@ func TestResourceHandler(t *testing.T) { // check that the new resource is stored correctly namehash := ens.EnsNode(safeName) - chunk, err := rh.chunkStore.NetStore.localStore.memStore.Get(Key(namehash[:])) + chunk, err := rh.chunkStore.localStore.memStore.Get(Key(namehash[:])) if err != nil { t.Fatal(err) } else if len(chunk.SData) < 16 { @@ -233,7 +233,7 @@ func TestResourceHandler(t *testing.T) { EthClient: rh.ethClient, } - rh.chunkStore.NetStore.localStore.Close() + rh.chunkStore.localStore.Close() rh2, err := NewTestResourceHandler(datadir, rhparams) if err != nil { t.Fatal(err) @@ -450,7 +450,7 @@ func TestResourceMultihash(t *testing.T) { EnsClient: rh.ensClient, } // test with signed data - rh.chunkStore.NetStore.localStore.Close() + rh.chunkStore.localStore.Close() rh2, err := NewTestResourceHandler(datadir, rhparams) if err != nil { t.Fatal(err) @@ -639,7 +639,7 @@ func newTestSigner() (*GenericResourceSigner, error) { } func getUpdateDirect(rh *ResourceHandler, key Key) ([]byte, error) { - chunk, err := rh.chunkStore.NetStore.localStore.memStore.Get(key) + chunk, err := rh.chunkStore.localStore.memStore.Get(key) if err != nil { return nil, err } From bb1287fb455937f849a375fb7c4d14be39b6ebcb Mon Sep 17 00:00:00 2001 From: lash Date: Tue, 8 May 2018 07:47:18 +0200 Subject: [PATCH 3/3] swarm: Change to NetStore for resource handler in swarm.go --- swarm/swarm.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/swarm/swarm.go b/swarm/swarm.go index 2b75681b98d7..0a322a7dd62b 100644 --- a/swarm/swarm.go +++ b/swarm/swarm.go @@ -175,6 +175,22 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. return } + db := storage.NewDBAPI(self.lstore) + delivery := stream.NewDelivery(to, db) + + self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{ + DoSync: config.SyncEnabled, + DoRetrieve: true, + SyncUpdateDelay: config.SyncUpdateDelay, + }) + + // set up DPA, the cloud storage local access layer + dpaChunkStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve) + log.Debug(fmt.Sprintf("-> Local Access to Swarm")) + // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage + self.dpa = storage.NewDPA(dpaChunkStore, self.config.DPAParams) + log.Debug(fmt.Sprintf("-> Content Store API")) + if ensresolver == nil { log.Warn("No ENS API specified, resource updates will NOT validate resource update chunks") } @@ -202,7 +218,7 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. if err != nil { return nil, err } - resourceHandler.SetStore(self.lstore) + resourceHandler.SetStore(dpaChunkStore) var validators []storage.ChunkValidator validators = append(validators, storage.NewContentAddressValidator(storage.MakeHashFunc(storage.DefaultHash))) @@ -214,22 +230,6 @@ func NewSwarm(ctx *node.ServiceContext, backend chequebook.Backend, config *api. // setup local store log.Debug(fmt.Sprintf("Set up local storage")) - db := storage.NewDBAPI(self.lstore) - delivery := stream.NewDelivery(to, db) - - self.streamer = stream.NewRegistry(addr, delivery, db, stateStore, &stream.RegistryOptions{ - DoSync: config.SyncEnabled, - DoRetrieve: true, - SyncUpdateDelay: config.SyncUpdateDelay, - }) - - // set up DPA, the cloud storage local access layer - dpaChunkStore := storage.NewNetStore(self.lstore, self.streamer.Retrieve) - log.Debug(fmt.Sprintf("-> Local Access to Swarm")) - // Swarm Hash Merklised Chunking for Arbitrary-length Document/File storage - self.dpa = storage.NewDPA(dpaChunkStore, self.config.DPAParams) - log.Debug(fmt.Sprintf("-> Content Store API")) - self.bzz = network.NewBzz(bzzconfig, to, stateStore, stream.Spec, self.streamer.Run) // Pss = postal service over swarm (devp2p over bzz)