This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

move incrementing of dupl factor, write out benchmark results
whyrusleeping committed Sep 17, 2018
1 parent c083055 commit 5aa8341
Showing 2 changed files with 44 additions and 23 deletions.
45 changes: 33 additions & 12 deletions dup_blocks_test.go
@@ -2,6 +2,8 @@ package bitswap

import (
"context"
"encoding/json"
"io/ioutil"
"math/rand"
"sync"
"testing"
@@ -16,10 +18,20 @@ import (
mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type fetchFunc func(t *testing.T, bs *Bitswap, ks []*cid.Cid)
type fetchFunc func(t *testing.T, bs *Bitswap, ks []cid.Cid)

type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block)

type runStats struct {
Dups uint64
MsgSent uint64
MsgRecd uint64
Time time.Duration
Name string
}

var benchmarkLog []runStats

func TestDups2Nodes(t *testing.T) {
t.Run("AllToAll-OneAtATime", func(t *testing.T) {
subtestDistributeAndFetch(t, 3, 100, allToAll, oneAtATime)
@@ -51,9 +63,6 @@ func TestDups2Nodes(t *testing.T) {
t.Run("Overlap3-UnixfsFetch", func(t *testing.T) {
subtestDistributeAndFetch(t, 3, 100, overlap3, unixfsFileFetch)
})
}

func TestDupsManyNodes(t *testing.T) {
t.Run("10Nodes-AllToAll-OneAtATime", func(t *testing.T) {
subtestDistributeAndFetch(t, 10, 100, allToAll, oneAtATime)
})
@@ -78,9 +87,13 @@ func TestDupsManyNodes(t *testing.T) {
t.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(t *testing.T) {
subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, unixfsFileFetch)
})

out, _ := json.MarshalIndent(benchmarkLog, "", " ")
ioutil.WriteFile("benchmark.json", out, 0666)
}

func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, ff fetchFunc) {
start := time.Now()
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
sg := NewTestSessionGenerator(net)
defer sg.Close()
@@ -94,7 +107,7 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc,

df(t, instances[:numnodes-1], blocks)

var ks []*cid.Cid
var ks []cid.Cid
for _, blk := range blocks {
ks = append(ks, blk.Cid())
}
@@ -107,9 +120,17 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc,
}

nst := fetcher.Exchange.network.Stats()
stats := runStats{
Time: time.Now().Sub(start),
MsgRecd: nst.MessagesRecvd,
MsgSent: nst.MessagesSent,
Dups: st.DupBlksReceived,
Name: t.Name(),
}
benchmarkLog = append(benchmarkLog, stats)
t.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
if st.DupBlksReceived != 0 {
t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
//t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
}
}

@@ -192,7 +213,7 @@ func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) {
}
}

func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
func oneAtATime(t *testing.T, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for _, c := range ks {
_, err := ses.GetBlock(context.Background(), c)
@@ -204,7 +225,7 @@ func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
}

// fetch data in batches, 10 at a time
func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
func batchFetchBy10(t *testing.T, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
for i := 0; i < len(ks); i += 10 {
out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
@@ -217,13 +238,13 @@ func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
}

// fetch each block at the same time concurrently
func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())

var wg sync.WaitGroup
for _, c := range ks {
wg.Add(1)
go func(c *cid.Cid) {
go func(c cid.Cid) {
defer wg.Done()
_, err := ses.GetBlock(context.Background(), c)
if err != nil {
@@ -234,7 +255,7 @@ func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
wg.Wait()
}

func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
func batchFetchAll(t *testing.T, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
out, err := ses.GetBlocks(context.Background(), ks)
if err != nil {
@@ -245,7 +266,7 @@ func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []*cid.Cid) {
func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []cid.Cid) {
ses := bs.NewSession(context.Background())
_, err := ses.GetBlock(context.Background(), ks[0])
if err != nil {
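For reference, the benchmarkLog entries collected above are serialized with json.MarshalIndent and written to benchmark.json at the end of the test run. A minimal, self-contained sketch of reading that file back to compare runs — the file name and field layout come from the diff above, the rest is illustrative and assumes the time.Duration field round-trips as integer nanoseconds:

package main

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"time"
)

// runStats mirrors the struct added in dup_blocks_test.go.
type runStats struct {
	Dups    uint64
	MsgSent uint64
	MsgRecd uint64
	Time    time.Duration
	Name    string
}

func main() {
	data, err := ioutil.ReadFile("benchmark.json")
	if err != nil {
		panic(err)
	}
	var results []runStats
	if err := json.Unmarshal(data, &results); err != nil {
		panic(err)
	}
	for _, r := range results {
		fmt.Printf("%s: %v, %d dups, %d msgs sent, %d msgs received\n",
			r.Name, r.Time, r.Dups, r.MsgSent, r.MsgRecd)
	}
}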
22 changes: 11 additions & 11 deletions session.go
@@ -242,10 +242,16 @@ func (s *Session) run(ctx context.Context) {
}

// Broadcast these keys to everyone we're connected to
log.Error("broadcast!")
s.broadcasts++
s.bs.wm.WantBlocks(ctx, live, nil, s.id)

if s.fetchcnt > 5 {
brcRat := float64(s.fetchcnt) / float64(s.broadcasts)
if brcRat < 2 {
s.dupl++
}
}

if len(live) > 0 {
go func(k cid.Cid) {
// TODO: have a task queue setup for this to:
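A worked example of the ratio check added above: with s.fetchcnt = 12 completed fetches and s.broadcasts = 7 broadcast rounds, brcRat = 12/7 ≈ 1.71, which is below 2, so s.dupl is incremented and later wants are spread across more peers (see divvy below). The check only applies once more than five fetches have completed, so a session needs some history before its duplication factor starts to grow.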
@@ -310,25 +316,19 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
for _, c := range ks {
s.liveWants[c.KeyString()] = now
}
if len(s.activePeers) == 0 {
if len(s.activePeers) == 0 || s.dupl >= len(s.activePeers) {
s.broadcasts++
s.bs.wm.WantBlocks(ctx, ks, nil, s.id)
s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id)
} else {
if s.fetchcnt > 5 {
brcRat := float64(s.fetchcnt) / float64(s.broadcasts)
if brcRat < 2 {
s.dupl++
}
}
spl := divvy(ks, len(s.activePeersArr), s.dupl)
for ki, pi := range rand.Perm(len(s.activePeersArr)) {
s.bs.wm.WantBlocks(ctx, spl[ki], []peer.ID{s.activePeersArr[pi]}, s.id)
}
}
}

func divvy(ks []*cid.Cid, n, dupl int) [][]*cid.Cid {
out := make([][]*cid.Cid, n)
func divvy(ks []cid.Cid, n, dupl int) [][]cid.Cid {
out := make([][]cid.Cid, n)
for l := 0; l < dupl; l++ {
for i, c := range ks {
pos := (i + (len(ks) * l)) % n
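The body of divvy is truncated in the diff; below is a small, self-contained sketch of the round-robin split it sets up, with string keys standing in for cid.Cid and the append/return assumed from the visible loop structure. In session.go the resulting buckets are then handed to a random permutation of s.activePeersArr, one bucket per peer:

package main

import "fmt"

// divvySketch splits ks into n buckets, walking the key list dupl times so
// that each key ends up in (roughly) dupl different buckets.
func divvySketch(ks []string, n, dupl int) [][]string {
	out := make([][]string, n)
	for l := 0; l < dupl; l++ {
		for i, c := range ks {
			pos := (i + (len(ks) * l)) % n
			out[pos] = append(out[pos], c) // assumed: the hidden body appends here
		}
	}
	return out
}

func main() {
	keys := []string{"k0", "k1", "k2", "k3", "k4"}
	// Five keys, three peers, duplication factor two: each key is wanted from two peers.
	for peer, bucket := range divvySketch(keys, 3, 2) {
		fmt.Printf("peer %d gets %v\n", peer, bucket)
	}
}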
