From 5b212478e982617763a542ae4dafcf87b7b22a4f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Aug 2018 21:46:41 -0700 Subject: [PATCH 01/12] add a test to reproduce duplicate blocks issue --- dup_blocks_test.go | 49 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 dup_blocks_test.go diff --git a/dup_blocks_test.go b/dup_blocks_test.go new file mode 100644 index 00000000..c16e9508 --- /dev/null +++ b/dup_blocks_test.go @@ -0,0 +1,49 @@ +package bitswap + +import ( + "context" + "fmt" + "testing" + "time" + + tn "github.com/ipfs/go-bitswap/testnet" + + blocksutil "github.com/ipfs/go-ipfs-blocksutil" + delay "github.com/ipfs/go-ipfs-delay" + mockrouting "github.com/ipfs/go-ipfs-routing/mock" +) + +func TestDuplicateBlocksIssues(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := instances[2] + + if err := bill.Blockstore().PutMany(blocks); err != nil { + t.Fatal(err) + } + if err := jeff.Blockstore().PutMany(blocks); err != nil { + t.Fatal(err) + } + + ses := steve.Exchange.NewSession(context.Background()) + for i, blk := range blocks { + fmt.Println("fetch block: ", i) + ses.GetBlock(context.Background(), blk.Cid()) + } + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + fmt.Println("duplicate blocks: ", st.DupBlksReceived) +} From 17eecb4964770bfaff38d781d455dba010ce6a76 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Aug 2018 21:58:24 -0700 Subject: [PATCH 02/12] more test layouts --- dup_blocks_test.go | 91 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 88 insertions(+), 3 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index c16e9508..2bc40360 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -2,7 +2,6 @@ package bitswap import ( "context" - "fmt" "testing" "time" @@ -13,6 +12,7 @@ import ( mockrouting "github.com/ipfs/go-ipfs-routing/mock" ) +// in this test, each of the data-having peers has all the data func TestDuplicateBlocksIssues(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) sg := NewTestSessionGenerator(net) @@ -35,8 +35,91 @@ func TestDuplicateBlocksIssues(t *testing.T) { } ses := steve.Exchange.NewSession(context.Background()) + for _, blk := range blocks { + ses.GetBlock(context.Background(), blk.Cid()) + } + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + if st.DupBlksReceived != 0 { + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + } +} + +// in this test, each of the 'other peers' has 3/4 of the data. 
and a 1/2 +// overlap in blocks with the other data-having peer +// interestingly, because of the way sessions currently work, this results in zero wasted data +func TestDupBlocksOverlap1(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := instances[2] + + if err := bill.Blockstore().PutMany(blocks[:75]); err != nil { + t.Fatal(err) + } + if err := jeff.Blockstore().PutMany(blocks[25:]); err != nil { + t.Fatal(err) + } + + ses := steve.Exchange.NewSession(context.Background()) + for _, blk := range blocks { + ses.GetBlock(context.Background(), blk.Cid()) + } + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + if st.DupBlksReceived != 0 { + t.Fatal("got duplicate blocks!") + } +} + +// in this test, each of the 'other peers' some of the data, with an overlap +// different from the previous test, both peers have the 'first' block, which triggers sessions +// into behaving poorly +func TestDupBlocksOverlap2(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := instances[2] + + bill.Blockstore().Put(blocks[0]) + jeff.Blockstore().Put(blocks[0]) for i, blk := range blocks { - fmt.Println("fetch block: ", i) + if i%3 == 0 { + bill.Blockstore().Put(blk) + jeff.Blockstore().Put(blk) + } else if i%2 == 1 { + bill.Blockstore().Put(blk) + } else { + jeff.Blockstore().Put(blk) + } + } + + ses := steve.Exchange.NewSession(context.Background()) + for _, blk := range blocks { ses.GetBlock(context.Background(), blk.Cid()) } @@ -45,5 +128,7 @@ func TestDuplicateBlocksIssues(t *testing.T) { t.Fatal(err) } - fmt.Println("duplicate blocks: ", st.DupBlksReceived) + if st.DupBlksReceived != 0 { + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + } } From 1c24ea17e064f33c1e013260befefbcbd74e564f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Aug 2018 23:48:29 -0700 Subject: [PATCH 03/12] more test layouts, again --- dup_blocks_test.go | 163 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 163 insertions(+) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 2bc40360..5d2c287b 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -2,11 +2,13 @@ package bitswap import ( "context" + "sync" "testing" "time" tn "github.com/ipfs/go-bitswap/testnet" + cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" mockrouting "github.com/ipfs/go-ipfs-routing/mock" @@ -132,3 +134,164 @@ func TestDupBlocksOverlap2(t *testing.T) { t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) } } + +// in this test, each of the 'other peers' some of the data, with an overlap +// The data is fetched in bulk, with a single 'getBlocks' call +func TestDupBlocksOverlapBatch1(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := 
instances[2] + + bill.Blockstore().Put(blocks[0]) + jeff.Blockstore().Put(blocks[0]) + for i, blk := range blocks { + if i%3 == 0 { + bill.Blockstore().Put(blk) + jeff.Blockstore().Put(blk) + } else if i%2 == 1 { + bill.Blockstore().Put(blk) + } else { + jeff.Blockstore().Put(blk) + } + } + + ses := steve.Exchange.NewSession(context.Background()) + + var ks []*cid.Cid + for _, blk := range blocks { + ks = append(ks, blk.Cid()) + } + out, err := ses.GetBlocks(context.Background(), ks) + if err != nil { + t.Fatal(err) + } + for range out { + } + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + if st.DupBlksReceived != 0 { + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + } +} + +// in this test, each of the 'other peers' some of the data, with an overlap +// The data is fetched in bulk, with N concurrent calls to 'getBlock' +func TestDupBlocksOverlapBatch2(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := instances[2] + + bill.Blockstore().Put(blocks[0]) + jeff.Blockstore().Put(blocks[0]) + for i, blk := range blocks { + if i%3 == 0 { + bill.Blockstore().Put(blk) + jeff.Blockstore().Put(blk) + } else if i%2 == 1 { + bill.Blockstore().Put(blk) + } else { + jeff.Blockstore().Put(blk) + } + } + + ses := steve.Exchange.NewSession(context.Background()) + + var wg sync.WaitGroup + for _, blk := range blocks { + wg.Add(1) + go func(c *cid.Cid) { + defer wg.Done() + _, err := ses.GetBlock(context.Background(), c) + if err != nil { + t.Fatal(err) + } + }(blk.Cid()) + } + wg.Wait() + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + if st.DupBlksReceived != 0 { + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + } +} + +// in this test, each of the 'other peers' some of the data, with an overlap +// The data is fetched in bulk, fetching ten blocks at a time with 'getBlocks' +func TestDupBlocksOverlapBatch3(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + + bg := blocksutil.NewBlockGenerator() + + instances := sg.Instances(3) + blocks := bg.Blocks(100) + + bill := instances[0] + jeff := instances[1] + steve := instances[2] + + bill.Blockstore().Put(blocks[0]) + jeff.Blockstore().Put(blocks[0]) + for i, blk := range blocks { + if i%3 == 0 { + bill.Blockstore().Put(blk) + jeff.Blockstore().Put(blk) + } else if i%2 == 1 { + bill.Blockstore().Put(blk) + } else { + jeff.Blockstore().Put(blk) + } + } + + ses := steve.Exchange.NewSession(context.Background()) + + var ks []*cid.Cid + for _, blk := range blocks { + ks = append(ks, blk.Cid()) + } + + for i := 0; i < len(ks); i += 10 { + out, err := ses.GetBlocks(context.Background(), ks[i:i+10]) + if err != nil { + t.Fatal(err) + } + for range out { + } + } + + st, err := steve.Exchange.Stat() + if err != nil { + t.Fatal(err) + } + + if st.DupBlksReceived != 0 { + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + } +} From 6d17e7f34adea9bbe3a564450db545882f8475d8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 30 Aug 2018 00:13:53 -0700 Subject: [PATCH 04/12] refactor tests, make it easier to run tables --- dup_blocks_test.go | 310 ++++++++++++++++----------------------------- 1 file changed, 108 
insertions(+), 202 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 5d2c287b..2c2d75aa 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -8,53 +8,67 @@ import ( tn "github.com/ipfs/go-bitswap/testnet" + "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" blocksutil "github.com/ipfs/go-ipfs-blocksutil" delay "github.com/ipfs/go-ipfs-delay" mockrouting "github.com/ipfs/go-ipfs-routing/mock" ) -// in this test, each of the data-having peers has all the data -func TestDuplicateBlocksIssues(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - - bg := blocksutil.NewBlockGenerator() +type fetchFunc func(t *testing.T, bs *Bitswap, ks []*cid.Cid) - instances := sg.Instances(3) - blocks := bg.Blocks(100) - - bill := instances[0] - jeff := instances[1] - steve := instances[2] - - if err := bill.Blockstore().PutMany(blocks); err != nil { - t.Fatal(err) - } - if err := jeff.Blockstore().PutMany(blocks); err != nil { - t.Fatal(err) +func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) + for _, c := range ks { + _, err := ses.GetBlock(context.Background(), c) + if err != nil { + t.Fatal(err) + } } +} - ses := steve.Exchange.NewSession(context.Background()) - for _, blk := range blocks { - ses.GetBlock(context.Background(), blk.Cid()) - } +type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block) - st, err := steve.Exchange.Stat() - if err != nil { - t.Fatal(err) +func allToAll(t *testing.T, provs []Instance, blocks []blocks.Block) { + for _, p := range provs { + if err := p.Blockstore().PutMany(blocks); err != nil { + t.Fatal(err) + } } +} - if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) - } +// in this test, each of the data-having peers has all the data +func TestDups(t *testing.T) { + t.Run("AllToAll-OneAtATime", func(t *testing.T) { + subtestDistributeAndFetch(t, allToAll, oneAtATime) + }) + t.Run("AllToAll-BigBatch", func(t *testing.T) { + subtestDistributeAndFetch(t, allToAll, batchFetchAll) + }) + + t.Run("Overlap1-OneAtATime", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap1, oneAtATime) + }) + + t.Run("Overlap2-BatchBy10", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap2, batchFetchBy10) + }) + + t.Run("Overlap3-OneAtATime", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap3, oneAtATime) + }) + t.Run("Overlap3-BatchBy10", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap3, batchFetchBy10) + }) + t.Run("Overlap3-AllConcurrent", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap3, fetchAllConcurrent) + }) + t.Run("Overlap3-BigBatch", func(t *testing.T) { + subtestDistributeAndFetch(t, overlap3, batchFetchAll) + }) } -// in this test, each of the 'other peers' has 3/4 of the data. 
and a 1/2 -// overlap in blocks with the other data-having peer -// interestingly, because of the way sessions currently work, this results in zero wasted data -func TestDupBlocksOverlap1(t *testing.T) { +func subtestDistributeAndFetch(t *testing.T, df distFunc, ff fetchFunc) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) sg := NewTestSessionGenerator(net) defer sg.Close() @@ -64,96 +78,56 @@ func TestDupBlocksOverlap1(t *testing.T) { instances := sg.Instances(3) blocks := bg.Blocks(100) - bill := instances[0] - jeff := instances[1] - steve := instances[2] + fetcher := instances[2] - if err := bill.Blockstore().PutMany(blocks[:75]); err != nil { - t.Fatal(err) - } - if err := jeff.Blockstore().PutMany(blocks[25:]); err != nil { - t.Fatal(err) - } + df(t, instances[:2], blocks) - ses := steve.Exchange.NewSession(context.Background()) + var ks []*cid.Cid for _, blk := range blocks { - ses.GetBlock(context.Background(), blk.Cid()) + ks = append(ks, blk.Cid()) } - st, err := steve.Exchange.Stat() + ff(t, fetcher.Exchange, ks) + + st, err := fetcher.Exchange.Stat() if err != nil { t.Fatal(err) } if st.DupBlksReceived != 0 { - t.Fatal("got duplicate blocks!") + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) } } -// in this test, each of the 'other peers' some of the data, with an overlap -// different from the previous test, both peers have the 'first' block, which triggers sessions -// into behaving poorly -func TestDupBlocksOverlap2(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - - bg := blocksutil.NewBlockGenerator() - - instances := sg.Instances(3) - blocks := bg.Blocks(100) - - bill := instances[0] - jeff := instances[1] - steve := instances[2] - - bill.Blockstore().Put(blocks[0]) - jeff.Blockstore().Put(blocks[0]) - for i, blk := range blocks { - if i%3 == 0 { - bill.Blockstore().Put(blk) - jeff.Blockstore().Put(blk) - } else if i%2 == 1 { - bill.Blockstore().Put(blk) - } else { - jeff.Blockstore().Put(blk) - } - } - - ses := steve.Exchange.NewSession(context.Background()) - for _, blk := range blocks { - ses.GetBlock(context.Background(), blk.Cid()) +// overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks +// to the second peer. This means both peers have the middle 50 blocks +func overlap1(t *testing.T, provs []Instance, blks []blocks.Block) { + if len(provs) != 2 { + t.Fatal("overlap1 only works with 2 provs") } + bill := provs[0] + jeff := provs[1] - st, err := steve.Exchange.Stat() - if err != nil { + if err := bill.Blockstore().PutMany(blks[:75]); err != nil { t.Fatal(err) } - - if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + if err := jeff.Blockstore().PutMany(blks[25:]); err != nil { + t.Fatal(err) } } -// in this test, each of the 'other peers' some of the data, with an overlap -// The data is fetched in bulk, with a single 'getBlocks' call -func TestDupBlocksOverlapBatch1(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - - bg := blocksutil.NewBlockGenerator() - - instances := sg.Instances(3) - blocks := bg.Blocks(100) - - bill := instances[0] - jeff := instances[1] - steve := instances[2] +// overlap2 gives every even numbered block to the first peer, odd numbered +// blocks to the second. 
it also gives every third block to both peers +func overlap2(t *testing.T, provs []Instance, blks []blocks.Block) { + if len(provs) != 2 { + t.Fatal("overlap2 only works with 2 provs") + } + bill := provs[0] + jeff := provs[1] - bill.Blockstore().Put(blocks[0]) - jeff.Blockstore().Put(blocks[0]) - for i, blk := range blocks { + bill.Blockstore().Put(blks[0]) + jeff.Blockstore().Put(blks[0]) + for i, blk := range blks { if i%3 == 0 { bill.Blockstore().Put(blk) jeff.Blockstore().Put(blk) @@ -163,49 +137,19 @@ func TestDupBlocksOverlapBatch1(t *testing.T) { jeff.Blockstore().Put(blk) } } - - ses := steve.Exchange.NewSession(context.Background()) - - var ks []*cid.Cid - for _, blk := range blocks { - ks = append(ks, blk.Cid()) - } - out, err := ses.GetBlocks(context.Background(), ks) - if err != nil { - t.Fatal(err) - } - for range out { - } - - st, err := steve.Exchange.Stat() - if err != nil { - t.Fatal(err) - } - - if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) - } } -// in this test, each of the 'other peers' some of the data, with an overlap -// The data is fetched in bulk, with N concurrent calls to 'getBlock' -func TestDupBlocksOverlapBatch2(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - - bg := blocksutil.NewBlockGenerator() - - instances := sg.Instances(3) - blocks := bg.Blocks(100) +func overlap3(t *testing.T, provs []Instance, blks []blocks.Block) { + if len(provs) != 2 { + t.Fatal("overlap3 only works with 2 provs") + } - bill := instances[0] - jeff := instances[1] - steve := instances[2] + bill := provs[0] + jeff := provs[1] - bill.Blockstore().Put(blocks[0]) - jeff.Blockstore().Put(blocks[0]) - for i, blk := range blocks { + bill.Blockstore().Put(blks[0]) + jeff.Blockstore().Put(blks[0]) + for i, blk := range blks { if i%3 == 0 { bill.Blockstore().Put(blk) jeff.Blockstore().Put(blk) @@ -215,11 +159,27 @@ func TestDupBlocksOverlapBatch2(t *testing.T) { jeff.Blockstore().Put(blk) } } +} + +// fetch data in batches, 10 at a time +func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) + for i := 0; i < len(ks); i += 10 { + out, err := ses.GetBlocks(context.Background(), ks[i:i+10]) + if err != nil { + t.Fatal(err) + } + for range out { + } + } +} - ses := steve.Exchange.NewSession(context.Background()) +// fetch each block at the same time concurrently +func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) var wg sync.WaitGroup - for _, blk := range blocks { + for _, c := range ks { wg.Add(1) go func(c *cid.Cid) { defer wg.Done() @@ -227,71 +187,17 @@ func TestDupBlocksOverlapBatch2(t *testing.T) { if err != nil { t.Fatal(err) } - }(blk.Cid()) + }(c) } wg.Wait() - - st, err := steve.Exchange.Stat() - if err != nil { - t.Fatal(err) - } - - if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) - } } -// in this test, each of the 'other peers' some of the data, with an overlap -// The data is fetched in bulk, fetching ten blocks at a time with 'getBlocks' -func TestDupBlocksOverlapBatch3(t *testing.T) { - net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) - sg := NewTestSessionGenerator(net) - defer sg.Close() - - bg := blocksutil.NewBlockGenerator() - - instances := sg.Instances(3) - blocks := bg.Blocks(100) - - bill := instances[0] - jeff := instances[1] - 
steve := instances[2] - - bill.Blockstore().Put(blocks[0]) - jeff.Blockstore().Put(blocks[0]) - for i, blk := range blocks { - if i%3 == 0 { - bill.Blockstore().Put(blk) - jeff.Blockstore().Put(blk) - } else if i%2 == 1 { - bill.Blockstore().Put(blk) - } else { - jeff.Blockstore().Put(blk) - } - } - - ses := steve.Exchange.NewSession(context.Background()) - - var ks []*cid.Cid - for _, blk := range blocks { - ks = append(ks, blk.Cid()) - } - - for i := 0; i < len(ks); i += 10 { - out, err := ses.GetBlocks(context.Background(), ks[i:i+10]) - if err != nil { - t.Fatal(err) - } - for range out { - } - } - - st, err := steve.Exchange.Stat() +func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) + out, err := ses.GetBlocks(context.Background(), ks) if err != nil { t.Fatal(err) } - - if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + for range out { } } From 54bce9382b64eaed51eaf5a8f40bbbe8162fd1f8 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 30 Aug 2018 18:30:50 -0700 Subject: [PATCH 05/12] More test scenarios! --- dup_blocks_test.go | 92 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 77 insertions(+), 15 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 2c2d75aa..61d796ec 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -2,6 +2,7 @@ package bitswap import ( "context" + "math/rand" "sync" "testing" "time" @@ -37,50 +38,79 @@ func allToAll(t *testing.T, provs []Instance, blocks []blocks.Block) { } } -// in this test, each of the data-having peers has all the data -func TestDups(t *testing.T) { +func TestDups2Nodes(t *testing.T) { t.Run("AllToAll-OneAtATime", func(t *testing.T) { - subtestDistributeAndFetch(t, allToAll, oneAtATime) + subtestDistributeAndFetch(t, 3, 100, allToAll, oneAtATime) }) t.Run("AllToAll-BigBatch", func(t *testing.T) { - subtestDistributeAndFetch(t, allToAll, batchFetchAll) + subtestDistributeAndFetch(t, 3, 100, allToAll, batchFetchAll) }) t.Run("Overlap1-OneAtATime", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap1, oneAtATime) + subtestDistributeAndFetch(t, 3, 100, overlap1, oneAtATime) }) t.Run("Overlap2-BatchBy10", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap2, batchFetchBy10) + subtestDistributeAndFetch(t, 3, 100, overlap2, batchFetchBy10) }) t.Run("Overlap3-OneAtATime", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap3, oneAtATime) + subtestDistributeAndFetch(t, 3, 100, overlap3, oneAtATime) }) t.Run("Overlap3-BatchBy10", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap3, batchFetchBy10) + subtestDistributeAndFetch(t, 3, 100, overlap3, batchFetchBy10) }) t.Run("Overlap3-AllConcurrent", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap3, fetchAllConcurrent) + subtestDistributeAndFetch(t, 3, 100, overlap3, fetchAllConcurrent) }) t.Run("Overlap3-BigBatch", func(t *testing.T) { - subtestDistributeAndFetch(t, overlap3, batchFetchAll) + subtestDistributeAndFetch(t, 3, 100, overlap3, batchFetchAll) + }) + t.Run("Overlap3-UnixfsFetch", func(t *testing.T) { + subtestDistributeAndFetch(t, 3, 100, overlap3, unixfsFileFetch) + }) +} + +func TestDupsManyNodes(t *testing.T) { + t.Run("10Nodes-AllToAll-OneAtATime", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, allToAll, oneAtATime) + }) + t.Run("10Nodes-AllToAll-BatchFetchBy10", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, allToAll, batchFetchBy10) + }) + t.Run("10Nodes-AllToAll-BigBatch", func(t 
*testing.T) { + subtestDistributeAndFetch(t, 10, 100, allToAll, batchFetchAll) + }) + t.Run("10Nodes-AllToAll-AllConcurrent", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, allToAll, fetchAllConcurrent) + }) + t.Run("10Nodes-AllToAll-UnixfsFetch", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, allToAll, unixfsFileFetch) + }) + t.Run("10Nodes-OnePeerPerBlock-OneAtATime", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, oneAtATime) + }) + t.Run("10Nodes-OnePeerPerBlock-BigBatch", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, batchFetchAll) + }) + t.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(t *testing.T) { + subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, unixfsFileFetch) }) } -func subtestDistributeAndFetch(t *testing.T, df distFunc, ff fetchFunc) { +func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, ff fetchFunc) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) sg := NewTestSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() - instances := sg.Instances(3) - blocks := bg.Blocks(100) + instances := sg.Instances(numnodes) + blocks := bg.Blocks(numblks) - fetcher := instances[2] + fetcher := instances[numnodes-1] - df(t, instances[:2], blocks) + df(t, instances[:numnodes-1], blocks) var ks []*cid.Cid for _, blk := range blocks { @@ -161,6 +191,15 @@ func overlap3(t *testing.T, provs []Instance, blks []blocks.Block) { } } +// onePeerPerBlock picks a random peer to hold each block +// with this layout, we shouldnt actually ever see any duplicate blocks +// but we're mostly just testing performance of the sync algorithm +func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) { + for _, blk := range blks { + provs[rand.Intn(len(provs))].Blockstore().Put(blk) + } +} + // fetch data in batches, 10 at a time func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) { ses := bs.NewSession(context.Background()) @@ -201,3 +240,26 @@ func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) { for range out { } } + +// simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible +func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) + _, err := ses.GetBlock(context.Background(), ks[0]) + if err != nil { + t.Fatal(err) + } + + out, err := ses.GetBlocks(context.Background(), ks[1:11]) + if err != nil { + t.Fatal(err) + } + for range out { + } + + out, err = ses.GetBlocks(context.Background(), ks[11:]) + if err != nil { + t.Fatal(err) + } + for range out { + } +} From 60e79921bcc0c9a83559d2b93920e3b938c670d3 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 2 Sep 2018 22:39:41 -0700 Subject: [PATCH 06/12] reorg in dup tests --- dup_blocks_test.go | 37 +++++++++++++++++++------------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 61d796ec..84c4fcc5 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -18,26 +18,8 @@ import ( type fetchFunc func(t *testing.T, bs *Bitswap, ks []*cid.Cid) -func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) { - ses := bs.NewSession(context.Background()) - for _, c := range ks { - _, err := ses.GetBlock(context.Background(), c) - if err != nil { - t.Fatal(err) - } - } -} - type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block) -func allToAll(t *testing.T, provs []Instance, blocks 
[]blocks.Block) { - for _, p := range provs { - if err := p.Blockstore().PutMany(blocks); err != nil { - t.Fatal(err) - } - } -} - func TestDups2Nodes(t *testing.T) { t.Run("AllToAll-OneAtATime", func(t *testing.T) { subtestDistributeAndFetch(t, 3, 100, allToAll, oneAtATime) @@ -129,6 +111,14 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, } } +func allToAll(t *testing.T, provs []Instance, blocks []blocks.Block) { + for _, p := range provs { + if err := p.Blockstore().PutMany(blocks); err != nil { + t.Fatal(err) + } + } +} + // overlap1 gives the first 75 blocks to the first peer, and the last 75 blocks // to the second peer. This means both peers have the middle 50 blocks func overlap1(t *testing.T, provs []Instance, blks []blocks.Block) { @@ -200,6 +190,17 @@ func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) { } } +func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) { + ses := bs.NewSession(context.Background()) + for _, c := range ks { + _, err := ses.GetBlock(context.Background(), c) + if err != nil { + t.Fatal(err) + } + } + t.Logf("Session fetch latency: %s", ses.latTotal/time.Duration(ses.fetchcnt)) +} + // fetch data in batches, 10 at a time func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) { ses := bs.NewSession(context.Background()) From 832b3a30874f3a012967be6786c84c7f207dd436 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Aug 2018 22:09:09 -0700 Subject: [PATCH 07/12] improvement attempt number 1 --- session.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/session.go b/session.go index 063a40d9..d8311d1b 100644 --- a/session.go +++ b/session.go @@ -3,6 +3,7 @@ package bitswap import ( "context" "fmt" + "math/rand" "time" notifications "github.com/ipfs/go-bitswap/notifications" @@ -280,7 +281,22 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { for _, c := range ks { s.liveWants[c] = now } - s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) + if len(s.activePeers) == 0 { + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) + } else { + spl := divvy(ks, len(s.activePeersArr)) + for ki, pi := range rand.Perm(len(s.activePeersArr)) { + s.bs.wm.WantBlocks(ctx, spl[ki], []peer.ID{s.activePeersArr[pi]}, s.id) + } + } +} + +func divvy(ks []*cid.Cid, n int) [][]*cid.Cid { + out := make([][]*cid.Cid, n) + for i, c := range ks { + out[i%n] = append(out[i%n], c) + } + return out } func (s *Session) cancel(keys []cid.Cid) { From 1ddefaeeca24bc4bdb0cb84784fbd31d1a518247 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 31 Aug 2018 18:34:40 -0700 Subject: [PATCH 08/12] add statistics for network messages sent/recvd --- dup_blocks_test.go | 2 ++ network/interface.go | 10 ++++++++++ network/ipfs_impl.go | 13 +++++++++++++ stat.go | 20 +++++++++++--------- testnet/virtual.go | 36 +++++++++++++++++++----------------- testutils.go | 2 +- 6 files changed, 56 insertions(+), 27 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 84c4fcc5..3476d1f2 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -106,6 +106,8 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, t.Fatal(err) } + nst := fetcher.Exchange.network.Stats() + t.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd) if st.DupBlksReceived != 0 { t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) } diff --git a/network/interface.go b/network/interface.go index fd5622c1..6c325b1c 100644 --- a/network/interface.go +++ 
b/network/interface.go @@ -38,6 +38,8 @@ type BitSwapNetwork interface { ConnectionManager() ifconnmgr.ConnManager + Stats() NetworkStats + Routing } @@ -68,3 +70,11 @@ type Routing interface { // Provide provides the key to the network Provide(context.Context, cid.Cid) error } + +// NetworkStats is a container for statistics about the bitswap network +// the numbers inside are specific to bitswap, and not any other protocols +// using the same underlying network. +type NetworkStats struct { + MessagesSent uint64 + MessagesRecvd uint64 +} diff --git a/network/ipfs_impl.go b/network/ipfs_impl.go index 78dee0dc..f6c04e35 100644 --- a/network/ipfs_impl.go +++ b/network/ipfs_impl.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "sync/atomic" "time" bsmsg "github.com/ipfs/go-bitswap/message" @@ -48,6 +49,8 @@ type impl struct { // inbound messages from the network are forwarded to the receiver receiver Receiver + + stats NetworkStats } type streamMessageSender struct { @@ -130,6 +133,8 @@ func (bsnet *impl) SendMessage( s.Reset() return err } + atomic.AddUint64(&bsnet.stats.MessagesSent, 1) + // TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine. go inet.AwaitEOF(s) return s.Close() @@ -210,6 +215,7 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { ctx := context.Background() log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) bsnet.receiver.ReceiveMessage(ctx, p, received) + atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1) } } @@ -217,6 +223,13 @@ func (bsnet *impl) ConnectionManager() ifconnmgr.ConnManager { return bsnet.host.ConnManager() } +func (bsnet *impl) Stats() NetworkStats { + return NetworkStats{ + MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd), + MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent), + } +} + type netNotifiee impl func (nn *netNotifiee) impl() *impl { diff --git a/stat.go b/stat.go index d01d1717..99b2def1 100644 --- a/stat.go +++ b/stat.go @@ -7,15 +7,16 @@ import ( ) type Stat struct { - ProvideBufLen int - Wantlist []cid.Cid - Peers []string - BlocksReceived uint64 - DataReceived uint64 - BlocksSent uint64 - DataSent uint64 - DupBlksReceived uint64 - DupDataReceived uint64 + ProvideBufLen int + Wantlist []cid.Cid + Peers []string + BlocksReceived uint64 + DataReceived uint64 + BlocksSent uint64 + DataSent uint64 + DupBlksReceived uint64 + DupDataReceived uint64 + MessagesReceived uint64 } func (bs *Bitswap) Stat() (*Stat, error) { @@ -30,6 +31,7 @@ func (bs *Bitswap) Stat() (*Stat, error) { st.BlocksSent = c.blocksSent st.DataSent = c.dataSent st.DataReceived = c.dataRecvd + st.MessagesReceived = c.messagesRecvd bs.counterLk.Unlock() peers := bs.engine.Peers() diff --git a/testnet/virtual.go b/testnet/virtual.go index 004dd66c..7a6257e7 100644 --- a/testnet/virtual.go +++ b/testnet/virtual.go @@ -4,6 +4,7 @@ import ( "context" "errors" "sync" + "sync/atomic" "time" bsmsg "github.com/ipfs/go-bitswap/message" @@ -48,7 +49,7 @@ type message struct { // order* with their delays respected as much as sending them in order allows // for type receiverQueue struct { - receiver bsnet.Receiver + receiver *networkClient queue []*message active bool lk sync.Mutex @@ -104,30 +105,30 @@ func (n *network) SendMessage( return nil } -func (n *network) deliver( - r bsnet.Receiver, from peer.ID, message bsmsg.BitSwapMessage) error { - if message == nil || from == "" { - return errors.New("invalid input") - } - - n.delay.Wait() - - r.ReceiveMessage(context.TODO(), from, message) - return nil -} - type 
networkClient struct { local peer.ID bsnet.Receiver network *network routing routing.IpfsRouting + stats bsnet.NetworkStats } func (nc *networkClient) SendMessage( ctx context.Context, to peer.ID, message bsmsg.BitSwapMessage) error { - return nc.network.SendMessage(ctx, nc.local, to, message) + if err := nc.network.SendMessage(ctx, nc.local, to, message); err != nil { + return err + } + atomic.AddUint64(&nc.stats.MessagesSent, 1) + return nil +} + +func (nc *networkClient) Stats() bsnet.NetworkStats { + return bsnet.NetworkStats{ + MessagesRecvd: atomic.LoadUint64(&nc.stats.MessagesRecvd), + MessagesSent: atomic.LoadUint64(&nc.stats.MessagesSent), + } } // FindProvidersAsync returns a channel of providers for the given key @@ -157,14 +158,14 @@ func (nc *networkClient) ConnectionManager() ifconnmgr.ConnManager { } type messagePasser struct { - net *network + net *networkClient target peer.ID local peer.ID ctx context.Context } func (mp *messagePasser) SendMsg(ctx context.Context, m bsmsg.BitSwapMessage) error { - return mp.net.SendMessage(ctx, mp.local, mp.target, m) + return mp.net.SendMessage(ctx, mp.target, m) } func (mp *messagePasser) Close() error { @@ -177,7 +178,7 @@ func (mp *messagePasser) Reset() error { func (n *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (bsnet.MessageSender, error) { return &messagePasser{ - net: n.network, + net: n, target: p, local: n.local, ctx: ctx, @@ -241,6 +242,7 @@ func (rq *receiverQueue) process() { rq.lk.Unlock() time.Sleep(time.Until(m.shouldSend)) + atomic.AddUint64(&rq.receiver.stats.MessagesRecvd, 1) rq.receiver.ReceiveMessage(context.TODO(), m.from, m.msg) } } diff --git a/testutils.go b/testutils.go index aa4ffa9f..f9be6943 100644 --- a/testutils.go +++ b/testutils.go @@ -81,7 +81,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { return i.blockstoreDelay.Set(t) } -// session creates a test bitswap session. +// session creates a test bitswap instance. // // NB: It's easy make mistakes by providing the same peer ID to two different // sessions. To safeguard, use the SessionGenerator to generate sessions. 
It's From ea5e626f8246adf066841fedc6ef84ad00031169 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 2 Sep 2018 22:39:58 -0700 Subject: [PATCH 09/12] WIP: working on dupl factor scaling --- dup_blocks_test.go | 16 ++++----- session.go | 82 +++++++++++++++++++++++++++++++++------------- 2 files changed, 67 insertions(+), 31 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 3476d1f2..c159a52a 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -16,7 +16,7 @@ import ( mockrouting "github.com/ipfs/go-ipfs-routing/mock" ) -type fetchFunc func(t *testing.T, bs *Bitswap, ks []*cid.Cid) +type fetchFunc func(t *testing.T, bs *Bitswap, ks []cid.Cid) type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block) @@ -94,7 +94,7 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, df(t, instances[:numnodes-1], blocks) - var ks []*cid.Cid + var ks []cid.Cid for _, blk := range blocks { ks = append(ks, blk.Cid()) } @@ -192,7 +192,7 @@ func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) { } } -func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) { +func oneAtATime(t *testing.T, bs *Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) for _, c := range ks { _, err := ses.GetBlock(context.Background(), c) @@ -204,7 +204,7 @@ func oneAtATime(t *testing.T, bs *Bitswap, ks []*cid.Cid) { } // fetch data in batches, 10 at a time -func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) { +func batchFetchBy10(t *testing.T, bs *Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) for i := 0; i < len(ks); i += 10 { out, err := ses.GetBlocks(context.Background(), ks[i:i+10]) @@ -217,13 +217,13 @@ func batchFetchBy10(t *testing.T, bs *Bitswap, ks []*cid.Cid) { } // fetch each block at the same time concurrently -func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []*cid.Cid) { +func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) var wg sync.WaitGroup for _, c := range ks { wg.Add(1) - go func(c *cid.Cid) { + go func(c cid.Cid) { defer wg.Done() _, err := ses.GetBlock(context.Background(), c) if err != nil { @@ -234,7 +234,7 @@ func fetchAllConcurrent(t *testing.T, bs *Bitswap, ks []*cid.Cid) { wg.Wait() } -func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) { +func batchFetchAll(t *testing.T, bs *Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) out, err := ses.GetBlocks(context.Background(), ks) if err != nil { @@ -245,7 +245,7 @@ func batchFetchAll(t *testing.T, bs *Bitswap, ks []*cid.Cid) { } // simulates the fetch pattern of trying to sync a unixfs file graph as fast as possible -func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []*cid.Cid) { +func unixfsFileFetch(t *testing.T, bs *Bitswap, ks []cid.Cid) { ses := bs.NewSession(context.Background()) _, err := ses.GetBlock(context.Background(), ks[0]) if err != nil { diff --git a/session.go b/session.go index d8311d1b..273b6c3e 100644 --- a/session.go +++ b/session.go @@ -16,7 +16,8 @@ import ( peer "github.com/libp2p/go-libp2p-peer" ) -const activeWantsLimit = 16 +const broadcastLiveWantsLimit = 4 +const targetedLiveWantsLimit = 32 // Session holds state for an individual bitswap transfer operation. 
// This allows bitswap to make smarter decisions about who to send wantlist @@ -33,14 +34,31 @@ type Session struct { cancelKeys chan []cid.Cid interestReqs chan interestReq - interest *lru.Cache + // interest is a cache of cids that are of interest to this session + // this may include blocks we've already received, but still might want + // to know about the peer that sent it + interest *lru.Cache + + // liveWants keeps track of all the current requests we have out, and when + // they were last requested liveWants map[cid.Cid]time.Time + // liveWantsLimit keeps track of how many live wants we should have out at + // a time this number should start out low, and grow as we gain more + // certainty on the sources of our data + liveWantsLimit int tick *time.Timer baseTickDelay time.Duration latTotal time.Duration + // fetchcnt is the number of blocks received fetchcnt int + // broadcasts is the number of times we have broadcasted a set of wants + broadcasts int + // dupl is the number of peers to send each want to. In most situations + // this should be 1, but if the session falls back to broadcasting for + // blocks too often, this value will be increased to compensate + dupl int notif notifications.PubSub @@ -54,19 +72,21 @@ type Session struct { // given context func (bs *Bitswap) NewSession(ctx context.Context) *Session { s := &Session{ - activePeers: make(map[peer.ID]struct{}), - liveWants: make(map[cid.Cid]time.Time), - newReqs: make(chan []cid.Cid), - cancelKeys: make(chan []cid.Cid), - tofetch: newCidQueue(), - interestReqs: make(chan interestReq), - ctx: ctx, - bs: bs, - incoming: make(chan blkRecv), - notif: notifications.New(), - uuid: loggables.Uuid("GetBlockRequest"), - baseTickDelay: time.Millisecond * 500, - id: bs.getNextSessionID(), + activePeers: make(map[peer.ID]struct{}), + liveWants: make(map[cid.Cid]time.Time), + liveWantsLimit: broadcastLiveWantsLimit, + newReqs: make(chan []cid.Cid), + cancelKeys: make(chan []cid.Cid), + tofetch: newCidQueue(), + interestReqs: make(chan interestReq), + ctx: ctx, + bs: bs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + id: bs.getNextSessionID(), + dupl: 1, } s.tag = fmt.Sprint("bs-ses-", s.id) @@ -153,6 +173,10 @@ func (s *Session) interestedIn(c cid.Cid) bool { const provSearchDelay = time.Second * 10 func (s *Session) addActivePeer(p peer.ID) { + if s.liveWantsLimit == broadcastLiveWantsLimit { + s.liveWantsLimit = targetedLiveWantsLimit + } + if _, ok := s.activePeers[p]; !ok { s.activePeers[p] = struct{}{} s.activePeersArr = append(s.activePeersArr, p) @@ -190,8 +214,8 @@ func (s *Session) run(ctx context.Context) { for _, k := range keys { s.interest.Add(k, nil) } - if len(s.liveWants) < activeWantsLimit { - toadd := activeWantsLimit - len(s.liveWants) + if len(s.liveWants) < s.liveWantsLimit { + toadd := s.liveWantsLimit - len(s.liveWants) if toadd > len(keys) { toadd = len(keys) } @@ -216,6 +240,8 @@ func (s *Session) run(ctx context.Context) { } // Broadcast these keys to everyone we're connected to + log.Error("broadcast!") + s.broadcasts++ s.bs.wm.WantBlocks(ctx, live, nil, s.id) if len(live) > 0 { @@ -282,19 +308,29 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { s.liveWants[c] = now } if len(s.activePeers) == 0 { - s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) + s.broadcasts++ + s.bs.wm.WantBlocks(ctx, ks, nil, s.id) } else { - spl := divvy(ks, len(s.activePeersArr)) + if s.fetchcnt > 5 { + brcRat 
:= float64(s.fetchcnt) / float64(s.broadcasts) + if brcRat < 2 { + s.dupl++ + } + } + spl := divvy(ks, len(s.activePeersArr), s.dupl) for ki, pi := range rand.Perm(len(s.activePeersArr)) { s.bs.wm.WantBlocks(ctx, spl[ki], []peer.ID{s.activePeersArr[pi]}, s.id) } } } -func divvy(ks []*cid.Cid, n int) [][]*cid.Cid { - out := make([][]*cid.Cid, n) - for i, c := range ks { - out[i%n] = append(out[i%n], c) +func divvy(ks []cid.Cid, n, dupl int) [][]cid.Cid { + out := make([][]cid.Cid, n) + for l := 0; l < dupl; l++ { + for i, c := range ks { + pos := (i + (len(ks) * l)) % n + out[pos] = append(out[pos], c) + } } return out } From 6d0f460027bca0bc2c807493f3b9b2700b1cedb7 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 17 Sep 2018 11:50:24 -0700 Subject: [PATCH 10/12] move incrememnting of dupl factor, write out benchmark results --- dup_blocks_test.go | 29 +++++++++++++++++++++++++---- session.go | 18 +++++++++--------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/dup_blocks_test.go b/dup_blocks_test.go index c159a52a..3a3f79de 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -2,6 +2,8 @@ package bitswap import ( "context" + "encoding/json" + "io/ioutil" "math/rand" "sync" "testing" @@ -20,6 +22,16 @@ type fetchFunc func(t *testing.T, bs *Bitswap, ks []cid.Cid) type distFunc func(t *testing.T, provs []Instance, blocks []blocks.Block) +type runStats struct { + Dups uint64 + MsgSent uint64 + MsgRecd uint64 + Time time.Duration + Name string +} + +var benchmarkLog []runStats + func TestDups2Nodes(t *testing.T) { t.Run("AllToAll-OneAtATime", func(t *testing.T) { subtestDistributeAndFetch(t, 3, 100, allToAll, oneAtATime) @@ -51,9 +63,6 @@ func TestDups2Nodes(t *testing.T) { t.Run("Overlap3-UnixfsFetch", func(t *testing.T) { subtestDistributeAndFetch(t, 3, 100, overlap3, unixfsFileFetch) }) -} - -func TestDupsManyNodes(t *testing.T) { t.Run("10Nodes-AllToAll-OneAtATime", func(t *testing.T) { subtestDistributeAndFetch(t, 10, 100, allToAll, oneAtATime) }) @@ -78,9 +87,13 @@ func TestDupsManyNodes(t *testing.T) { t.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(t *testing.T) { subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, unixfsFileFetch) }) + + out, _ := json.MarshalIndent(benchmarkLog, "", " ") + ioutil.WriteFile("benchmark.json", out, 0666) } func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, ff fetchFunc) { + start := time.Now() net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(10*time.Millisecond)) sg := NewTestSessionGenerator(net) defer sg.Close() @@ -107,9 +120,17 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, } nst := fetcher.Exchange.network.Stats() + stats := runStats{ + Time: time.Now().Sub(start), + MsgRecd: nst.MessagesRecvd, + MsgSent: nst.MessagesSent, + Dups: st.DupBlksReceived, + Name: t.Name(), + } + benchmarkLog = append(benchmarkLog, stats) t.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd) if st.DupBlksReceived != 0 { - t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + //t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) } } diff --git a/session.go b/session.go index 273b6c3e..7fba4b20 100644 --- a/session.go +++ b/session.go @@ -240,10 +240,16 @@ func (s *Session) run(ctx context.Context) { } // Broadcast these keys to everyone we're connected to - log.Error("broadcast!") s.broadcasts++ s.bs.wm.WantBlocks(ctx, live, nil, s.id) + if s.fetchcnt > 5 { + brcRat := float64(s.fetchcnt) / float64(s.broadcasts) + if brcRat < 2 { + s.dupl++ + 
} + } + if len(live) > 0 { go func(k cid.Cid) { // TODO: have a task queue setup for this to: @@ -307,16 +313,10 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) { for _, c := range ks { s.liveWants[c] = now } - if len(s.activePeers) == 0 { + if len(s.activePeers) == 0 || s.dupl >= len(s.activePeers) { s.broadcasts++ - s.bs.wm.WantBlocks(ctx, ks, nil, s.id) + s.bs.wm.WantBlocks(ctx, ks, s.activePeersArr, s.id) } else { - if s.fetchcnt > 5 { - brcRat := float64(s.fetchcnt) / float64(s.broadcasts) - if brcRat < 2 { - s.dupl++ - } - } spl := divvy(ks, len(s.activePeersArr), s.dupl) for ki, pi := range rand.Perm(len(s.activePeersArr)) { s.bs.wm.WantBlocks(ctx, spl[ki], []peer.ID{s.activePeersArr[pi]}, s.id) From 4447483da06b005b9265c7955bc271cff99c7109 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 19 Sep 2018 14:39:37 -0700 Subject: [PATCH 11/12] fix session exchange interface implementation --- bitswap.go | 2 ++ dup_blocks_test.go | 7 +++++-- package.json | 4 ++-- session.go | 3 ++- session_test.go | 2 +- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/bitswap.go b/bitswap.go index 542a6d83..942679d4 100644 --- a/bitswap.go +++ b/bitswap.go @@ -30,6 +30,8 @@ import ( var log = logging.Logger("bitswap") +var _ exchange.SessionExchange = (*Bitswap)(nil) + const ( // maxProvidersPerRequest specifies the maximum number of providers desired // from the network. This value is specified because the network streams diff --git a/dup_blocks_test.go b/dup_blocks_test.go index 3a3f79de..326efc4a 100644 --- a/dup_blocks_test.go +++ b/dup_blocks_test.go @@ -87,6 +87,9 @@ func TestDups2Nodes(t *testing.T) { t.Run("10Nodes-OnePeerPerBlock-UnixfsFetch", func(t *testing.T) { subtestDistributeAndFetch(t, 10, 100, onePeerPerBlock, unixfsFileFetch) }) + t.Run("200Nodes-AllToAll-BigBatch", func(t *testing.T) { + subtestDistributeAndFetch(t, 200, 20, allToAll, batchFetchAll) + }) out, _ := json.MarshalIndent(benchmarkLog, "", " ") ioutil.WriteFile("benchmark.json", out, 0666) @@ -130,7 +133,7 @@ func subtestDistributeAndFetch(t *testing.T, numnodes, numblks int, df distFunc, benchmarkLog = append(benchmarkLog, stats) t.Logf("send/recv: %d / %d", nst.MessagesSent, nst.MessagesRecvd) if st.DupBlksReceived != 0 { - //t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) + t.Fatalf("got %d duplicate blocks!", st.DupBlksReceived) } } @@ -214,7 +217,7 @@ func onePeerPerBlock(t *testing.T, provs []Instance, blks []blocks.Block) { } func oneAtATime(t *testing.T, bs *Bitswap, ks []cid.Cid) { - ses := bs.NewSession(context.Background()) + ses := bs.NewSession(context.Background()).(*Session) for _, c := range ks { _, err := ses.GetBlock(context.Background(), c) if err != nil { diff --git a/package.json b/package.json index c1cf38b1..9058d591 100644 --- a/package.json +++ b/package.json @@ -87,9 +87,9 @@ }, { "author": "hsanjuan", - "hash": "QmR1nncPsZR14A4hWr39mq8Lm7BGgS68bHVT9nop8NpWEM", + "hash": "QmUdh9184Bozfinyn5YDhgPRg33E3KR3btfZXcVoFgTxD4", "name": "go-ipfs-exchange-interface", - "version": "0.1.0" + "version": "0.1.1" }, { "author": "whyrusleeping", diff --git a/session.go b/session.go index 7fba4b20..33d7331d 100644 --- a/session.go +++ b/session.go @@ -11,6 +11,7 @@ import ( lru "github.com/hashicorp/golang-lru" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" + exchange "github.com/ipfs/go-ipfs-exchange-interface" logging "github.com/ipfs/go-log" loggables "github.com/libp2p/go-libp2p-loggables" peer "github.com/libp2p/go-libp2p-peer" @@ -70,7 +71,7 @@ 
type Session struct { // NewSession creates a new bitswap session whose lifetime is bounded by the // given context -func (bs *Bitswap) NewSession(ctx context.Context) *Session { +func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { s := &Session{ activePeers: make(map[peer.ID]struct{}), liveWants: make(map[cid.Cid]time.Time), diff --git a/session_test.go b/session_test.go index 8769d891..c5a00a90 100644 --- a/session_test.go +++ b/session_test.go @@ -132,7 +132,7 @@ func TestSessionSplitFetch(t *testing.T) { cids = append(cids, blk.Cid()) } - ses := inst[10].Exchange.NewSession(ctx) + ses := inst[10].Exchange.NewSession(ctx).(*Session) ses.baseTickDelay = time.Millisecond * 10 for i := 0; i < 10; i++ { From 5d4e4bba5f8da1aa7f5a580030bd1aefad283102 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 23 Oct 2018 07:58:28 -0700 Subject: [PATCH 12/12] avoid using constants as both sentinel values and values. --- session.go | 52 ++++++++++++++++++++++++++++------------------------ 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/session.go b/session.go index 33d7331d..f836e31d 100644 --- a/session.go +++ b/session.go @@ -43,10 +43,6 @@ type Session struct { // liveWants keeps track of all the current requests we have out, and when // they were last requested liveWants map[cid.Cid]time.Time - // liveWantsLimit keeps track of how many live wants we should have out at - // a time this number should start out low, and grow as we gain more - // certainty on the sources of our data - liveWantsLimit int tick *time.Timer baseTickDelay time.Duration @@ -73,21 +69,20 @@ type Session struct { // given context func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher { s := &Session{ - activePeers: make(map[peer.ID]struct{}), - liveWants: make(map[cid.Cid]time.Time), - liveWantsLimit: broadcastLiveWantsLimit, - newReqs: make(chan []cid.Cid), - cancelKeys: make(chan []cid.Cid), - tofetch: newCidQueue(), - interestReqs: make(chan interestReq), - ctx: ctx, - bs: bs, - incoming: make(chan blkRecv), - notif: notifications.New(), - uuid: loggables.Uuid("GetBlockRequest"), - baseTickDelay: time.Millisecond * 500, - id: bs.getNextSessionID(), - dupl: 1, + activePeers: make(map[peer.ID]struct{}), + liveWants: make(map[cid.Cid]time.Time), + newReqs: make(chan []cid.Cid), + cancelKeys: make(chan []cid.Cid), + tofetch: newCidQueue(), + interestReqs: make(chan interestReq), + ctx: ctx, + bs: bs, + incoming: make(chan blkRecv), + notif: notifications.New(), + uuid: loggables.Uuid("GetBlockRequest"), + baseTickDelay: time.Millisecond * 500, + id: bs.getNextSessionID(), + dupl: 1, } s.tag = fmt.Sprint("bs-ses-", s.id) @@ -173,11 +168,21 @@ func (s *Session) interestedIn(c cid.Cid) bool { const provSearchDelay = time.Second * 10 -func (s *Session) addActivePeer(p peer.ID) { - if s.liveWantsLimit == broadcastLiveWantsLimit { - s.liveWantsLimit = targetedLiveWantsLimit +func (s *Session) wantBudget() int { + live := len(s.liveWants) + var budget int + if len(s.activePeers) > 0 { + budget = targetedLiveWantsLimit - live + } else { + budget = broadcastLiveWantsLimit - live + } + if budget < 0 { + budget = 0 } + return budget +} +func (s *Session) addActivePeer(p peer.ID) { if _, ok := s.activePeers[p]; !ok { s.activePeers[p] = struct{}{} s.activePeersArr = append(s.activePeersArr, p) @@ -215,8 +220,7 @@ func (s *Session) run(ctx context.Context) { for _, k := range keys { s.interest.Add(k, nil) } - if len(s.liveWants) < s.liveWantsLimit { - toadd := s.liveWantsLimit - 
len(s.liveWants) + if toadd := s.wantBudget(); toadd > 0 { if toadd > len(keys) { toadd = len(keys) }
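
Aside (not part of the patch series above): the change at the heart of patches 07 and 09 is that a session stops broadcasting every want to all connected peers and instead splits each batch of wanted keys across its active peers, asking dupl peers for each key, where dupl is a duplication factor that only grows when the session keeps falling back to broadcasting. Below is a minimal, self-contained Go sketch of that splitting behaviour; it mirrors the divvy helper from the patches but uses plain strings in place of cid.Cid and peer IDs purely for illustration, and is not code taken from the commits.

package main

import "fmt"

// divvy assigns keys to n peers round-robin, repeating the pass dupl times
// with an offset so that the copies of a key tend to land on different peers.
func divvy(ks []string, n, dupl int) [][]string {
	out := make([][]string, n)
	for l := 0; l < dupl; l++ {
		for i, k := range ks {
			pos := (i + len(ks)*l) % n
			out[pos] = append(out[pos], k)
		}
	}
	return out
}

func main() {
	keys := []string{"k0", "k1", "k2", "k3", "k4"}
	// 3 active peers, duplication factor 1: each key is asked of exactly one peer.
	fmt.Println(divvy(keys, 3, 1)) // [[k0 k3] [k1 k4] [k2]]
	// Duplication factor 2: each key is asked of two peers.
	fmt.Println(divvy(keys, 3, 2)) // [[k0 k3 k1 k4] [k1 k4 k2] [k2 k0 k3]]
}

The len(ks)*l offset on later passes is what tends to spread a key's copies across different peers; without it, every pass would pick the same peer for a given key and a higher duplication factor would buy nothing.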