This repository has been archived by the owner on Feb 1, 2023. It is now read-only.
[WIP] Work towards improving the duplicate blocks issue #8
Closed

12 commits:
5b21247  add a test to reproduce duplicate blocks issue  (whyrusleeping)
17eecb4  more test layouts  (whyrusleeping)
1c24ea1  more test layouts, again  (whyrusleeping)
6d17e7f  refactor tests, make it easier to run tables  (whyrusleeping)
54bce93  More test scenarios!  (whyrusleeping)
60e7992  reorg in dup tests  (whyrusleeping)
832b3a3  improvement attempt number 1  (whyrusleeping)
1ddefae  add statistics for network messages sent/recvd  (whyrusleeping)
ea5e626  WIP: working on dupl factor scaling  (whyrusleeping)
6d0f460  move incrememnting of dupl factor, write out benchmark results  (whyrusleeping)
4447483  fix session exchange interface implementation  (whyrusleeping)
5d4e4bb  avoid using constants as both sentinel values and values.  (Stebalien)
@@ -0,0 +1,292 @@
package bitswap

import (
	"context"
	"encoding/json"
	"io/ioutil"
	"math/rand"
	"sync"
	"testing"
	"time"

	tn "github.com/ipfs/go-bitswap/testnet"

	"github.com/ipfs/go-block-format"
	cid "github.com/ipfs/go-cid"
	blocksutil "github.com/ipfs/go-ipfs-blocksutil"
	delay "github.com/ipfs/go-ipfs-delay"
	mockrouting "github.com/ipfs/go-ipfs-routing/mock"
)

type fetchFunc func(t *testing.T, bs *Bitswap, ks []cid.Cid)

type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block)

type runStats struct {
	Dups    uint64
	MsgSent uint64
	MsgRecd uint64
	Time    time.Duration
	Name    string
}

var benchmarkLog []runStats

func TestDups2Nodes(t *testing.T) {
	t.Run("AllToAll-OneAtATime", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, allToAll, oneAtATime)
	})
	t.Run("AllToAll-BigBatch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, allToAll, batchFetchAll)
	})

	t.Run("Overlap1-OneAtATime", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap1, oneAtATime)
	})

	t.Run("Overlap2-BatchBy10", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap2, batchFetchBy10)
	})

	t.Run("Overlap3-OneAtATime", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap3, oneAtATime)
	})
	t.Run("Overlap3-BatchBy10", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap3, batchFetchBy10)
	})
	t.Run("Overlap3-AllConcurrent", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap3, fetchAllConcurrent)
	})
	t.Run("Overlap3-BigBatch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap3, batchFetchAll)
	})
	t.Run("Overlap3-UnixfsFetch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 3, 100, overlap3, unixfsFileFetch)
	})
	t.Run("10Nodes-AllToAll-OneAtATime", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, allToAll, oneAtATime)
	})
	t.Run("10Nodes-AllToAll-BatchFetchBy10", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, allToAll, batchFetchBy10)
	})
	t.Run("10Nodes-AllToAll-BigBatch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, allToAll, batchFetchAll)
	})
	t.Run("10Nodes-AllToAll-AllConcurrent", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, allToAll, fetchAllConcurrent)
	})
	t.Run("10Nodes-AllToAll-UnixfsFetch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, allToAll, unixfsFileFetch)
	})
	t.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, oneAtATime)
	})
	t.Run("10Nodes-OnePeerPerBlock-BigBatch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, batchFetchAll)
	})
	t.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, unixfsFileFetch)
	})
	t.Run("200Nodes-AllToAll-BigBatch", func(t *testing.T) {
		subtestDistributeAndFetch(t, 200, 20, allToAll, batchFetchAll)
	})

	out, _ := json.MarshalIndent(benchmarkLog, "", " ")
	ioutil.WriteFile("benchmark.json", out, 0666)
}

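// subtestDistributeAndFetch spreads numblks blocks across the first
// numnodes-1 instances using the given distribution function, fetches them
// all from the last instance using the given fetch function, and records
// the run's timing, message counts, and duplicate-block count.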
func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, ff fetchFunc) {
	start := time.Now()
	net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond))
	sg := NewTestSessionGenerator(net)
	defer sg.Close()

	bg := blocksutil.NewBlockGenerator()

	instances := sg.Instances(numnodes)
	blocks := bg.Blocks(numblks)

	fetcher := instances[numnodes-1]

	df(t, instances[:numnodes-1], blocks)

	var ks []cid.Cid
	for _, blk := range blocks {
		ks = append(ks, blk.Cid())
	}

	ff(t, fetcher.Exchange, ks)

	st, err := fetcher.Exchange.Stat()
	if err != nil {
		t.Fatal(err)
	}

	nst := fetcher.Exchange.network.Stats()
	stats := runStats{
		Time:    time.Now().Sub(start),
		MsgRecd: nst.MessagesRecvd,
		MsgSent: nst.MessagesSent,
		Dups:    st.DupBlksReceived,
		Name:    t.Name(),
	}
	benchmarkLog = append(benchmarkLog, stats)
	t.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd)
	if st.DupBlksReceived != 0 {
		t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived)
	}
}

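// allToAll puts every block on every provider, so any peer can serve any want.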
func allToAll(t *testing.T, provs []Instance, blocks []blocks.Block) {
	for _, p := range provs {
		if err := p.Blockstore().PutMany(blocks); err != nil {
			t.Fatal(err)
		}
	}
}

// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks
// to the second peer. This means both peers have the middle 50 blocks
func overlap1(t *testing.T, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		t.Fatal("overlap1 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	if err := bill.Blockstore().PutMany(blks[:75]); err != nil {
		t.Fatal(err)
	}
	if err := jeff.Blockstore().PutMany(blks[25:]); err != nil {
		t.Fatal(err)
	}
}

// overlap2 gives every third block to both peers; of the remaining blocks,
// the odd-indexed ones go to the first peer and the even-indexed ones to the
// second.
func overlap2(t *testing.T, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		t.Fatal("overlap2 only works with 2 provs")
	}
	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

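// overlap3 currently uses the same layout as overlap2: every third block goes
// to both peers, the remaining odd-indexed blocks to the first peer, and the
// remaining even-indexed blocks to the second.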
func overlap3(t *testing.T, provs []Instance, blks []blocks.Block) {
	if len(provs) != 2 {
		t.Fatal("overlap3 only works with 2 provs")
	}

	bill := provs[0]
	jeff := provs[1]

	bill.Blockstore().Put(blks[0])
	jeff.Blockstore().Put(blks[0])
	for i, blk := range blks {
		if i%3 == 0 {
			bill.Blockstore().Put(blk)
			jeff.Blockstore().Put(blk)
		} else if i%2 == 1 {
			bill.Blockstore().Put(blk)
		} else {
			jeff.Blockstore().Put(blk)
		}
	}
}

// onePeerPerBlock picks a random peer to hold each block. With this layout we
// shouldn't ever see duplicate blocks, so it mostly measures the performance
// of the sync algorithm.
func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) {
	for _, blk := range blks {
		provs[rand.Intn(len(provs))].Blockstore().Put(blk)
	}
}

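// oneAtATime fetches each block with its own GetBlock call through a single
// session, then logs the session's average fetch latency.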
func oneAtATime(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background()).(*Session)
	for _, c := range ks {
		_, err := ses.GetBlock(context.Background(), c)
		if err != nil {
			t.Fatal(err)
		}
	}
	t.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt))
}

// fetch data in batches, 10 at a time
func batchFetchBy10(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	for i := 0; i < len(ks); i += 10 {
		out, err := ses.GetBlocks(context.Background(), ks[i:i+10])
		if err != nil {
			t.Fatal(err)
		}
		for range out {
		}
	}
}

// fetch every block concurrently, each in its own goroutine
func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())

	var wg sync.WaitGroup
	for _, c := range ks {
		wg.Add(1)
		go func(c cid.Cid) {
			defer wg.Done()
			_, err := ses.GetBlock(context.Background(), c)
			if err != nil {
				t.Fatal(err)
			}
		}(c)
	}
	wg.Wait()
}

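// batchFetchAll requests every block in a single GetBlocks call and drains
// the resulting channel.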
func batchFetchAll(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	out, err := ses.GetBlocks(context.Background(), ks)
	if err != nil {
		t.Fatal(err)
	}
	for range out {
	}
}

// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible
func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())
	_, err := ses.GetBlock(context.Background(), ks[0])
	if err != nil {
		t.Fatal(err)
	}

	out, err := ses.GetBlocks(context.Background(), ks[1:11])
	if err != nil {
		t.Fatal(err)
	}
	for range out {
	}

	out, err = ses.GetBlocks(context.Background(), ks[11:])
	if err != nil {
		t.Fatal(err)
	}
	for range out {
	}
}
@@ -5,6 +5,7 @@ import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	bsmsg "github.com/ipfs/go-bitswap/message"

@@ -48,6 +49,8 @@ type impl struct {

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver

	stats NetworkStats
Review comment: I believe this needs to go at the top of the struct as it needs to be 64-bit aligned:
Reply: The compiler should really catch that for us...
Reply: It should, but it doesn't...
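A minimal sketch of the reordering being suggested, assuming the rest of the struct stays as in this diff: on 32-bit platforms the 64-bit functions in sync/atomic require 64-bit-aligned operands, and Go only guarantees that alignment for the first word of an allocated struct, so the counters should come first.

// Sketch only; the field list is abbreviated, not taken from this PR.
type impl struct {
	// stats goes first so its uint64 counters sit at a 64-bit aligned
	// offset, which atomic.AddUint64/LoadUint64 need on 32-bit platforms.
	stats NetworkStats

	// ... other fields unchanged ...

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver
}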
}

type streamMessageSender struct {

@@ -130,6 +133,8 @@ func (bsnet *impl) SendMessage(
		s.Reset()
		return err
	}
	atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

	// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
	go inet.AwaitEOF(s)
	return s.Close()

@@ -210,13 +215,21 @@ func (bsnet *impl) handleNewStream(s inet.Stream) {
		ctx := context.Background()
		log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
		bsnet.receiver.ReceiveMessage(ctx, p, received)
		atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
	}
}

func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager {
	return bsnet.host.ConnManager()
}

func (bsnet *impl) Stats() NetworkStats {
	return NetworkStats{
		MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
		MessagesSent:  atomic.LoadUint64(&bsnet.stats.MessagesSent),
	}
}

type netNotifiee impl

func (nn *netNotifiee) impl() *impl {
Review comment: We should probably have a test that looks more like reading entire files out of a large directory structure. I wonder if we should try building an actual DAG.
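One rough sketch of what such a fetch pattern could look like, written against the fetchFunc helpers in the benchmark file above; the fan-out of 4 and the assumption that ks is a breadth-first flattening of a synthetic tree are illustrative choices, not anything this PR implements.

// dagLevelFetch fetches the "root" block first, then each deeper level as one
// batch, the way a breadth-first walk of a directory-like DAG would.
func dagLevelFetch(t *testing.T, bs *Bitswap, ks []cid.Cid) {
	ses := bs.NewSession(context.Background())

	// A real DAG traversal has to start from the root.
	if _, err := ses.GetBlock(context.Background(), ks[0]); err != nil {
		t.Fatal(err)
	}

	// Assume level i of the tree holds fanout^i blocks, laid out contiguously.
	const fanout = 4
	offset, width := 1, fanout
	for offset < len(ks) {
		end := offset + width
		if end > len(ks) {
			end = len(ks)
		}
		out, err := ses.GetBlocks(context.Background(), ks[offset:end])
		if err != nil {
			t.Fatal(err)
		}
		for range out {
		}
		offset, width = end, width*fanout
	}
}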