Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark framework + First memory fixes #89

Merged
merged 4 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 207 additions & 0 deletions benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package graphsync_test

import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io/ioutil"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-graphsync/benchmarks/testinstance"
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
blockstore "github.com/ipfs/go-ipfs-blockstore"
chunker "github.com/ipfs/go-ipfs-chunker"
delay "github.com/ipfs/go-ipfs-delay"
offline "github.com/ipfs/go-ipfs-exchange-offline"
files "github.com/ipfs/go-ipfs-files"
ipldformat "github.com/ipfs/go-ipld-format"
"github.com/ipfs/go-merkledag"
"github.com/ipfs/go-unixfs/importer/balanced"
ihelper "github.com/ipfs/go-unixfs/importer/helpers"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
ipldselector "github.com/ipld/go-ipld-prime/traversal/selector"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
)

const stdBlockSize = 8000

type runStats struct {
Time time.Duration
Name string
}

var benchmarkLog []runStats

func BenchmarkRoundtripSuccess(b *testing.B) {
ctx := context.Background()
tdm, err := newTempDirMaker(b)
require.NoError(b, err)
b.Run("test-20-10000", func(b *testing.B) {
subtestDistributeAndFetch(ctx, b, 20, delay.Fixed(0), time.Duration(0), allFilesUniformSize(10000), tdm)
})
}

func subtestDistributeAndFetch(ctx context.Context, b *testing.B, numnodes int, d delay.D, bstoreLatency time.Duration, df distFunc, tdm *tempDirMaker) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
net := tn.VirtualNetwork(d)
ig := testinstance.NewTestInstanceGenerator(ctx, net, nil, tdm)
instances, err := ig.Instances(numnodes + b.N)
require.NoError(b, err)
destCids := df(ctx, b, instances[:numnodes])
// Set the blockstore latency on seed nodes
if bstoreLatency > 0 {
for _, i := range instances {
i.SetBlockstoreLatency(bstoreLatency)
}
}
ssb := builder.NewSelectorSpecBuilder(basicnode.Style.Any)

allSelector := ssb.ExploreRecursive(ipldselector.RecursionLimitNone(),
ssb.ExploreAll(ssb.ExploreRecursiveEdge())).Node()

runtime.GC()
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
fetcher := instances[i+numnodes]
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
require.NoError(b, err)
start := time.Now()
for j := 0; j < numnodes; j++ {
instance := instances[j]
_, errChan := fetcher.Exchange.Request(ctx, instance.Peer, cidlink.Link{Cid: destCids[j]}, allSelector)

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case err, ok := <-errChan:
if !ok {
return
}
b.Fatalf("received error on request: %s", err.Error())
}
}
}()
}
wg.Wait()
result := runStats{
Time: time.Since(start),
Name: b.Name(),
}
benchmarkLog = append(benchmarkLog, result)
cancel()
fetcher.Close()
}
testinstance.Close(instances)
ig.Close()

}

type distFunc func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid

const unixfsChunkSize uint64 = 1 << 10
const unixfsLinksPerLevel = 1024

func loadRandomUnixFxFile(ctx context.Context, b *testing.B, bs blockstore.Blockstore, size uint64) cid.Cid {

data := make([]byte, size)
_, err := rand.Read(data)
require.NoError(b, err)
buf := bytes.NewReader(data)
file := files.NewReaderFile(buf)

dagService := merkledag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))

// import to UnixFS
bufferedDS := ipldformat.NewBufferedDAG(ctx, dagService)

params := ihelper.DagBuilderParams{
Maxlinks: unixfsLinksPerLevel,
RawLeaves: true,
CidBuilder: nil,
Dagserv: bufferedDS,
}

db, err := params.New(chunker.NewSizeSplitter(file, int64(unixfsChunkSize)))
require.NoError(b, err, "unable to setup dag builder")

nd, err := balanced.Layout(db)
require.NoError(b, err, "unable to create unix fs node")

err = bufferedDS.Commit()
require.NoError(b, err, "unable to commit unix fs node")

return nd.Cid()
}

func allFilesUniformSize(size uint64) distFunc {
return func(ctx context.Context, b *testing.B, provs []testinstance.Instance) []cid.Cid {
cids := make([]cid.Cid, 0, len(provs))
for _, prov := range provs {
c := loadRandomUnixFxFile(ctx, b, prov.BlockStore, size)
cids = append(cids, c)
}
return cids
}
}

type tempDirMaker struct {
tdm string
tempDirSeq int32
b *testing.B
}

var tempDirReplacer struct {
sync.Once
r *strings.Replacer
}

// Cribbed from https://github.com/golang/go/blob/master/src/testing/testing.go#L890
// and modified as needed due to https://github.com/golang/go/issues/41062
func newTempDirMaker(b *testing.B) (*tempDirMaker, error) {
c := &tempDirMaker{}
// ioutil.TempDir doesn't like path separators in its pattern,
// so mangle the name to accommodate subtests.
tempDirReplacer.Do(func() {
tempDirReplacer.r = strings.NewReplacer("/", "_", "\\", "_", ":", "_")
})
pattern := tempDirReplacer.r.Replace(b.Name())

var err error
c.tdm, err = ioutil.TempDir("", pattern)
if err != nil {
return nil, err
}
b.Cleanup(func() {
if err := os.RemoveAll(c.tdm); err != nil {
b.Errorf("TempDir RemoveAll cleanup: %v", err)
}
})
return c, nil
}

