From 266bf37023b72d323fc5b5b7ac71752a7a67e3a6 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 26 Oct 2020 15:57:58 -0700 Subject: [PATCH 1/6] test(benchmarks): add large file tests add tests of large file processing to benchmarks --- benchmarks/benchmark_test.go | 59 +++++++++++-------------- benchmarks/testinstance/testinstance.go | 47 +++++++++++++------- go.mod | 1 + go.sum | 2 + 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 7f06cf97..6c8634a8 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -35,6 +35,7 @@ import ( "github.com/ipfs/go-graphsync/benchmarks/testinstance" tn "github.com/ipfs/go-graphsync/benchmarks/testnet" + graphsync "github.com/ipfs/go-graphsync/impl" ) const stdBlockSize = 8000 @@ -51,19 +52,25 @@ func BenchmarkRoundtripSuccess(b *testing.B) { tdm, err := newTempDirMaker(b) require.NoError(b, err) b.Run("test-20-10000", func(b *testing.B) { - subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm) }) b.Run("test-20-128MB", func(b *testing.B) { - subtestDistributeAndFetch(ctx, b, 10, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm) }) b.Run("test-p2p-stress-10-128MB", func(b *testing.B) { - p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm) + p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<20, 1024, true), tdm, nil, false) }) b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) { - p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm) + p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024, true), tdm, nil, false) + }) + b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) { + p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{}, true) + }) + b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) { + p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{}, true) }) b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) { - benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm) + benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm) }) } @@ -71,7 +78,7 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in ctx, cancel := context.WithCancel(ctx) mn := mocknet.New(ctx) net := tn.StreamNet(ctx, mn) - ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false) instances, err := ig.Instances(numnodes + 1) require.NoError(b, err) var allCids [][]cid.Cid @@ -132,13 +139,13 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in ig.Close() } -func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) { +func 
p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker, options []graphsync.Option, diskBasedDatastore bool) { ctx, cancel := context.WithCancel(ctx) defer cancel() mn := mocknet.New(ctx) mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000}) net := tn.StreamNet(ctx, mn) - ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + ig := testinstance.NewTestInstanceGenerator(ctx, net, options, tdm, diskBasedDatastore) instances, err := ig.Instances(1 + b.N) require.NoError(b, err) var allCids []cid.Cid @@ -160,32 +167,16 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, ctx, cancel := context.WithTimeout(ctx, 10*time.Second) require.NoError(b, err) start := time.Now() - disconnectOn := rand.Intn(numfiles) for j := 0; j < numfiles; j++ { - resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector) + responseChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector) wg.Add(1) go func(j int) { defer wg.Done() - results := 0 - for { - select { - case <-ctx.Done(): - return - case <-resultChan: - results++ - if results == 100 && j == disconnectOn { - mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer) - mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer) - time.Sleep(100 * time.Millisecond) - mn.LinkPeers(instances[0].Peer, instances[i+1].Peer) - } - case err, ok := <-errChan: - if !ok { - return - } - b.Fatalf("received error on request: %s", err.Error()) - } + for _ = range responseChan { + } + for err := range errChan { + b.Fatalf("received error on request: %s", err.Error()) } }(j) } @@ -205,7 +196,7 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, ctx, cancel := context.WithCancel(ctx) defer cancel() net := tn.VirtualNetwork(d) - ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm) + ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false) instances, err := ig.Instances(numnodes + b.N) require.NoError(b, err) destCids := df(ctx, b, instances[:numnodes]) @@ -268,7 +259,7 @@ type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Insta const defaultUnixfsChunkSize uint64 = 1 << 10 const defaultUnixfsLinksPerLevel = 1024 -func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid { +func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) cid.Cid { data := make([]byte, size) _, err := rand.Read(data) @@ -283,7 +274,7 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block params := ihelper.DagBuilderParams{ Maxlinks: unixfsLinksPerLevel, - RawLeaves: true, + RawLeaves: useRawNodes, CidBuilder: nil, Dagserv: bufferedDS, } @@ -300,11 +291,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block return nd.Cid() } -func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc { +func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc { return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid { cids := make([]cid.Cid, 0, len(provs)) for _, prov := range provs { - c := loadRandomUnixFxFile(ctx, b, 
prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel) + c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes) cids = append(cids, c) } return cids diff --git a/benchmarks/testinstance/testinstance.go b/benchmarks/testinstance/testinstance.go index b5b48f40..1b4048cd 100644 --- a/benchmarks/testinstance/testinstance.go +++ b/benchmarks/testinstance/testinstance.go @@ -8,6 +8,7 @@ import ( ds "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/delayed" ds_sync "github.com/ipfs/go-datastore/sync" + badgerds "github.com/ipfs/go-ds-badger" blockstore "github.com/ipfs/go-ipfs-blockstore" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipld/go-ipld-prime" @@ -29,26 +30,28 @@ type TempDirGenerator interface { // NewTestInstanceGenerator generates a new InstanceGenerator for the given // testnet -func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator { +func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator, diskBasedDatastore bool) InstanceGenerator { ctx, cancel := context.WithCancel(ctx) return InstanceGenerator{ - net: net, - seq: 0, - ctx: ctx, // TODO take ctx as param to Next, Instances - cancel: cancel, - gsOptions: gsOptions, - tempDirGenerator: tempDirGenerator, + net: net, + seq: 0, + ctx: ctx, // TODO take ctx as param to Next, Instances + cancel: cancel, + gsOptions: gsOptions, + tempDirGenerator: tempDirGenerator, + diskBasedDatastore: diskBasedDatastore, } } // InstanceGenerator generates new test instances of bitswap+dependencies type InstanceGenerator struct { - seq int - net tn.Network - ctx context.Context - cancel context.CancelFunc - gsOptions []gsimpl.Option - tempDirGenerator TempDirGenerator + seq int + net tn.Network + ctx context.Context + cancel context.CancelFunc + gsOptions []gsimpl.Option + tempDirGenerator TempDirGenerator + diskBasedDatastore bool } // Close closes the clobal context, shutting down all test instances @@ -64,7 +67,7 @@ func (g *InstanceGenerator) Next() (Instance, error) { if err != nil { return Instance{}, err } - return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir()) + return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir(), g.diskBasedDatastore) } // Instances creates N test instances of bitswap + dependencies and connects @@ -138,11 +141,23 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // NB: It's easy make mistakes by providing the same peer ID to two different // instances. To safeguard, use the InstanceGenerator to generate instances. It's // just a much better idea. 
-func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) { +func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string, diskBasedDatastore bool) (Instance, error) { bsdelay := delay.Fixed(0) adapter := net.Adapter(p) - dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) + var dstore datastore.Batching + var err error + if diskBasedDatastore { + defopts := badgerds.DefaultOptions + defopts.SyncWrites = false + defopts.Truncate = true + dstore, err = badgerds.NewDatastore(tempDir, &defopts) + if err != nil { + return Instance{}, err + } + } else { + dstore = ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay)) + } bstore, err := blockstore.CachedBlockstore(ctx, blockstore.NewBlockstore(dstore), blockstore.DefaultCacheOpts()) diff --git a/go.mod b/go.mod index 05fbe725..8f8e7a07 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/ipfs/go-blockservice v0.1.3 github.com/ipfs/go-cid v0.0.6 github.com/ipfs/go-datastore v0.4.4 + github.com/ipfs/go-ds-badger v0.2.1 github.com/ipfs/go-ipfs-blockstore v0.1.4 github.com/ipfs/go-ipfs-blocksutil v0.0.1 github.com/ipfs/go-ipfs-chunker v0.0.5 diff --git a/go.sum b/go.sum index 91bdb59e..260b4e07 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= +github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -168,6 +169,7 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8= github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s= +github.com/ipfs/go-ds-badger v0.2.1 h1:RsC9DDlwFhFdfT+s2PeC8joxbSp2YMufK8w/RBOxKtk= github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE= github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc= github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s= From eb896e7d8d841aa26e512146203acb4dfd43b61d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 21 Oct 2020 04:43:53 -0700 Subject: [PATCH 2/6] feat(allocator): add allocator for memory backpressure add an allocator that manages global memory allocations on responder and blocks peerresponsesenders as needed --- go.mod | 2 + impl/graphsync.go | 38 ++- responsemanager/allocator/allocator.go | 232 ++++++++++++++++++ responsemanager/allocator/allocator_test.go | 219 +++++++++++++++++ .../peerresponsemanager/peerresponsesender.go | 60 ++++- .../peerresponsesender_test.go | 84 ++++++- 6 files changed, 611 insertions(+), 24 deletions(-) create mode 100644 responsemanager/allocator/allocator.go create mode 100644 responsemanager/allocator/allocator_test.go diff --git a/go.mod b/go.mod index 8f8e7a07..19fabf23 100644 --- 
a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/ipfs/go-ipfs-delay v0.0.1 github.com/ipfs/go-ipfs-exchange-offline v0.0.1 github.com/ipfs/go-ipfs-files v0.0.8 + github.com/ipfs/go-ipfs-pq v0.0.2 github.com/ipfs/go-ipfs-routing v0.1.0 github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-ipld-cbor v0.0.4 // indirect @@ -33,6 +34,7 @@ require ( github.com/libp2p/go-libp2p v0.6.0 github.com/libp2p/go-libp2p-core v0.5.0 github.com/libp2p/go-libp2p-netutil v0.1.0 + github.com/libp2p/go-libp2p-peer v0.2.0 github.com/libp2p/go-libp2p-record v0.1.1 // indirect github.com/libp2p/go-libp2p-testing v0.1.1 github.com/libp2p/go-msgio v0.0.6 diff --git a/impl/graphsync.go b/impl/graphsync.go index 85398f3d..2d06698d 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -17,6 +17,7 @@ import ( "github.com/ipfs/go-graphsync/requestmanager/asyncloader" requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks" "github.com/ipfs/go-graphsync/responsemanager" + "github.com/ipfs/go-graphsync/responsemanager/allocator" responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks" "github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager" "github.com/ipfs/go-graphsync/responsemanager/persistenceoptions" @@ -26,6 +27,8 @@ import ( var log = logging.Logger("graphsync") const maxRecursionDepth = 100 +const defaultTotalMaxMemory = uint64(4 * 1 << 30) +const defaultMaxMemoryPerPeer = uint64(1 << 30) // GraphSync is an instance of a GraphSync exchange that implements // the graphsync protocol. @@ -53,6 +56,9 @@ type GraphSync struct { ctx context.Context cancel context.CancelFunc unregisterDefaultValidator graphsync.UnregisterHookFunc + allocator *allocator.Allocator + totalMaxMemory uint64 + maxMemoryPerPeer uint64 } // Option defines the functional option type that can be used to configure @@ -67,6 +73,18 @@ func RejectAllRequestsByDefault() Option { } } +func MaxMemoryResponder(totalMaxMemory uint64) Option { + return func(gs *GraphSync) { + gs.totalMaxMemory = totalMaxMemory + } +} + +func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option { + return func(gs *GraphSync) { + gs.maxMemoryPerPeer = maxMemoryPerPeer + } +} + // New creates a new GraphSync Exchange on the given network, // and the given link loader+storer. 
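// Illustrative only (not part of this patch): a caller could cap responder
// memory using the two options introduced above, along the lines of
//
//	gs := graphsync.New(ctx, network, loader, storer,
//		graphsync.MaxMemoryResponder(1<<28),        // ~256MiB across all peers
//		graphsync.MaxMemoryPerPeerResponder(1<<24), // ~16MiB per peer
//	)
//
// The option names come from this diff; the remaining constructor arguments
// depend on the caller's existing setup.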
func New(parent context.Context, network gsnet.GraphSyncNetwork, @@ -83,10 +101,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, incomingBlockHooks := requestorhooks.NewBlockHooks() requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks) peerTaskQueue := peertaskqueue.New() - createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender { - return peerresponsemanager.NewResponseSender(ctx, p, peerManager) - } - peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue) + persistenceOptions := persistenceoptions.New() incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions) outgoingBlockHooks := responderhooks.NewBlockHooks() @@ -95,7 +110,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners() blockSentListeners := responderhooks.NewBlockSentListeners() networkErrorListeners := responderhooks.NewNetworkErrorListeners() - responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners) unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth)) graphSync := &GraphSync{ network: network, @@ -116,8 +130,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, outgoingRequestHooks: outgoingRequestHooks, incomingBlockHooks: incomingBlockHooks, peerTaskQueue: peerTaskQueue, - peerResponseManager: peerResponseManager, - responseManager: responseManager, + totalMaxMemory: defaultTotalMaxMemory, + maxMemoryPerPeer: defaultMaxMemoryPerPeer, ctx: ctx, cancel: cancel, unregisterDefaultValidator: unregisterDefaultValidator, @@ -126,7 +140,17 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, for _, option := range options { option(graphSync) } + allocator := allocator.NewAllocator(ctx, graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer) + graphSync.allocator = allocator + createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender { + return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator) + } + peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue) + graphSync.peerResponseManager = peerResponseManager + responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners) + graphSync.responseManager = responseManager + allocator.Start() asyncLoader.Startup() requestManager.SetDelegate(peerManager) requestManager.Startup() diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go new file mode 100644 index 00000000..9ecb6d76 --- /dev/null +++ b/responsemanager/allocator/allocator.go @@ -0,0 +1,232 @@ +package allocator + +import ( + "context" + "errors" + + pq "github.com/ipfs/go-ipfs-pq" + peer "github.com/libp2p/go-libp2p-peer" +) + +type Allocator struct { + ctx context.Context + totalMemoryMax uint64 + perPeerMax uint64 + total uint64 + nextAllocIndex uint64 + messages chan allocationRequest + peerStatuses map[peer.ID]*peerStatus + peerStatusQueue pq.PQ +} + +func NewAllocator(ctx 
context.Context, totalMemoryMax uint64, perPeerMax uint64) *Allocator { + return &Allocator{ + ctx: ctx, + totalMemoryMax: totalMemoryMax, + perPeerMax: perPeerMax, + total: 0, + peerStatuses: make(map[peer.ID]*peerStatus), + peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)), + messages: make(chan allocationRequest, 16), + } +} + +func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { + responseChan := make(chan error, 1) + done := make(chan struct{}, 1) + select { + case <-a.ctx.Done(): + responseChan <- errors.New("context closed") + case a.messages <- allocationRequest{p, amount, false, responseChan, done}: + } + select { + case <-a.ctx.Done(): + case <-done: + } + return responseChan +} + +func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { + responseChan := make(chan error, 1) + select { + case <-a.ctx.Done(): + responseChan <- errors.New("context closed") + case a.messages <- allocationRequest{p, amount, true, responseChan, nil}: + } + select { + case <-a.ctx.Done(): + return errors.New("context closed") + case err := <-responseChan: + return err + } +} + +func (a *Allocator) Start() { + go func() { + a.run() + a.cleanup() + }() +} + +func (a *Allocator) run() { + for { + select { + case <-a.ctx.Done(): + return + case request := <-a.messages: + status, ok := a.peerStatuses[request.p] + if request.isDelloc { + if !ok { + request.response <- errors.New("cannot deallocate from peer with no allocations") + continue + } + a.handleDeallocRequest(request, status) + } else { + if !ok { + status = &peerStatus{ + p: request.p, + totalAllocated: 0, + } + a.peerStatusQueue.Push(status) + a.peerStatuses[request.p] = status + } + a.handleAllocRequest(request, status) + } + } + } +} + +func (a *Allocator) cleanup() { + for { + if a.peerStatusQueue.Len() == 0 { + return + } + nextPeer := a.peerStatusQueue.Peek().(*peerStatus) + if len(nextPeer.pendingAllocations) == 0 { + return + } + pendingAllocation := nextPeer.pendingAllocations[0] + nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:] + pendingAllocation.response <- errors.New("never allocated") + a.peerStatusQueue.Update(nextPeer.Index()) + } +} + +func (a *Allocator) handleAllocRequest(request allocationRequest, status *peerStatus) { + if (a.total+request.amount <= a.totalMemoryMax) && (status.totalAllocated+request.amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 { + a.total += request.amount + status.totalAllocated += request.amount + request.response <- nil + } else { + pendingAllocation := pendingAllocation{ + allocationRequest: request, + allocIndex: a.nextAllocIndex, + } + a.nextAllocIndex++ + status.pendingAllocations = append(status.pendingAllocations, pendingAllocation) + } + a.peerStatusQueue.Update(status.Index()) + request.done <- struct{}{} +} + +func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peerStatus) { + status.totalAllocated -= request.amount + a.total -= request.amount + a.peerStatusQueue.Update(status.Index()) + for a.processNextPendingAllocation() { + } + request.response <- nil +} + +func (a *Allocator) processNextPendingAllocation() bool { + if a.peerStatusQueue.Len() == 0 { + return false + } + nextPeer := a.peerStatusQueue.Peek().(*peerStatus) + + if len(nextPeer.pendingAllocations) > 0 { + if !a.processNextPendingAllocationForPeer(nextPeer) { + return false + } + a.peerStatusQueue.Update(nextPeer.Index()) + } else { + if nextPeer.totalAllocated > 0 { + return false + } + a.peerStatusQueue.Pop() + target := 
nextPeer.p + delete(a.peerStatuses, target) + } + return true +} + +func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool { + pendingAllocation := nextPeer.pendingAllocations[0] + if a.total+pendingAllocation.amount > a.totalMemoryMax { + return false + } + if nextPeer.totalAllocated+pendingAllocation.amount > a.perPeerMax { + return false + } + a.total += pendingAllocation.amount + nextPeer.totalAllocated += pendingAllocation.amount + nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:] + pendingAllocation.response <- nil + return true +} + +type allocationRequest struct { + p peer.ID + amount uint64 + isDelloc bool + response chan error + done chan struct{} +} + +type peerStatus struct { + p peer.ID + totalAllocated uint64 + index int + pendingAllocations []pendingAllocation +} + +type pendingAllocation struct { + allocationRequest + allocIndex uint64 +} + +// SetIndex stores the int index. +func (ps *peerStatus) SetIndex(index int) { + ps.index = index +} + +// Index returns the last given by SetIndex(int). +func (ps *peerStatus) Index() int { + return ps.index +} + +func makePeerStatusCompare(maxPerPeer uint64) pq.ElemComparator { + return func(a, b pq.Elem) bool { + pa := a.(*peerStatus) + pb := b.(*peerStatus) + if len(pa.pendingAllocations) == 0 { + if len(pb.pendingAllocations) == 0 { + return pa.totalAllocated < pb.totalAllocated + } + return false + } + if len(pb.pendingAllocations) == 0 { + return true + } + if pa.totalAllocated+pa.pendingAllocations[0].amount > maxPerPeer { + return false + } + if pb.totalAllocated+pb.pendingAllocations[0].amount > maxPerPeer { + return true + } + if pa.pendingAllocations[0].allocIndex < pb.pendingAllocations[0].allocIndex { + return true + } + return false + } +} diff --git a/responsemanager/allocator/allocator_test.go b/responsemanager/allocator/allocator_test.go new file mode 100644 index 00000000..4ea6ac05 --- /dev/null +++ b/responsemanager/allocator/allocator_test.go @@ -0,0 +1,219 @@ +package allocator_test + +import ( + "context" + "testing" + + "github.com/ipfs/go-graphsync/responsemanager/allocator" + "github.com/ipfs/go-graphsync/testutil" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAllocator(t *testing.T) { + peers := testutil.GeneratePeers(3) + ctx := context.Background() + testCases := map[string]struct { + total uint64 + maxPerPeer uint64 + allocs []alloc + totals []map[peer.ID]uint64 + }{ + "single peer against total": { + total: 1000, + maxPerPeer: 1000, + allocs: []alloc{ + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 400, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 300}, + {peers[0]: 600}, + {peers[0]: 900}, + {peers[0]: 500}, + {peers[0]: 800}, + }, + }, + "single peer against self limit": { + total: 2000, + maxPerPeer: 1000, + allocs: []alloc{ + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 300, false}, + {peers[0], 400, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 300}, + {peers[0]: 600}, + {peers[0]: 900}, + {peers[0]: 500}, + {peers[0]: 800}, + }, + }, + "multiple peers against total": { + total: 2000, + maxPerPeer: 2000, + allocs: []alloc{ + {peers[0], 1000, false}, + {peers[1], 900, false}, + {peers[1], 400, false}, + {peers[0], 300, false}, + {peers[0], 500, true}, + {peers[1], 500, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 1000}, + 
{peers[0]: 1000, peers[1]: 900}, + {peers[0]: 500, peers[1]: 900}, + {peers[0]: 500, peers[1]: 1300}, + {peers[0]: 500, peers[1]: 800}, + {peers[0]: 800, peers[1]: 800}, + }, + }, + "multiple peers against self limit": { + total: 5000, + maxPerPeer: 1000, + allocs: []alloc{ + {peers[0], 1000, false}, + {peers[1], 900, false}, + {peers[1], 400, false}, + {peers[0], 300, false}, + {peers[0], 500, true}, + {peers[1], 500, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 1000}, + {peers[0]: 1000, peers[1]: 900}, + {peers[0]: 500, peers[1]: 900}, + {peers[0]: 800, peers[1]: 900}, + {peers[0]: 800, peers[1]: 400}, + {peers[0]: 800, peers[1]: 800}, + }, + }, + "multiple peers against mix of limits": { + total: 2700, + maxPerPeer: 1000, + allocs: []alloc{ + {peers[0], 800, false}, + {peers[1], 900, false}, + {peers[1], 400, false}, + {peers[0], 300, false}, + {peers[2], 1000, false}, + {peers[2], 300, false}, + {peers[0], 200, true}, + {peers[2], 200, true}, + {peers[2], 100, false}, + {peers[1], 200, true}, + {peers[2], 100, true}, + {peers[1], 100, true}, + {peers[2], 200, true}, + {peers[0], 200, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 800}, + {peers[0]: 800, peers[1]: 900}, + {peers[0]: 800, peers[1]: 900, peers[2]: 1000}, + {peers[0]: 600, peers[1]: 900, peers[2]: 1000}, + {peers[0]: 600, peers[1]: 900, peers[2]: 800}, + {peers[0]: 900, peers[1]: 900, peers[2]: 800}, + {peers[0]: 900, peers[1]: 700, peers[2]: 800}, + {peers[0]: 900, peers[1]: 700, peers[2]: 700}, + {peers[0]: 900, peers[1]: 700, peers[2]: 1000}, + {peers[0]: 900, peers[1]: 600, peers[2]: 1000}, + {peers[0]: 900, peers[1]: 600, peers[2]: 800}, + {peers[0]: 900, peers[1]: 1000, peers[2]: 800}, + {peers[0]: 700, peers[1]: 1000, peers[2]: 800}, + {peers[0]: 700, peers[1]: 1000, peers[2]: 900}, + }, + }, + } + for testCase, data := range testCases { + t.Run(testCase, func(t *testing.T) { + //ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + //defer cancel() + allocator := allocator.NewAllocator(ctx, data.total, data.maxPerPeer) + allocator.Start() + totals := map[peer.ID]uint64{} + currentTotal := 0 + var pending []pendingResult + for _, alloc := range data.allocs { + var changedTotals bool + pending, changedTotals = readPending(t, pending, totals) + if changedTotals { + require.Less(t, currentTotal, len(data.totals)) + require.Equal(t, data.totals[currentTotal], totals) + currentTotal++ + } + if alloc.isDealloc { + err := allocator.ReleaseBlockMemory(alloc.p, alloc.amount) + assert.NoError(t, err) + totals[alloc.p] = totals[alloc.p] - alloc.amount + require.Less(t, currentTotal, len(data.totals)) + require.Equal(t, data.totals[currentTotal], totals) + currentTotal++ + } else { + allocated := allocator.AllocateBlockMemory(alloc.p, alloc.amount) + select { + case <-allocated: + totals[alloc.p] = totals[alloc.p] + alloc.amount + require.Less(t, currentTotal, len(data.totals)) + require.Equal(t, data.totals[currentTotal], totals) + currentTotal++ + default: + pending = append(pending, pendingResult{alloc.p, alloc.amount, allocated}) + } + } + } + var changedTotals bool + _, changedTotals = readPending(t, pending, totals) + if changedTotals { + require.Less(t, currentTotal, len(data.totals)) + require.Equal(t, data.totals[currentTotal], totals) + currentTotal++ + } + require.Equal(t, len(data.totals), currentTotal) + }) + } +} + +func readPending(t *testing.T, pending []pendingResult, totals map[peer.ID]uint64) ([]pendingResult, bool) { + morePending := true + changedTotals := false + for morePending && 
len(pending) > 0 { + morePending = false + doneIter: + for i, next := range pending { + select { + case err := <-next.response: + require.NoError(t, err) + copy(pending[i:], pending[i+1:]) + pending[len(pending)-1] = pendingResult{} + pending = pending[:len(pending)-1] + totals[next.p] = totals[next.p] + next.amount + changedTotals = true + morePending = true + break doneIter + default: + } + } + } + return pending, changedTotals +} + +type alloc struct { + p peer.ID + amount uint64 + isDealloc bool +} + +type pendingResult struct { + p peer.ID + amount uint64 + response <-chan error +} diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 74105ecf..3d671df6 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -49,6 +49,11 @@ type PeerMessageHandler interface { SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block, ...notifications.Notifee) } +type Allocator interface { + AllocateBlockMemory(p peer.ID, amount uint64) <-chan error + ReleaseBlockMemory(p peer.ID, amount uint64) error +} + // Transaction is a series of operations that should be send together in a single response type Transaction func(PeerResponseTransactionSender) error @@ -57,18 +62,20 @@ type peerResponseSender struct { ctx context.Context cancel context.CancelFunc peerHandler PeerMessageHandler + allocator Allocator outgoingWork chan struct{} - linkTrackerLk sync.RWMutex - linkTracker *linktracker.LinkTracker - altTrackers map[string]*linktracker.LinkTracker - dedupKeys map[graphsync.RequestID]string - responseBuildersLk sync.RWMutex - responseBuilders []*responsebuilder.ResponseBuilder - nextBuilderTopic responsebuilder.Topic - queuedMessages chan responsebuilder.Topic - subscriber notifications.MappableSubscriber - publisher notifications.Publisher + linkTrackerLk sync.RWMutex + linkTracker *linktracker.LinkTracker + altTrackers map[string]*linktracker.LinkTracker + dedupKeys map[graphsync.RequestID]string + responseBuildersLk sync.RWMutex + responseBuilders []*responsebuilder.ResponseBuilder + nextBuilderTopic responsebuilder.Topic + queuedMessages chan responsebuilder.Topic + subscriber notifications.MappableSubscriber + allocatorSubscriber notifications.MappableSubscriber + publisher notifications.Publisher } // PeerResponseSender handles batching, deduping, and sending responses for @@ -109,7 +116,7 @@ type PeerResponseTransactionSender interface { // NewResponseSender generates a new PeerResponseSender for the given context, peer ID, // using the given peer message handler. 
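// Illustrative sketch (not part of this patch), assuming the Allocator
// interface defined above: a sender reserves memory for a block before
// queueing it and releases the same amount once the message has been sent,
// so that blocked allocations for other peers and requests can proceed.
//
//	func reserveAndSend(ctx context.Context, a Allocator, p peer.ID, blk []byte) error {
//		select {
//		case <-a.AllocateBlockMemory(p, uint64(len(blk))):
//		case <-ctx.Done():
//			return ctx.Err()
//		}
//		// ... add the block to the outgoing response builder ...
//		// once the message goes out on the wire, free the reservation:
//		return a.ReleaseBlockMemory(p, uint64(len(blk)))
//	}
//
// reserveAndSend is a hypothetical helper used only for illustration; in the
// patch this flow lives in buildResponse (allocate) and allocatorSubscriber
// (release).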
-func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler) PeerResponseSender { +func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHandler, allocator Allocator) PeerResponseSender { ctx, cancel := context.WithCancel(ctx) prs := &peerResponseSender{ p: p, @@ -122,8 +129,10 @@ func NewResponseSender(ctx context.Context, p peer.ID, peerHandler PeerMessageHa altTrackers: make(map[string]*linktracker.LinkTracker), queuedMessages: make(chan responsebuilder.Topic, 1), publisher: notifications.NewPublisher(), + allocator: allocator, } prs.subscriber = notifications.NewMappableSubscriber(&subscriber{prs}, notifications.IdentityTransform) + prs.allocatorSubscriber = notifications.NewMappableSubscriber(&allocatorSubscriber{prs}, notifications.IdentityTransform) return prs } @@ -383,6 +392,12 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) { } func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool { + allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize) + select { + case <-prs.ctx.Done(): + return false + case <-allocResponse: + } prs.responseBuildersLk.Lock() defer prs.responseBuildersLk.Unlock() if shouldBeginNewResponse(prs.responseBuilders, blkSize) { @@ -438,6 +453,10 @@ func (prs *peerResponseSender) sendResponseMessages() { if builder.Empty() { continue } + notifications.SubscribeOn(prs.publisher, builder.Topic(), notifications.Notifee{ + Topic: builder.BlockSize(), + Subscriber: prs.allocatorSubscriber, + }) responses, blks, err := builder.Build() if err != nil { log.Errorf("Unable to assemble GraphSync response: %s", err.Error()) @@ -495,3 +514,22 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event func (s *subscriber) OnClose(topic notifications.Topic) { s.prs.publisher.Close(topic) } + +type allocatorSubscriber struct { + prs *peerResponseSender +} + +func (as *allocatorSubscriber) OnNext(topic notifications.Topic, event notifications.Event) { + blkSize, ok := topic.(uint64) + if !ok { + return + } + _, ok = event.(Event) + if !ok { + return + } + _ = as.prs.allocator.ReleaseBlockMemory(as.prs.p, blkSize) +} + +func (as *allocatorSubscriber) OnClose(topic notifications.Topic) { +} diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go index ecdf91e8..70ef3ce0 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender_test.go +++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go @@ -18,6 +18,7 @@ import ( gsmsg "github.com/ipfs/go-graphsync/message" "github.com/ipfs/go-graphsync/messagequeue" "github.com/ipfs/go-graphsync/notifications" + "github.com/ipfs/go-graphsync/responsemanager/allocator" "github.com/ipfs/go-graphsync/testutil" ) @@ -41,7 +42,9 @@ func TestPeerResponseSenderSendsResponses(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData(), sendResponseNotifee1) @@ -125,7 +128,9 @@ func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) { links = append(links, cidlink.Link{Cid: 
block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData()) @@ -185,7 +190,9 @@ func TestPeerResponseSenderSendsExtensionData(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData()) @@ -228,7 +235,9 @@ func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() notifee, notifeeVerifier := testutil.NewTestNotifee("transaction", 10) err := peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error { @@ -270,7 +279,9 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() peerResponseSender.IgnoreBlocks(requestID1, links) @@ -326,7 +337,9 @@ func TestPeerResponseSenderDupKeys(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - peerResponseSender := NewResponseSender(ctx, p, fph) + allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() peerResponseSender.DedupKey(requestID1, "applesauce") @@ -382,6 +395,65 @@ func TestPeerResponseSenderDupKeys(t *testing.T) { } +func TestPeerResponseSenderSendsResponsesMemoryPressure(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + p := testutil.GeneratePeers(1)[0] + requestID1 := graphsync.RequestID(rand.Int31()) + blks := testutil.GenerateBlocksOfSize(5, 100) + links := make([]ipld.Link, 0, len(blks)) + for _, block := range blks { + links = append(links, cidlink.Link{Cid: block.Cid()}) + } + fph := newFakePeerHandler(ctx, t) + allocator := allocator.NewAllocator(ctx, 300, 300) + allocator.Start() + peerResponseSender := NewResponseSender(ctx, p, fph, allocator) + peerResponseSender.Startup() + + bd := peerResponseSender.SendResponse(requestID1, links[0], blks[0].RawData()) + assertSentOnWire(t, bd, blks[0]) + fph.AssertHasMessage("did not send first message") + + fph.AssertBlocks(blks[0]) + fph.AssertResponses(expectedResponses{requestID1: graphsync.PartialResponse}) + + finishes := make(chan string, 2) + go func() { + _ = peerResponseSender.Transaction(requestID1, func(peerResponseSender PeerResponseTransactionSender) error { + bd = peerResponseSender.SendResponse(links[1], blks[1].RawData()) + 
assertSentOnWire(t, bd, blks[1]) + bd = peerResponseSender.SendResponse(links[2], blks[2].RawData()) + assertSentOnWire(t, bd, blks[2]) + bd = peerResponseSender.SendResponse(links[3], blks[3].RawData()) + assertSentOnWire(t, bd, blks[3]) + peerResponseSender.FinishRequest() + return nil + }) + finishes <- "sent message" + }() + go func() { + time.Sleep(100 * time.Millisecond) + // let peer reponse manager know last message was sent so message sending can continue + finishes <- "freed memory" + fph.notifySuccess() + }() + + var finishMessages []string + for i := 0; i < 2; i++ { + var finishMessage string + testutil.AssertReceive(ctx, t, finishes, &finishMessage, "should have completed") + finishMessages = append(finishMessages, finishMessage) + } + require.Equal(t, []string{"freed memory", "sent message"}, finishMessages) + fph.AssertHasMessage("did not send second message") + fph.AssertBlocks(blks[1], blks[2], blks[3]) + fph.AssertResponses(expectedResponses{ + requestID1: graphsync.RequestCompletedFull, + }) +} + func findResponseForRequestID(responses []gsmsg.GraphSyncResponse, requestID graphsync.RequestID) (gsmsg.GraphSyncResponse, error) { for _, response := range responses { if response.RequestID() == requestID { From 6b1db1a81b8b688d785ee2336e63344915d9eb0d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 26 Oct 2020 14:12:16 -0700 Subject: [PATCH 3/6] feat(allocator): add method to release all peer memory --- benchmarks/benchmark_test.go | 4 +- responsemanager/allocator/allocator.go | 69 +++++++++++++++---- responsemanager/allocator/allocator_test.go | 42 +++++++++-- .../peerresponsemanager/peerresponsesender.go | 19 +++-- 4 files changed, 107 insertions(+), 27 deletions(-) diff --git a/benchmarks/benchmark_test.go b/benchmarks/benchmark_test.go index 6c8634a8..95875c24 100644 --- a/benchmarks/benchmark_test.go +++ b/benchmarks/benchmark_test.go @@ -64,10 +64,10 @@ func BenchmarkRoundtripSuccess(b *testing.B) { p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024, true), tdm, nil, false) }) b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) { - p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{}, true) + p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true) }) b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) { - p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{}, true) + p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true) }) b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) { benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm) diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go index 9ecb6d76..f8d7947e 100644 --- a/responsemanager/allocator/allocator.go +++ b/responsemanager/allocator/allocator.go @@ -37,7 +37,7 @@ func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { select { case <-a.ctx.Done(): responseChan <- errors.New("context closed") - case a.messages <- allocationRequest{p, amount, false, responseChan, done}: + case a.messages <- allocationRequest{p, amount, allocOperation, responseChan, done}: } select { case <-a.ctx.Done(): @@ -51,7 +51,22 @@ func (a 
*Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { select { case <-a.ctx.Done(): responseChan <- errors.New("context closed") - case a.messages <- allocationRequest{p, amount, true, responseChan, nil}: + case a.messages <- allocationRequest{p, amount, deallocOperation, responseChan, nil}: + } + select { + case <-a.ctx.Done(): + return errors.New("context closed") + case err := <-responseChan: + return err + } +} + +func (a *Allocator) ReleasePeerMemory(p peer.ID) error { + responseChan := make(chan error, 1) + select { + case <-a.ctx.Done(): + responseChan <- errors.New("context closed") + case a.messages <- allocationRequest{p, 0, deallocPeerOperation, responseChan, nil}: } select { case <-a.ctx.Done(): @@ -75,13 +90,8 @@ func (a *Allocator) run() { return case request := <-a.messages: status, ok := a.peerStatuses[request.p] - if request.isDelloc { - if !ok { - request.response <- errors.New("cannot deallocate from peer with no allocations") - continue - } - a.handleDeallocRequest(request, status) - } else { + switch request.operation { + case allocOperation: if !ok { status = &peerStatus{ p: request.p, @@ -91,6 +101,18 @@ func (a *Allocator) run() { a.peerStatuses[request.p] = status } a.handleAllocRequest(request, status) + case deallocOperation: + if !ok { + request.response <- errors.New("cannot deallocate from peer with no allocations") + continue + } + a.handleDeallocRequest(request, status) + case deallocPeerOperation: + if !ok { + request.response <- errors.New("cannot deallocate from peer with no allocations") + continue + } + a.handleDeallocPeerRequest(request, status) } } } @@ -138,6 +160,17 @@ func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peer request.response <- nil } +func (a *Allocator) handleDeallocPeerRequest(request allocationRequest, status *peerStatus) { + a.peerStatusQueue.Remove(status.Index()) + for _, pendingAllocation := range status.pendingAllocations { + pendingAllocation.response <- errors.New("Peer has been deallocated") + } + a.total -= status.totalAllocated + for a.processNextPendingAllocation() { + } + request.response <- nil +} + func (a *Allocator) processNextPendingAllocation() bool { if a.peerStatusQueue.Len() == 0 { return false @@ -175,12 +208,20 @@ func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bo return true } +type operationType uint64 + +const ( + allocOperation operationType = iota + deallocOperation + deallocPeerOperation +) + type allocationRequest struct { - p peer.ID - amount uint64 - isDelloc bool - response chan error - done chan struct{} + p peer.ID + amount uint64 + operation operationType + response chan error + done chan struct{} } type peerStatus struct { diff --git a/responsemanager/allocator/allocator_test.go b/responsemanager/allocator/allocator_test.go index 4ea6ac05..d72754ec 100644 --- a/responsemanager/allocator/allocator_test.go +++ b/responsemanager/allocator/allocator_test.go @@ -3,6 +3,7 @@ package allocator_test import ( "context" "testing" + "time" "github.com/ipfs/go-graphsync/responsemanager/allocator" "github.com/ipfs/go-graphsync/testutil" @@ -132,11 +133,35 @@ func TestAllocator(t *testing.T) { {peers[0]: 700, peers[1]: 1000, peers[2]: 900}, }, }, + "multiple peers, peer drops off": { + total: 2000, + maxPerPeer: 1000, + allocs: []alloc{ + {peers[0], 1000, false}, + {peers[1], 500, false}, + {peers[2], 500, false}, + {peers[1], 100, false}, + {peers[2], 100, false}, + {peers[2], 200, false}, + {peers[1], 200, false}, + {peers[2], 100, false}, + 
{peers[1], 300, false}, + // free peer 0 completely + {peers[0], 0, true}, + }, + totals: []map[peer.ID]uint64{ + {peers[0]: 1000}, + {peers[0]: 1000, peers[1]: 500}, + {peers[0]: 1000, peers[1]: 500, peers[2]: 500}, + {peers[0]: 0, peers[1]: 500, peers[2]: 500}, + {peers[0]: 0, peers[1]: 800, peers[2]: 900}, + }, + }, } for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { - //ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - //defer cancel() + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() allocator := allocator.NewAllocator(ctx, data.total, data.maxPerPeer) allocator.Start() totals := map[peer.ID]uint64{} @@ -151,9 +176,15 @@ func TestAllocator(t *testing.T) { currentTotal++ } if alloc.isDealloc { - err := allocator.ReleaseBlockMemory(alloc.p, alloc.amount) - assert.NoError(t, err) - totals[alloc.p] = totals[alloc.p] - alloc.amount + if alloc.amount == 0 { + err := allocator.ReleasePeerMemory(alloc.p) + assert.NoError(t, err) + totals[alloc.p] = 0 + } else { + err := allocator.ReleaseBlockMemory(alloc.p, alloc.amount) + assert.NoError(t, err) + totals[alloc.p] = totals[alloc.p] - alloc.amount + } require.Less(t, currentTotal, len(data.totals)) require.Equal(t, data.totals[currentTotal], totals) currentTotal++ @@ -206,6 +237,7 @@ func readPending(t *testing.T, pending []pendingResult, totals map[peer.ID]uint6 return pending, changedTotals } +// amount 0 + isDealloc = true shuts down the whole peer type alloc struct { p peer.ID amount uint64 diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 3d671df6..9ae4f6eb 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -49,8 +49,10 @@ type PeerMessageHandler interface { SendResponse(peer.ID, []gsmsg.GraphSyncResponse, []blocks.Block, ...notifications.Notifee) } +// Allocator is an interface that can manage memory allocated for blocks type Allocator interface { AllocateBlockMemory(p peer.ID, amount uint64) <-chan error + ReleasePeerMemory(p peer.ID) error ReleaseBlockMemory(p peer.ID, amount uint64) error } @@ -392,11 +394,13 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) { } func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool { - allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize) - select { - case <-prs.ctx.Done(): - return false - case <-allocResponse: + if blkSize > 0 { + allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize) + select { + case <-prs.ctx.Done(): + return false + case <-allocResponse: + } } prs.responseBuildersLk.Lock() defer prs.responseBuildersLk.Unlock() @@ -431,7 +435,10 @@ func (prs *peerResponseSender) signalWork() { } func (prs *peerResponseSender) run() { - defer prs.publisher.Shutdown() + defer func() { + prs.publisher.Shutdown() + prs.allocator.ReleasePeerMemory(prs.p) + }() prs.publisher.Startup() for { select { From 5bf1d94ae2b0c78063a1a141821b6e87ef72f85e Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Mon, 26 Oct 2020 22:42:49 -0700 Subject: [PATCH 4/6] feat(allocator): refactor w/ mutex refactor allocator to remove go routine and address a few PR comments --- impl/graphsync.go | 7 +- responsemanager/allocator/allocator.go | 224 +++++------------- responsemanager/allocator/allocator_test.go | 8 +- 
.../peerresponsemanager/peerresponsesender.go | 3 +- .../peerresponsesender_test.go | 21 +- 5 files changed, 76 insertions(+), 187 deletions(-) diff --git a/impl/graphsync.go b/impl/graphsync.go index 2d06698d..670cb4db 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -27,8 +27,8 @@ import ( var log = logging.Logger("graphsync") const maxRecursionDepth = 100 -const defaultTotalMaxMemory = uint64(4 * 1 << 30) -const defaultMaxMemoryPerPeer = uint64(1 << 30) +const defaultTotalMaxMemory = uint64(1 << 28) +const defaultMaxMemoryPerPeer = uint64(1 << 24) // GraphSync is an instance of a GraphSync exchange that implements // the graphsync protocol. @@ -140,7 +140,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, for _, option := range options { option(graphSync) } - allocator := allocator.NewAllocator(ctx, graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer) + allocator := allocator.NewAllocator(graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer) graphSync.allocator = allocator createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender { return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator) @@ -150,7 +150,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork, responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners) graphSync.responseManager = responseManager - allocator.Start() asyncLoader.Startup() requestManager.SetDelegate(peerManager) requestManager.Startup() diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go index f8d7947e..f9cdbf5d 100644 --- a/responsemanager/allocator/allocator.go +++ b/responsemanager/allocator/allocator.go @@ -1,196 +1,114 @@ package allocator import ( - "context" "errors" + "sync" pq "github.com/ipfs/go-ipfs-pq" peer "github.com/libp2p/go-libp2p-peer" ) type Allocator struct { - ctx context.Context - totalMemoryMax uint64 - perPeerMax uint64 + totalMemoryMax uint64 + perPeerMax uint64 + + allocLk sync.Mutex total uint64 nextAllocIndex uint64 - messages chan allocationRequest peerStatuses map[peer.ID]*peerStatus peerStatusQueue pq.PQ } -func NewAllocator(ctx context.Context, totalMemoryMax uint64, perPeerMax uint64) *Allocator { +func NewAllocator(totalMemoryMax uint64, perPeerMax uint64) *Allocator { return &Allocator{ - ctx: ctx, totalMemoryMax: totalMemoryMax, perPeerMax: perPeerMax, total: 0, peerStatuses: make(map[peer.ID]*peerStatus), peerStatusQueue: pq.New(makePeerStatusCompare(perPeerMax)), - messages: make(chan allocationRequest, 16), } } func (a *Allocator) AllocateBlockMemory(p peer.ID, amount uint64) <-chan error { responseChan := make(chan error, 1) - done := make(chan struct{}, 1) - select { - case <-a.ctx.Done(): - responseChan <- errors.New("context closed") - case a.messages <- allocationRequest{p, amount, allocOperation, responseChan, done}: - } - select { - case <-a.ctx.Done(): - case <-done: - } - return responseChan -} - -func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { - responseChan := make(chan error, 1) - select { - case <-a.ctx.Done(): - responseChan <- errors.New("context closed") - case a.messages <- allocationRequest{p, amount, deallocOperation, responseChan, nil}: - } - select { - case <-a.ctx.Done(): - return errors.New("context closed") - case err := <-responseChan: - 
return err - } -} - -func (a *Allocator) ReleasePeerMemory(p peer.ID) error { - responseChan := make(chan error, 1) - select { - case <-a.ctx.Done(): - responseChan <- errors.New("context closed") - case a.messages <- allocationRequest{p, 0, deallocPeerOperation, responseChan, nil}: - } - select { - case <-a.ctx.Done(): - return errors.New("context closed") - case err := <-responseChan: - return err - } -} - -func (a *Allocator) Start() { - go func() { - a.run() - a.cleanup() - }() -} - -func (a *Allocator) run() { - for { - select { - case <-a.ctx.Done(): - return - case request := <-a.messages: - status, ok := a.peerStatuses[request.p] - switch request.operation { - case allocOperation: - if !ok { - status = &peerStatus{ - p: request.p, - totalAllocated: 0, - } - a.peerStatusQueue.Push(status) - a.peerStatuses[request.p] = status - } - a.handleAllocRequest(request, status) - case deallocOperation: - if !ok { - request.response <- errors.New("cannot deallocate from peer with no allocations") - continue - } - a.handleDeallocRequest(request, status) - case deallocPeerOperation: - if !ok { - request.response <- errors.New("cannot deallocate from peer with no allocations") - continue - } - a.handleDeallocPeerRequest(request, status) - } - } - } -} - -func (a *Allocator) cleanup() { - for { - if a.peerStatusQueue.Len() == 0 { - return - } - nextPeer := a.peerStatusQueue.Peek().(*peerStatus) - if len(nextPeer.pendingAllocations) == 0 { - return + a.allocLk.Lock() + defer a.allocLk.Unlock() + + status, ok := a.peerStatuses[p] + if !ok { + status = &peerStatus{ + p: p, + totalAllocated: 0, } - pendingAllocation := nextPeer.pendingAllocations[0] - nextPeer.pendingAllocations = nextPeer.pendingAllocations[1:] - pendingAllocation.response <- errors.New("never allocated") - a.peerStatusQueue.Update(nextPeer.Index()) + a.peerStatusQueue.Push(status) + a.peerStatuses[p] = status } -} -func (a *Allocator) handleAllocRequest(request allocationRequest, status *peerStatus) { - if (a.total+request.amount <= a.totalMemoryMax) && (status.totalAllocated+request.amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 { - a.total += request.amount - status.totalAllocated += request.amount - request.response <- nil + if (a.total+amount <= a.totalMemoryMax) && (status.totalAllocated+amount <= a.perPeerMax) && len(status.pendingAllocations) == 0 { + a.total += amount + status.totalAllocated += amount + responseChan <- nil } else { - pendingAllocation := pendingAllocation{ - allocationRequest: request, - allocIndex: a.nextAllocIndex, - } + pendingAllocation := pendingAllocation{p, amount, responseChan, a.nextAllocIndex} a.nextAllocIndex++ status.pendingAllocations = append(status.pendingAllocations, pendingAllocation) } a.peerStatusQueue.Update(status.Index()) - request.done <- struct{}{} + return responseChan } -func (a *Allocator) handleDeallocRequest(request allocationRequest, status *peerStatus) { - status.totalAllocated -= request.amount - a.total -= request.amount - a.peerStatusQueue.Update(status.Index()) - for a.processNextPendingAllocation() { +func (a *Allocator) ReleaseBlockMemory(p peer.ID, amount uint64) error { + a.allocLk.Lock() + defer a.allocLk.Unlock() + + status, ok := a.peerStatuses[p] + if !ok { + return errors.New("cannot deallocate from peer with no allocations") } - request.response <- nil + status.totalAllocated -= amount + a.total -= amount + a.peerStatusQueue.Update(status.Index()) + a.processPendingAllocations() + return nil } -func (a *Allocator) handleDeallocPeerRequest(request 
allocationRequest, status *peerStatus) { +func (a *Allocator) ReleasePeerMemory(p peer.ID) error { + a.allocLk.Lock() + defer a.allocLk.Unlock() + status, ok := a.peerStatuses[p] + if !ok { + return errors.New("cannot deallocate peer with no allocations") + } a.peerStatusQueue.Remove(status.Index()) for _, pendingAllocation := range status.pendingAllocations { pendingAllocation.response <- errors.New("Peer has been deallocated") } a.total -= status.totalAllocated - for a.processNextPendingAllocation() { - } - request.response <- nil + a.processPendingAllocations() + return nil } -func (a *Allocator) processNextPendingAllocation() bool { - if a.peerStatusQueue.Len() == 0 { - return false - } - nextPeer := a.peerStatusQueue.Peek().(*peerStatus) - - if len(nextPeer.pendingAllocations) > 0 { - if !a.processNextPendingAllocationForPeer(nextPeer) { - return false +func (a *Allocator) processPendingAllocations() { + for { + if a.peerStatusQueue.Len() == 0 { + return } - a.peerStatusQueue.Update(nextPeer.Index()) - } else { - if nextPeer.totalAllocated > 0 { - return false + nextPeer := a.peerStatusQueue.Peek().(*peerStatus) + + if len(nextPeer.pendingAllocations) > 0 { + if !a.processNextPendingAllocationForPeer(nextPeer) { + return + } + a.peerStatusQueue.Update(nextPeer.Index()) + } else { + if nextPeer.totalAllocated > 0 { + return + } + a.peerStatusQueue.Pop() + target := nextPeer.p + delete(a.peerStatuses, target) } - a.peerStatusQueue.Pop() - target := nextPeer.p - delete(a.peerStatuses, target) } - return true } func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bool { @@ -208,22 +126,6 @@ func (a *Allocator) processNextPendingAllocationForPeer(nextPeer *peerStatus) bo return true } -type operationType uint64 - -const ( - allocOperation operationType = iota - deallocOperation - deallocPeerOperation -) - -type allocationRequest struct { - p peer.ID - amount uint64 - operation operationType - response chan error - done chan struct{} -} - type peerStatus struct { p peer.ID totalAllocated uint64 @@ -232,7 +134,9 @@ type peerStatus struct { } type pendingAllocation struct { - allocationRequest + p peer.ID + amount uint64 + response chan error allocIndex uint64 } diff --git a/responsemanager/allocator/allocator_test.go b/responsemanager/allocator/allocator_test.go index d72754ec..556dde0b 100644 --- a/responsemanager/allocator/allocator_test.go +++ b/responsemanager/allocator/allocator_test.go @@ -1,9 +1,7 @@ package allocator_test import ( - "context" "testing" - "time" "github.com/ipfs/go-graphsync/responsemanager/allocator" "github.com/ipfs/go-graphsync/testutil" @@ -14,7 +12,6 @@ import ( func TestAllocator(t *testing.T) { peers := testutil.GeneratePeers(3) - ctx := context.Background() testCases := map[string]struct { total uint64 maxPerPeer uint64 @@ -160,10 +157,7 @@ func TestAllocator(t *testing.T) { } for testCase, data := range testCases { t.Run(testCase, func(t *testing.T) { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - allocator := allocator.NewAllocator(ctx, data.total, data.maxPerPeer) - allocator.Start() + allocator := allocator.NewAllocator(data.total, data.maxPerPeer) totals := map[peer.ID]uint64{} currentTotal := 0 var pending []pendingResult diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 9ae4f6eb..a679fcc6 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ 
b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -395,11 +395,10 @@ func (prs *peerResponseSender) FinishWithCancel(requestID graphsync.RequestID) { func (prs *peerResponseSender) buildResponse(blkSize uint64, buildResponseFn func(*responsebuilder.ResponseBuilder), notifees []notifications.Notifee) bool { if blkSize > 0 { - allocResponse := prs.allocator.AllocateBlockMemory(prs.p, blkSize) select { + case <-prs.allocator.AllocateBlockMemory(prs.p, blkSize): case <-prs.ctx.Done(): return false - case <-allocResponse: } } prs.responseBuildersLk.Lock() diff --git a/responsemanager/peerresponsemanager/peerresponsesender_test.go b/responsemanager/peerresponsemanager/peerresponsesender_test.go index 70ef3ce0..0fce6181 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender_test.go +++ b/responsemanager/peerresponsemanager/peerresponsesender_test.go @@ -42,8 +42,7 @@ func TestPeerResponseSenderSendsResponses(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() @@ -128,8 +127,7 @@ func TestPeerResponseSenderSendsVeryLargeBlocksResponses(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() @@ -190,8 +188,7 @@ func TestPeerResponseSenderSendsExtensionData(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() @@ -235,8 +232,7 @@ func TestPeerResponseSenderSendsResponsesInTransaction(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() notifee, notifeeVerifier := testutil.NewTestNotifee("transaction", 10) @@ -279,8 +275,7 @@ func TestPeerResponseSenderIgnoreBlocks(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() @@ -337,8 +332,7 @@ func TestPeerResponseSenderDupKeys(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 1<<30, 1<<30) - allocator.Start() + allocator := allocator.NewAllocator(1<<30, 1<<30) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() @@ -407,8 +401,7 @@ func TestPeerResponseSenderSendsResponsesMemoryPressure(t *testing.T) { links = append(links, cidlink.Link{Cid: block.Cid()}) } fph := newFakePeerHandler(ctx, t) - allocator := allocator.NewAllocator(ctx, 300, 300) - 
allocator.Start() + allocator := allocator.NewAllocator(300, 300) peerResponseSender := NewResponseSender(ctx, p, fph, allocator) peerResponseSender.Startup() From 1e2d1c489ed11308d20f930edb707fa4db1748d6 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Tue, 27 Oct 2020 11:07:54 -0700 Subject: [PATCH 5/6] Update responsemanager/allocator/allocator.go Co-authored-by: Steven Allen --- responsemanager/allocator/allocator.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/responsemanager/allocator/allocator.go b/responsemanager/allocator/allocator.go index f9cdbf5d..1690542c 100644 --- a/responsemanager/allocator/allocator.go +++ b/responsemanager/allocator/allocator.go @@ -89,10 +89,7 @@ func (a *Allocator) ReleasePeerMemory(p peer.ID) error { } func (a *Allocator) processPendingAllocations() { - for { - if a.peerStatusQueue.Len() == 0 { - return - } + for a.peerStatusQueue.Len() > 0 { nextPeer := a.peerStatusQueue.Peek().(*peerStatus) if len(nextPeer.pendingAllocations) > 0 { From ab30be70dd27317499d2f407b8123e58b7a40983 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 27 Oct 2020 12:06:46 -0700 Subject: [PATCH 6/6] fix(impl): update constants in readable form --- impl/graphsync.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/impl/graphsync.go b/impl/graphsync.go index 670cb4db..502ee4f7 100644 --- a/impl/graphsync.go +++ b/impl/graphsync.go @@ -27,8 +27,8 @@ import ( var log = logging.Logger("graphsync") const maxRecursionDepth = 100 -const defaultTotalMaxMemory = uint64(1 << 28) -const defaultMaxMemoryPerPeer = uint64(1 << 24) +const defaultTotalMaxMemory = uint64(256 << 20) +const defaultMaxMemoryPerPeer = uint64(16 << 20) // GraphSync is an instance of a GraphSync exchange that implements // the graphsync protocol.
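
A note on the final hunk above: the rewritten constants keep the values introduced earlier in this series; they are only spelled as MiB-style shifts instead of single-bit shifts. A quick equivalence check (comments only, not part of the patch):

// 256 << 20 == 268435456 bytes == 256 MiB, identical to the previous 1 << 28
// 16 << 20  ==  16777216 bytes ==  16 MiB, identical to the previous 1 << 24
const defaultTotalMaxMemory = uint64(256 << 20)
const defaultMaxMemoryPerPeer = uint64(16 << 20)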
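
For reviewers who want to see the refactored allocator end to end, here is a minimal, self-contained sketch of how the new synchronous API is meant to be driven. It assumes the go-graphsync testutil helper for generating peer IDs, and the byte budgets are illustrative rather than the library defaults. After this series, NewAllocator no longer takes a context and needs no Start() goroutine; AllocateBlockMemory returns a channel that already holds nil when the request fits under both the total and per-peer caps, and otherwise stays empty until a Release* call frees enough budget.

package main

import (
	"fmt"

	"github.com/ipfs/go-graphsync/responsemanager/allocator"
	"github.com/ipfs/go-graphsync/testutil"
)

func main() {
	peers := testutil.GeneratePeers(2)

	// Illustrative budgets: 1 MiB total across all peers, 512 KiB per peer.
	alloc := allocator.NewAllocator(1<<20, 512<<10)

	// Both requests fit under the caps, so the returned channels resolve immediately.
	if err := <-alloc.AllocateBlockMemory(peers[0], 512<<10); err != nil {
		fmt.Println("unexpected:", err)
	}
	if err := <-alloc.AllocateBlockMemory(peers[1], 512<<10); err != nil {
		fmt.Println("unexpected:", err)
	}

	// This request exceeds both the total and the per-peer budget, so it is
	// queued as a pending allocation and the channel stays empty for now.
	pending := alloc.AllocateBlockMemory(peers[0], 256<<10)

	// Releasing memory re-runs the pending queue under the allocator's mutex,
	// which should let the parked request through with a nil error.
	_ = alloc.ReleaseBlockMemory(peers[0], 512<<10)
	fmt.Println("pending allocation result:", <-pending)
}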