Add allocator for memory backpressure #108

Merged: 6 commits, Oct 27, 2020
Changes from 4 commits
59 changes: 25 additions & 34 deletions benchmarks/benchmark_test.go
@@ -35,6 +35,7 @@ import (

"github.com/ipfs/go-graphsync/benchmarks/testinstance"
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
graphsync "github.com/ipfs/go-graphsync/impl"
)

const stdBlockSize = 8000
@@ -51,27 +52,33 @@ func BenchmarkRoundtripSuccess(b *testing.B) {
tdm, err := newTempDirMaker(b)
require.NoError(b, err)
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
})
b.Run("test-20-128MB", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 10, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(128*(1<<20), defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
})
b.Run("test-p2p-stress-10-128MB", func(b *testing.B) {
p2pStrestTest(ctx, b, 20, allFilesUniformSize(128*(1<<20), 1<<20, 1024), tdm)
p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<20, 1024, true), tdm, nil, false)
})
b.Run("test-p2p-stress-10-128MB-1KB-chunks", func(b *testing.B) {
p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024), tdm)
p2pStrestTest(ctx, b, 10, allFilesUniformSize(128*(1<<20), 1<<10, 1024, true), tdm, nil, false)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, true), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-p2p-stress-1-1GB-memory-pressure-no-raw-nodes", func(b *testing.B) {
p2pStrestTest(ctx, b, 1, allFilesUniformSize(1*(1<<30), 1<<20, 1024, false), tdm, []graphsync.Option{graphsync.MaxMemoryResponder(1 << 27)}, true)
})
b.Run("test-repeated-disconnects-20-10000", func(b *testing.B) {
benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel), tdm)
benchmarkRepeatedDisconnects(ctx, b, 20, allFilesUniformSize(10000, defaultUnixfsChunkSize, defaultUnixfsLinksPerLevel, true), tdm)
})
}

func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes int, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
mn := mocknet.New(ctx)
net := tn.StreamNet(ctx, mn)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false)
instances, err := ig.Instances(numnodes + 1)
require.NoError(b, err)
var allCids [][]cid.Cid
@@ -132,13 +139,13 @@ func benchmarkRepeatedDisconnects(ctx context.Context, b *testing.B, numnodes in
ig.Close()
}

func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker) {
func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc, tdm *tempDirMaker, options []graphsync.Option, diskBasedDatastore bool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
mn := mocknet.New(ctx)
mn.SetLinkDefaults(mocknet.LinkOptions{Latency: 100 * time.Millisecond, Bandwidth: 3000000})
net := tn.StreamNet(ctx, mn)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
ig := testinstance.NewTestInstanceGenerator(ctx, net, options, tdm, diskBasedDatastore)
instances, err := ig.Instances(1 + b.N)
require.NoError(b, err)
var allCids []cid.Cid
@@ -160,32 +167,16 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
disconnectOn := rand.Intn(numfiles)
for j := 0; j < numfiles; j++ {
resultChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)
responseChan, errChan := fetcher.Exchange.Request(ctx, instances[0].Peer, cidlink.Link{Cid: allCids[j]}, allSelector)

wg.Add(1)
go func(j int) {
defer wg.Done()
results := 0
for {
select {
case <-ctx.Done():
return
case <-resultChan:
results++
if results == 100 && j == disconnectOn {
mn.DisconnectPeers(instances[0].Peer, instances[i+1].Peer)
mn.UnlinkPeers(instances[0].Peer, instances[i+1].Peer)
time.Sleep(100 * time.Millisecond)
mn.LinkPeers(instances[0].Peer, instances[i+1].Peer)
}
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
for _ = range responseChan {
}
for err := range errChan {
b.Fatalf("received error on request: %s", err.Error())
}
}(j)
}
@@ -205,7 +196,7 @@ func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int,
ctx, cancel := context.WithCancel(ctx)
defer cancel()
net := tn.VirtualNetwork(d)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm, false)
instances, err := ig.Instances(numnodes + b.N)
require.NoError(b, err)
destCids := df(ctx, b, instances[:numnodes])
@@ -268,7 +259,7 @@ type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Insta
const defaultUnixfsChunkSize uint64 = 1 << 10
const defaultUnixfsLinksPerLevel = 1024

func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) cid.Cid {
func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) cid.Cid {

data := make([]byte, size)
_, err := rand.Read(data)
@@ -283,7 +274,7 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block

params := ihelper.DagBuilderParams{
Maxlinks: unixfsLinksPerLevel,
RawLeaves: true,
RawLeaves: useRawNodes,
CidBuilder: nil,
Dagserv: bufferedDS,
}
@@ -300,11 +291,11 @@ func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Block
return nd.Cid()
}

func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int) distFunc {
func allFilesUniformSize(size uint64, unixfsChunkSize uint64, unixfsLinksPerLevel int, useRawNodes bool) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel)
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size, unixfsChunkSize, unixfsLinksPerLevel, useRawNodes)
cids = append(cids, c)
}
return cids
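
The two new memory-pressure benchmarks above push a 1 GiB file (1<<30 bytes) through a responder capped at 1<<27 = 128 MiB, forcing the new allocator to throttle how much response data sits in memory at once. Outside the benchmark harness, the same caps are applied through the new functional options. A minimal sketch, assuming New accepts the network, link loader/storer, and options as its doc comment describes; the newBoundedResponder helper and its parameter names are illustrative, not part of this PR:

package sketch

import (
	"context"

	"github.com/ipfs/go-graphsync"
	gsimpl "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"
	ipld "github.com/ipld/go-ipld-prime"
)

// newBoundedResponder builds a GraphSync instance whose responder buffers
// at most 128 MiB of response data in total and at most 16 MiB per peer.
func newBoundedResponder(ctx context.Context, network gsnet.GraphSyncNetwork,
	loader ipld.Loader, storer ipld.Storer) graphsync.GraphSync {
	return gsimpl.New(ctx, network, loader, storer,
		gsimpl.MaxMemoryResponder(1<<27),        // 128 MiB across all peers
		gsimpl.MaxMemoryPerPeerResponder(1<<24), // 16 MiB for any single peer
	)
}
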
47 changes: 31 additions & 16 deletions benchmarks/testinstance/testinstance.go
@@ -8,6 +8,7 @@ import (
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
badgerds "github.com/ipfs/go-ds-badger"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipld/go-ipld-prime"
@@ -29,26 +30,28 @@ type TempDirGenerator interface {

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator {
func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator, diskBasedDatastore bool) InstanceGenerator {
ctx, cancel := context.WithCancel(ctx)
return InstanceGenerator{
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
gsOptions: gsOptions,
tempDirGenerator: tempDirGenerator,
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
gsOptions: gsOptions,
tempDirGenerator: tempDirGenerator,
diskBasedDatastore: diskBasedDatastore,
}
}

// InstanceGenerator generates new test instances of graphsync+dependencies
type InstanceGenerator struct {
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
gsOptions []gsimpl.Option
tempDirGenerator TempDirGenerator
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
gsOptions []gsimpl.Option
tempDirGenerator TempDirGenerator
diskBasedDatastore bool
}

// Close closes the global context, shutting down all test instances
@@ -64,7 +67,7 @@ func (g *InstanceGenerator) Next() (Instance, error) {
if err != nil {
return Instance{}, err
}
return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir())
return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir(), g.diskBasedDatastore)
}

// Instances creates N test instances of graphsync + dependencies and connects
@@ -138,11 +141,23 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy to make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) {
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string, diskBasedDatastore bool) (Instance, error) {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
var dstore ds.Batching
var err error
if diskBasedDatastore {
defopts := badgerds.DefaultOptions
defopts.SyncWrites = false
defopts.Truncate = true
dstore, err = badgerds.NewDatastore(tempDir, &defopts)
if err != nil {
return Instance{}, err
}
} else {
dstore = ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
}
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(dstore),
blockstore.DefaultCacheOpts())
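
Note on the new disk-backed option: when diskBasedDatastore is set, each test instance gets a Badger datastore rooted in the benchmark's temp dir. SyncWrites = false skips fsync-ing every write, and Truncate = true lets Badger truncate a corrupted value-log tail on open; both trade durability for speed, a reasonable default for throwaway benchmark state.
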
3 changes: 3 additions & 0 deletions go.mod
@@ -11,12 +11,14 @@ require (
github.com/ipfs/go-blockservice v0.1.3
github.com/ipfs/go-cid v0.0.6
github.com/ipfs/go-datastore v0.4.4
github.com/ipfs/go-ds-badger v0.2.1
github.com/ipfs/go-ipfs-blockstore v0.1.4
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-chunker v0.0.5
github.com/ipfs/go-ipfs-delay v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-files v0.0.8
github.com/ipfs/go-ipfs-pq v0.0.2
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.4 // indirect
@@ -32,6 +34,7 @@ require (
github.com/libp2p/go-libp2p v0.6.0
github.com/libp2p/go-libp2p-core v0.5.0
github.com/libp2p/go-libp2p-netutil v0.1.0
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-record v0.1.1 // indirect
github.com/libp2p/go-libp2p-testing v0.1.1
github.com/libp2p/go-msgio v0.0.6
2 changes: 2 additions & 0 deletions go.sum
@@ -1,5 +1,6 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/AndreasBriese/bbloom v0.0.0-20180913140656-343706a395b7/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9 h1:HD8gA2tkByhMAwYaFAX9w2l7vxvBQ5NMoxDrkhqhtn4=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
@@ -168,6 +169,7 @@ github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-ds-badger v0.0.2/go.mod h1:Y3QpeSFWQf6MopLTiZD+VT6IC1yZqaGmjvRcKeSGij8=
github.com/ipfs/go-ds-badger v0.0.5/go.mod h1:g5AuuCGmr7efyzQhLL8MzwqcauPojGPUaHzfGTzuE3s=
github.com/ipfs/go-ds-badger v0.2.1 h1:RsC9DDlwFhFdfT+s2PeC8joxbSp2YMufK8w/RBOxKtk=
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
37 changes: 30 additions & 7 deletions impl/graphsync.go
@@ -17,6 +17,7 @@ import (
"github.com/ipfs/go-graphsync/requestmanager/asyncloader"
requestorhooks "github.com/ipfs/go-graphsync/requestmanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager"
"github.com/ipfs/go-graphsync/responsemanager/allocator"
responderhooks "github.com/ipfs/go-graphsync/responsemanager/hooks"
"github.com/ipfs/go-graphsync/responsemanager/peerresponsemanager"
"github.com/ipfs/go-graphsync/responsemanager/persistenceoptions"
@@ -26,6 +27,8 @@ import (
var log = logging.Logger("graphsync")

const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(1 << 28)
const defaultMaxMemoryPerPeer = uint64(1 << 24)
Review comment (Member): nit: Personally, I think in MiB, GiB, etc. I'd find these easier to read as uint64(256<<20) and uint64(16<<20).
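(Either spelling yields the same values: 256<<20 == 1<<28 == 268,435,456 bytes, and 16<<20 == 1<<24 == 16,777,216 bytes; the suggestion is about readability only.)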


// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
@@ -53,6 +56,9 @@ type GraphSync struct {
ctx context.Context
cancel context.CancelFunc
unregisterDefaultValidator graphsync.UnregisterHookFunc
allocator *allocator.Allocator
totalMaxMemory uint64
maxMemoryPerPeer uint64
}

// Option defines the functional option type that can be used to configure
@@ -67,6 +73,18 @@ func RejectAllRequestsByDefault() Option {
}
}

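// MaxMemoryResponder defines the maximum amount of memory the responder
// may queue up for sending across all peers before pausing further
// response work (default: 256 MiB).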
func MaxMemoryResponder(totalMaxMemory uint64) Option {
return func(gs *GraphSync) {
gs.totalMaxMemory = totalMaxMemory
}
}

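// MaxMemoryPerPeerResponder defines the maximum amount of memory the
// responder may queue up for sending to a single peer before pausing
// further responses to that peer (default: 16 MiB).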
func MaxMemoryPerPeerResponder(maxMemoryPerPeer uint64) Option {
return func(gs *GraphSync) {
gs.maxMemoryPerPeer = maxMemoryPerPeer
}
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
@@ -83,10 +101,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
incomingBlockHooks := requestorhooks.NewBlockHooks()
requestManager := requestmanager.New(ctx, asyncLoader, outgoingRequestHooks, incomingResponseHooks, incomingBlockHooks)
peerTaskQueue := peertaskqueue.New()
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)

persistenceOptions := persistenceoptions.New()
incomingRequestHooks := responderhooks.NewRequestHooks(persistenceOptions)
outgoingBlockHooks := responderhooks.NewBlockHooks()
@@ -95,7 +110,6 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
requestorCancelledListeners := responderhooks.NewRequestorCancelledListeners()
blockSentListeners := responderhooks.NewBlockSentListeners()
networkErrorListeners := responderhooks.NewNetworkErrorListeners()
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
unregisterDefaultValidator := incomingRequestHooks.Register(selectorvalidator.SelectorValidator(maxRecursionDepth))
graphSync := &GraphSync{
network: network,
@@ -116,8 +130,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
outgoingRequestHooks: outgoingRequestHooks,
incomingBlockHooks: incomingBlockHooks,
peerTaskQueue: peerTaskQueue,
peerResponseManager: peerResponseManager,
responseManager: responseManager,
totalMaxMemory: defaultTotalMaxMemory,
maxMemoryPerPeer: defaultMaxMemoryPerPeer,
ctx: ctx,
cancel: cancel,
unregisterDefaultValidator: unregisterDefaultValidator,
@@ -126,6 +140,15 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
for _, option := range options {
option(graphSync)
}
allocator := allocator.NewAllocator(graphSync.totalMaxMemory, graphSync.maxMemoryPerPeer)
graphSync.allocator = allocator
createdResponseQueue := func(ctx context.Context, p peer.ID) peerresponsemanager.PeerResponseSender {
return peerresponsemanager.NewResponseSender(ctx, p, peerManager, allocator)
}
peerResponseManager := peerresponsemanager.New(ctx, createdResponseQueue)
graphSync.peerResponseManager = peerResponseManager
responseManager := responsemanager.New(ctx, loader, peerResponseManager, peerTaskQueue, incomingRequestHooks, outgoingBlockHooks, requestUpdatedHooks, completedResponseListeners, requestorCancelledListeners, blockSentListeners, networkErrorListeners)
graphSync.responseManager = responseManager

asyncLoader.Startup()
requestManager.SetDelegate(peerManager)
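
The wiring above constructs the allocator with the two budgets and hands it to every peer response sender, but the allocator's own method set is not visible in this diff. The essence of memory backpressure is that a sender must reserve budget before buffering a block and release it once the block has gone out on the wire, blocking while the budget is exhausted. A self-contained toy sketch of that technique (an illustration only, not the PR's allocator, which additionally tracks per-peer budgets):

package sketch

import (
	"errors"
	"sync"
)

// toyAllocator demonstrates memory backpressure: callers reserve budget
// before buffering response data and block until enough previously
// reserved memory has been released.
type toyAllocator struct {
	mu   sync.Mutex
	cond *sync.Cond
	max  uint64 // total budget in bytes
	used uint64 // bytes currently reserved
}

func newToyAllocator(max uint64) *toyAllocator {
	a := &toyAllocator{max: max}
	a.cond = sync.NewCond(&a.mu)
	return a
}

// Reserve blocks the caller until size additional bytes fit in the budget.
func (a *toyAllocator) Reserve(size uint64) error {
	if size > a.max {
		return errors.New("single allocation exceeds total budget")
	}
	a.mu.Lock()
	defer a.mu.Unlock()
	for a.used+size > a.max {
		a.cond.Wait() // backpressure: wait for a Release
	}
	a.used += size
	return nil
}

// Release returns size bytes to the budget and wakes blocked reservers.
func (a *toyAllocator) Release(size uint64) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if size > a.used {
		size = a.used
	}
	a.used -= size
	a.cond.Broadcast()
}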