func (tdm *tempDirMaker) TempDir() string {
seq := atomic.AddInt32(&tdm.tempDirSeq, 1)
dir := fmt.Sprintf("%s%c%03d", tdm.tdm, os.PathSeparator, seq)
if err := os.Mkdir(dir, 0777); err != nil {
tdm.b.Fatalf("TempDir: %v", err)
}
return dir
}
169 changes: 169 additions & 0 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package testinstance

import (
"context"
"time"

"github.com/ipfs/go-datastore"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
graphsync "github.com/ipfs/go-graphsync"
tn "github.com/ipfs/go-graphsync/benchmarks/testnet"
gsimpl "github.com/ipfs/go-graphsync/impl"
gsnet "github.com/ipfs/go-graphsync/network"
"github.com/ipfs/go-graphsync/storeutil"
blockstore "github.com/ipfs/go-ipfs-blockstore"
delay "github.com/ipfs/go-ipfs-delay"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
p2ptestutil "github.com/libp2p/go-libp2p-netutil"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

// TempDirGenerator is any interface that can generate temporary directories
type TempDirGenerator interface {
TempDir() string
}

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
// testnet
func NewTestInstanceGenerator(ctx context.Context, net tn.Network, gsOptions []gsimpl.Option, tempDirGenerator TempDirGenerator) InstanceGenerator {
ctx, cancel := context.WithCancel(ctx)
return InstanceGenerator{
net: net,
seq: 0,
ctx: ctx, // TODO take ctx as param to Next, Instances
cancel: cancel,
gsOptions: gsOptions,
tempDirGenerator: tempDirGenerator,
}
}

// InstanceGenerator generates new test instances of bitswap+dependencies
type InstanceGenerator struct {
seq int
net tn.Network
ctx context.Context
cancel context.CancelFunc
gsOptions []gsimpl.Option
tempDirGenerator TempDirGenerator
}

// Close closes the clobal context, shutting down all test instances
func (g *InstanceGenerator) Close() error {
g.cancel()
return nil // for Closer interface
}

// Next generates a new instance of graphsync + dependencies
func (g *InstanceGenerator) Next() (Instance, error) {
g.seq++
p, err := p2ptestutil.RandTestBogusIdentity()
if err != nil {
return Instance{}, err
}
return NewInstance(g.ctx, g.net, p, g.gsOptions, g.tempDirGenerator.TempDir())
}

// Instances creates N test instances of bitswap + dependencies and connects
// them to each other
func (g *InstanceGenerator) Instances(n int) ([]Instance, error) {
var instances []Instance
for j := 0; j < n; j++ {
inst, err := g.Next()
if err != nil {
return nil, err
}
instances = append(instances, inst)
}
ConnectInstances(instances)
return instances, nil
}

// ConnectInstances connects the given instances to each other
func ConnectInstances(instances []Instance) {
for i, inst := range instances {
for j := i + 1; j < len(instances); j++ {
oinst := instances[j]
err := inst.Adapter.ConnectTo(context.Background(), oinst.Peer)
if err != nil {
panic(err.Error())
}
}
}
}

// Close closes multiple instances at once
func Close(instances []Instance) error {
for _, i := range instances {
if err := i.Close(); err != nil {
return err
}
}
return nil
}

// Instance is a test instance of bitswap + dependencies for integration testing
type Instance struct {
Peer peer.ID
Loader ipld.Loader
Storer ipld.Storer
Exchange graphsync.GraphExchange
BlockStore blockstore.Blockstore
Adapter gsnet.GraphSyncNetwork
blockstoreDelay delay.D
ds datastore.Batching
}

// Close closes the associated datastore
func (i *Instance) Close() error {
return i.ds.Close()
}

// Blockstore returns the block store for this test instance
func (i *Instance) Blockstore() blockstore.Blockstore {
return i.BlockStore
}

// SetBlockstoreLatency customizes the artificial delay on receiving blocks
// from a blockstore test instance.
func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
return i.blockstoreDelay.Set(t)
}

// NewInstance creates a test bitswap instance.
//
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, gsOptions []gsimpl.Option, tempDir string) (Instance, error) {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore, err := blockstore.CachedBlockstore(ctx,
blockstore.NewBlockstore(dstore),
blockstore.DefaultCacheOpts())
if err != nil {
return Instance{}, err
}

loader := storeutil.LoaderForBlockstore(bstore)
storer := storeutil.StorerForBlockstore(bstore)
gs := gsimpl.New(ctx, adapter, loader, storer, gsOptions...)
gs.RegisterIncomingRequestHook(func(p peer.ID, request graphsync.RequestData, hookActions graphsync.IncomingRequestHookActions) {
hookActions.ValidateRequest()
})

return Instance{
Adapter: adapter,
Peer: p.ID(),
Exchange: gs,
Loader: loader,
Storer: storer,
BlockStore: bstore,
blockstoreDelay: bsdelay,
ds: dstore,
}, nil
}
16 changes: 16 additions & 0 deletions benchmarks/testnet/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package testnet

import (
gsnet "github.com/ipfs/go-graphsync/network"

"github.com/libp2p/go-libp2p-core/peer"
tnet "github.com/libp2p/go-libp2p-testing/net"
)

// Network is an interface for generating graphsync network interfaces
// based on a test network.
type Network interface {
Adapter(tnet.Identity) gsnet.GraphSyncNetwork
HasPeer(peer.ID) bool
}

Loading