diff --git a/blocks/blockstore/blockstore.go b/blocks/blockstore/blockstore.go index d4849cb43bd..e203ffc502d 100644 --- a/blocks/blockstore/blockstore.go +++ b/blocks/blockstore/blockstore.go @@ -6,14 +6,16 @@ import ( "errors" ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash" + blocks "github.com/jbenet/go-ipfs/blocks" u "github.com/jbenet/go-ipfs/util" ) var ValueTypeMismatch = errors.New("The retrieved value is not a Block") +var ErrNotFound = errors.New("blockstore: block not found") + // Blockstore wraps a ThreadSafeDatastore type Blockstore interface { DeleteBlock(u.Key) error @@ -34,6 +36,9 @@ type blockstore struct { func (bs *blockstore) Get(k u.Key) (*blocks.Block, error) { maybeData, err := bs.datastore.Get(k.DsKey()) + if err == ds.ErrNotFound { + return nil, ErrNotFound + } if err != nil { return nil, err } diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index f44eaa0f5a4..9014cab461e 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -8,8 +8,6 @@ import ( "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - blocks "github.com/jbenet/go-ipfs/blocks" "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" @@ -67,7 +65,7 @@ func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, er return block, nil // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. - } else if err == ds.ErrNotFound && s.Exchange != nil { + } else if err == blockstore.ErrNotFound && s.Exchange != nil { log.Debug("Blockservice: Searching bitswap.") blk, err := s.Exchange.GetBlock(ctx, k) if err != nil { diff --git a/epictest/addcat_test.go b/epictest/addcat_test.go index a69660cf4fd..09ba58f7948 100644 --- a/epictest/addcat_test.go +++ b/epictest/addcat_test.go @@ -26,10 +26,6 @@ import ( const kSeed = 1 func Test100MBInstantaneous(t *testing.T) { - t.Log("a sanity check") - - t.Parallel() - conf := Config{ NetworkLatency: 0, RoutingLatency: 0, @@ -41,10 +37,7 @@ func Test100MBInstantaneous(t *testing.T) { func TestDegenerateSlowBlockstore(t *testing.T) { SkipUnlessEpic(t) - t.Parallel() - conf := Config{BlockstoreLatency: 50 * time.Millisecond} - if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -52,10 +45,7 @@ func TestDegenerateSlowBlockstore(t *testing.T) { func TestDegenerateSlowNetwork(t *testing.T) { SkipUnlessEpic(t) - t.Parallel() - conf := Config{NetworkLatency: 400 * time.Millisecond} - if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -63,10 +53,7 @@ func TestDegenerateSlowNetwork(t *testing.T) { func TestDegenerateSlowRouting(t *testing.T) { SkipUnlessEpic(t) - t.Parallel() - conf := Config{RoutingLatency: 400 * time.Millisecond} - if err := AddCatPowers(conf, 128); err != nil { t.Fatal(err) } @@ -74,10 +61,7 @@ func TestDegenerateSlowRouting(t *testing.T) { func Test100MBMacbookCoastToCoast(t *testing.T) { SkipUnlessEpic(t) - t.Parallel() - conf := Config{}.Network_NYtoSF().Blockstore_SlowSSD2014().Routing_Slow() - if err := AddCatBytes(RandomBytes(100*1024*1024), conf); err != nil { t.Fatal(err) } diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 64f29352819..912ed1210c6 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -3,36 +3,46 @@ package bitswap import ( + "math" "sync" "time" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - blocks "github.com/jbenet/go-ipfs/blocks" blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" exchange "github.com/jbenet/go-ipfs/exchange" + decision "github.com/jbenet/go-ipfs/exchange/bitswap/decision" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network" notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications" - strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy" + wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" eventlog "github.com/jbenet/go-ipfs/util/eventlog" + pset "github.com/jbenet/go-ipfs/util/peerset" ) var log = eventlog.Logger("bitswap") -// Number of providers to request for sending a wantlist to -// TODO: if a 'non-nice' strategy is implemented, consider increasing this value -const maxProvidersPerRequest = 3 +const ( + // Number of providers to request for sending a wantlist to + // TODO: if a 'non-nice' strategy is implemented, consider increasing this value + maxProvidersPerRequest = 3 + providerRequestTimeout = time.Second * 10 + hasBlockTimeout = time.Second * 15 + sizeBatchRequestChan = 32 + // kMaxPriority is the max priority as defined by the bitswap protocol + kMaxPriority = math.MaxInt32 +) -const providerRequestTimeout = time.Second * 10 -const hasBlockTimeout = time.Second * 15 +var ( + rebroadcastDelay = time.Second * 10 +) -// New initializes a BitSwap instance that communicates over the -// provided BitSwapNetwork. This function registers the returned instance as -// the network delegate. -// Runs until context is cancelled +// New initializes a BitSwap instance that communicates over the provided +// BitSwapNetwork. This function registers the returned instance as the network +// delegate. +// Runs until context is cancelled. func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, routing bsnet.Routing, bstore blockstore.Blockstore, nice bool) exchange.Interface { @@ -41,6 +51,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout notif := notifications.New() go func() { <-ctx.Done() + cancelFunc() notif.Shutdown() }() @@ -48,14 +59,15 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout blockstore: bstore, cancelFunc: cancelFunc, notifications: notif, - strategy: strategy.New(nice), + engine: decision.NewEngine(ctx, bstore), routing: routing, sender: network, - wantlist: u.NewKeySet(), - batchRequests: make(chan []u.Key, 32), + wantlist: wantlist.NewThreadSafe(), + batchRequests: make(chan []u.Key, sizeBatchRequestChan), } network.SetDelegate(bs) - go bs.loop(ctx) + go bs.clientWorker(ctx) + go bs.taskWorker(ctx) return bs } @@ -80,12 +92,9 @@ type bitswap struct { // have more than a single block in the set batchRequests chan []u.Key - // strategy listens to network traffic and makes decisions about how to - // interact with partners. - // TODO(brian): save the strategy's state to the datastore - strategy strategy.Strategy + engine *decision.Engine - wantlist u.KeySet + wantlist *wantlist.ThreadSafe // cancelFunc signals cancellation to the bitswap event loop cancelFunc func() @@ -153,12 +162,7 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { } bs.wantlist.Remove(blk.Key()) bs.notifications.Publish(blk) - child, _ := context.WithTimeout(ctx, hasBlockTimeout) - if err := bs.sendToPeersThatWant(child, blk); err != nil { - return err - } - child, _ = context.WithTimeout(ctx, hasBlockTimeout) - return bs.routing.Provide(child, blk.Key()) + return bs.routing.Provide(ctx, blk.Key()) } func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) error { @@ -166,13 +170,15 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e panic("Cant send wantlist to nil peerchan") } message := bsmsg.New() - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Key, wanted.Priority) } + wg := sync.WaitGroup{} for peerToQuery := range peers { - log.Debug("sending query to: %s", peerToQuery) log.Event(ctx, "PeerToQuery", peerToQuery) + wg.Add(1) go func(p peer.Peer) { + defer wg.Done() log.Event(ctx, "DialPeer", p) err := bs.sender.DialPeer(ctx, p) @@ -189,57 +195,76 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e // FIXME ensure accounting is handled correctly when // communication fails. May require slightly different API to // get better guarantees. May need shared sequence numbers. - bs.strategy.MessageSent(p, message) + bs.engine.MessageSent(p, message) }(peerToQuery) } + wg.Wait() return nil } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, ks []u.Key) { +func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + message := bsmsg.New() + message.SetFull(true) + for _, e := range bs.wantlist.Entries() { + message.AddEntry(e.Key, e.Priority) + } + + ps := pset.New() + + // Get providers for all entries in wantlist (could take a while) wg := sync.WaitGroup{} - for _, k := range ks { + for _, e := range wantlist.Entries() { wg.Add(1) go func(k u.Key) { + defer wg.Done() child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.routing.FindProvidersAsync(child, k, maxProvidersPerRequest) - err := bs.sendWantListTo(ctx, providers) - if err != nil { - log.Errorf("error sending wantlist: %s", err) + for prov := range providers { + if ps.TryAdd(prov) { //Do once per peer + bs.send(ctx, prov, message) + } } - wg.Done() - }(k) + }(e.Key) } wg.Wait() } +func (bs *bitswap) taskWorker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case envelope := <-bs.engine.Outbox(): + bs.send(ctx, envelope.Peer, envelope.Message) + } + } +} + // TODO ensure only one active request per key -func (bs *bitswap) loop(parent context.Context) { +func (bs *bitswap) clientWorker(parent context.Context) { ctx, cancel := context.WithCancel(parent) - broadcastSignal := time.NewTicker(bs.strategy.GetRebroadcastDelay()) - defer func() { - cancel() // signal to derived async functions - broadcastSignal.Stop() - }() + broadcastSignal := time.After(rebroadcastDelay) + defer cancel() for { select { - case <-broadcastSignal.C: + case <-broadcastSignal: // Resend unfulfilled wantlist keys - bs.sendWantlistToProviders(ctx, bs.wantlist.Keys()) + bs.sendWantlistToProviders(ctx, bs.wantlist) + broadcastSignal = time.After(rebroadcastDelay) case ks := <-bs.batchRequests: - // TODO: implement batching on len(ks) > X for some X - // i.e. if given 20 keys, fetch first five, then next - // five, and so on, so we are more likely to be able to - // effectively stream the data if len(ks) == 0 { log.Warning("Received batch request for zero blocks") continue } - for _, k := range ks { - bs.wantlist.Add(k) + for i, k := range ks { + bs.wantlist.Add(k, kMaxPriority-i) } // NB: send want list to providers for the first peer in this list. // the assumption is made that the providers of the first key in @@ -277,45 +302,45 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm return nil, nil } - // Record message bytes in ledger - // TODO: this is bad, and could be easily abused. - // Should only track *useful* messages in ledger // This call records changes to wantlists, blocks received, // and number of bytes transfered. - bs.strategy.MessageReceived(p, incoming) + bs.engine.MessageReceived(p, incoming) + // TODO: this is bad, and could be easily abused. + // Should only track *useful* messages in ledger for _, block := range incoming.Blocks() { - if err := bs.HasBlock(ctx, block); err != nil { + hasBlockCtx, _ := context.WithTimeout(ctx, hasBlockTimeout) + if err := bs.HasBlock(hasBlockCtx, block); err != nil { log.Error(err) } } - - for _, key := range incoming.Wantlist() { - if bs.strategy.ShouldSendBlockToPeer(key, p) { - if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { - continue - } else { - // Create a separate message to send this block in - blkmsg := bsmsg.New() - - // TODO: only send this the first time - // no sense in sending our wantlist to the - // same peer multiple times - for _, k := range bs.wantlist.Keys() { - blkmsg.AddWanted(k) - } - - blkmsg.AddBlock(block) - bs.send(ctx, p, blkmsg) - bs.strategy.BlockSentToPeer(block.Key(), p) - } - } + var keys []u.Key + for _, block := range incoming.Blocks() { + keys = append(keys, block.Key()) } + bs.cancelBlocks(ctx, keys) // TODO: consider changing this function to not return anything return nil, nil } +func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) { + if len(bkeys) < 1 { + return + } + message := bsmsg.New() + message.SetFull(false) + for _, k := range bkeys { + message.Cancel(k) + } + for _, p := range bs.engine.Peers() { + err := bs.send(ctx, p, message) + if err != nil { + log.Errorf("Error sending message: %s", err) + } + } +} + func (bs *bitswap) ReceiveError(err error) { log.Errorf("Bitswap ReceiveError: %s", err) // TODO log the network error @@ -328,25 +353,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage if err := bs.sender.SendMessage(ctx, p, m); err != nil { return err } - return bs.strategy.MessageSent(p, m) -} - -func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) error { - for _, p := range bs.strategy.Peers() { - if bs.strategy.BlockIsWantedByPeer(block.Key(), p) { - if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { - message := bsmsg.New() - message.AddBlock(block) - for _, wanted := range bs.wantlist.Keys() { - message.AddWanted(wanted) - } - if err := bs.send(ctx, p, message); err != nil { - return err - } - } - } - } - return nil + return bs.engine.MessageSent(p, m) } func (bs *bitswap) Close() error { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index d58ff596a1e..2c04b05082e 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,6 +11,7 @@ import ( blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" mockrouting "github.com/jbenet/go-ipfs/routing/mock" + u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -25,6 +26,7 @@ func TestClose(t *testing.T) { vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rout := mockrouting.NewServer() sesgen := NewSessionGenerator(vnet, rout) + defer sesgen.Close() bgen := blocksutil.NewBlockGenerator() block := bgen.Next() @@ -39,6 +41,7 @@ func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) + defer g.Close() self := g.Next() @@ -56,11 +59,13 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() g := NewSessionGenerator(net, rs) + defer g.Close() block := blocks.NewBlock([]byte("block")) rs.Client(testutil.NewPeerWithIDString("testing")).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() + defer solo.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Nanosecond) _, err := solo.Exchange.GetBlock(ctx, block.Key()) @@ -78,8 +83,10 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { rs := mockrouting.NewServer() block := blocks.NewBlock([]byte("block")) g := NewSessionGenerator(net, rs) + defer g.Close() hasBlock := g.Next() + defer hasBlock.Exchange.Close() if err := hasBlock.Blockstore().Put(block); err != nil { t.Fatal(err) @@ -89,6 +96,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { } wantsBlock := g.Next() + defer wantsBlock.Exchange.Close() ctx, _ := context.WithTimeout(context.Background(), time.Second) received, err := wantsBlock.Exchange.GetBlock(ctx, block.Key()) @@ -107,7 +115,7 @@ func TestLargeSwarm(t *testing.T) { t.SkipNow() } t.Parallel() - numInstances := 5 + numInstances := 500 numBlocks := 2 PerformDistributionTest(t, numInstances, numBlocks) } @@ -129,6 +137,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) + defer sg.Close() bg := blocksutil.NewBlockGenerator() t.Log("Test a few nodes trying to get one file with a lot of blocks") @@ -138,24 +147,29 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.Log("Give the blocks to the first instance") + var blkeys []u.Key first := instances[0] for _, b := range blocks { first.Blockstore().Put(b) + blkeys = append(blkeys, b.Key()) first.Exchange.HasBlock(context.Background(), b) rs.Client(first.Peer).Provide(context.Background(), b.Key()) } t.Log("Distribute!") - var wg sync.WaitGroup - + wg := sync.WaitGroup{} for _, inst := range instances { - for _, b := range blocks { - wg.Add(1) - // NB: executing getOrFail concurrently puts tremendous pressure on - // the goroutine scheduler - getOrFail(inst, b, t, &wg) - } + wg.Add(1) + go func(inst Instance) { + defer wg.Done() + outch, err := inst.Exchange.GetBlocks(context.TODO(), blkeys) + if err != nil { + t.Fatal(err) + } + for _ = range outch { + } + }(inst) } wg.Wait() @@ -189,60 +203,75 @@ func TestSendToWantingPeer(t *testing.T) { net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) rs := mockrouting.NewServer() sg := NewSessionGenerator(net, rs) + defer sg.Close() bg := blocksutil.NewBlockGenerator() - me := sg.Next() - w := sg.Next() - o := sg.Next() + oldVal := rebroadcastDelay + rebroadcastDelay = time.Second / 2 + defer func() { rebroadcastDelay = oldVal }() - t.Logf("Session %v\n", me.Peer) - t.Logf("Session %v\n", w.Peer) - t.Logf("Session %v\n", o.Peer) + peerA := sg.Next() + peerB := sg.Next() - alpha := bg.Next() + t.Logf("Session %v\n", peerA.Peer) + t.Logf("Session %v\n", peerB.Peer) - const timeout = 100 * time.Millisecond // FIXME don't depend on time + timeout := time.Second + waitTime := time.Second * 5 - t.Logf("Peer %v attempts to get %v. NB: not available\n", w.Peer, alpha.Key()) - ctx, _ := context.WithTimeout(context.Background(), timeout) - _, err := w.Exchange.GetBlock(ctx, alpha.Key()) - if err == nil { - t.Fatalf("Expected %v to NOT be available", alpha.Key()) + alpha := bg.Next() + // peerA requests and waits for block alpha + ctx, _ := context.WithTimeout(context.TODO(), waitTime) + alphaPromise, err := peerA.Exchange.GetBlocks(ctx, []u.Key{alpha.Key()}) + if err != nil { + t.Fatal(err) } - beta := bg.Next() - t.Logf("Peer %v announes availability of %v\n", w.Peer, beta.Key()) - ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := w.Blockstore().Put(beta); err != nil { + // peerB announces to the network that he has block alpha + ctx, _ = context.WithTimeout(context.TODO(), timeout) + err = peerB.Exchange.HasBlock(ctx, alpha) + if err != nil { t.Fatal(err) } - w.Exchange.HasBlock(ctx, beta) - t.Logf("%v gets %v from %v and discovers it wants %v\n", me.Peer, beta.Key(), w.Peer, alpha.Key()) - ctx, _ = context.WithTimeout(context.Background(), timeout) - if _, err := me.Exchange.GetBlock(ctx, beta.Key()); err != nil { - t.Fatal(err) + // At some point, peerA should get alpha (or timeout) + blkrecvd, ok := <-alphaPromise + if !ok { + t.Fatal("context timed out and broke promise channel!") } - t.Logf("%v announces availability of %v\n", o.Peer, alpha.Key()) - ctx, _ = context.WithTimeout(context.Background(), timeout) - if err := o.Blockstore().Put(alpha); err != nil { - t.Fatal(err) + if blkrecvd.Key() != alpha.Key() { + t.Fatal("Wrong block!") } - o.Exchange.HasBlock(ctx, alpha) - t.Logf("%v requests %v\n", me.Peer, alpha.Key()) - ctx, _ = context.WithTimeout(context.Background(), timeout) - if _, err := me.Exchange.GetBlock(ctx, alpha.Key()); err != nil { +} + +func TestBasicBitswap(t *testing.T) { + net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay)) + rs := mockrouting.NewServer() + sg := NewSessionGenerator(net, rs) + bg := blocksutil.NewBlockGenerator() + + t.Log("Test a few nodes trying to get one file with a lot of blocks") + + instances := sg.Instances(2) + blocks := bg.Blocks(1) + err := instances[0].Exchange.HasBlock(context.TODO(), blocks[0]) + if err != nil { t.Fatal(err) } - t.Logf("%v should now have %v\n", w.Peer, alpha.Key()) - block, err := w.Blockstore().Get(alpha.Key()) + ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Key()) if err != nil { - t.Fatalf("Should not have received an error: %s", err) + t.Fatal(err) } - if block.Key() != alpha.Key() { - t.Fatal("Expected to receive alpha from me") + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go new file mode 100644 index 00000000000..ea453943731 --- /dev/null +++ b/exchange/bitswap/decision/engine.go @@ -0,0 +1,224 @@ +package decision + +import ( + "sync" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bstore "github.com/jbenet/go-ipfs/blocks/blockstore" + bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" + wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +// TODO consider taking responsibility for other types of requests. For +// example, there could be a |cancelQueue| for all of the cancellation +// messages that need to go out. There could also be a |wantlistQueue| for +// the local peer's wantlists. Alternatively, these could all be bundled +// into a single, intelligent global queue that efficiently +// batches/combines and takes all of these into consideration. +// +// Right now, messages go onto the network for four reasons: +// 1. an initial `sendwantlist` message to a provider of the first key in a request +// 2. a periodic full sweep of `sendwantlist` messages to all providers +// 3. upon receipt of blocks, a `cancel` message to all peers +// 4. draining the priority queue of `blockrequests` from peers +// +// Presently, only `blockrequests` are handled by the decision engine. +// However, there is an opportunity to give it more responsibility! If the +// decision engine is given responsibility for all of the others, it can +// intelligently decide how to combine requests efficiently. +// +// Some examples of what would be possible: +// +// * when sending out the wantlists, include `cancel` requests +// * when handling `blockrequests`, include `sendwantlist` and `cancel` as appropriate +// * when handling `cancel`, if we recently received a wanted block from a +// peer, include a partial wantlist that contains a few other high priority +// blocks +// +// In a sense, if we treat the decision engine as a black box, it could do +// whatever it sees fit to produce desired outcomes (get wanted keys +// quickly, maintain good relationships with peers, etc). + +var log = u.Logger("engine") + +const ( + sizeOutboxChan = 4 +) + +// Envelope contains a message for a Peer +type Envelope struct { + // Peer is the intended recipient + Peer peer.Peer + // Message is the payload + Message bsmsg.BitSwapMessage +} + +type Engine struct { + // peerRequestQueue is a priority queue of requests received from peers. + // Requests are popped from the queue, packaged up, and placed in the + // outbox. + peerRequestQueue *taskQueue + + // FIXME it's a bit odd for the client and the worker to both share memory + // (both modify the peerRequestQueue) and also to communicate over the + // workSignal channel. consider sending requests over the channel and + // allowing the worker to have exclusive access to the peerRequestQueue. In + // that case, no lock would be required. + workSignal chan struct{} + + // outbox contains outgoing messages to peers + outbox chan Envelope + + bs bstore.Blockstore + + lock sync.RWMutex // protects the fields immediatly below + // ledgerMap lists Ledgers by their Partner key. + ledgerMap map[u.Key]*ledger +} + +func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { + e := &Engine{ + ledgerMap: make(map[u.Key]*ledger), + bs: bs, + peerRequestQueue: newTaskQueue(), + outbox: make(chan Envelope, sizeOutboxChan), + workSignal: make(chan struct{}), + } + go e.taskWorker(ctx) + return e +} + +func (e *Engine) taskWorker(ctx context.Context) { + for { + nextTask := e.peerRequestQueue.Pop() + if nextTask == nil { + // No tasks in the list? + // Wait until there are! + select { + case <-ctx.Done(): + return + case <-e.workSignal: + } + continue + } + block, err := e.bs.Get(nextTask.Entry.Key) + if err != nil { + log.Warning("engine: task exists to send block, but block is not in blockstore") + continue + } + // construct message here so we can make decisions about any additional + // information we may want to include at this time. + m := bsmsg.New() + m.AddBlock(block) + // TODO: maybe add keys from our wantlist? + select { + case <-ctx.Done(): + return + case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}: + } + } +} + +func (e *Engine) Outbox() <-chan Envelope { + return e.outbox +} + +// Returns a slice of Peers with whom the local node has active sessions +func (e *Engine) Peers() []peer.Peer { + e.lock.RLock() + defer e.lock.RUnlock() + + response := make([]peer.Peer, 0) + for _, ledger := range e.ledgerMap { + response = append(response, ledger.Partner) + } + return response +} + +// MessageReceived performs book-keeping. Returns error if passed invalid +// arguments. +func (e *Engine) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { + newWorkExists := false + defer func() { + if newWorkExists { + // Signal task generation to restart (if stopped!) + select { + case e.workSignal <- struct{}{}: + default: + } + } + }() + e.lock.Lock() + defer e.lock.Unlock() + + l := e.findOrCreate(p) + if m.Full() { + l.wantList = wl.New() + } + for _, entry := range m.Wantlist() { + if entry.Cancel { + l.CancelWant(entry.Key) + e.peerRequestQueue.Remove(entry.Key, p) + } else { + l.Wants(entry.Key, entry.Priority) + if exists, err := e.bs.Has(entry.Key); err == nil && exists { + newWorkExists = true + e.peerRequestQueue.Push(entry.Entry, p) + } + } + } + + for _, block := range m.Blocks() { + // FIXME extract blocks.NumBytes(block) or block.NumBytes() method + l.ReceivedBytes(len(block.Data)) + for _, l := range e.ledgerMap { + if l.WantListContains(block.Key()) { + newWorkExists = true + e.peerRequestQueue.Push(wl.Entry{block.Key(), 1}, l.Partner) + } + } + } + return nil +} + +// TODO add contents of m.WantList() to my local wantlist? NB: could introduce +// race conditions where I send a message, but MessageSent gets handled after +// MessageReceived. The information in the local wantlist could become +// inconsistent. Would need to ensure that Sends and acknowledgement of the +// send happen atomically + +func (e *Engine) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { + e.lock.Lock() + defer e.lock.Unlock() + + l := e.findOrCreate(p) + for _, block := range m.Blocks() { + l.SentBytes(len(block.Data)) + l.wantList.Remove(block.Key()) + e.peerRequestQueue.Remove(block.Key(), p) + } + + return nil +} + +func (e *Engine) numBytesSentTo(p peer.Peer) uint64 { + // NB not threadsafe + return e.findOrCreate(p).Accounting.BytesSent +} + +func (e *Engine) numBytesReceivedFrom(p peer.Peer) uint64 { + // NB not threadsafe + return e.findOrCreate(p).Accounting.BytesRecv +} + +// ledger lazily instantiates a ledger +func (e *Engine) findOrCreate(p peer.Peer) *ledger { + l, ok := e.ledgerMap[p.Key()] + if !ok { + l = newLedger(p) + e.ledgerMap[p.Key()] = l + } + return l +} diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go new file mode 100644 index 00000000000..14893757303 --- /dev/null +++ b/exchange/bitswap/decision/engine_test.go @@ -0,0 +1,93 @@ +package decision + +import ( + "strings" + "testing" + + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" + blocks "github.com/jbenet/go-ipfs/blocks" + blockstore "github.com/jbenet/go-ipfs/blocks/blockstore" + message "github.com/jbenet/go-ipfs/exchange/bitswap/message" + peer "github.com/jbenet/go-ipfs/peer" + testutil "github.com/jbenet/go-ipfs/util/testutil" +) + +type peerAndEngine struct { + peer.Peer + Engine *Engine +} + +func newPeerAndLedgermanager(idStr string) peerAndEngine { + return peerAndEngine{ + Peer: testutil.NewPeerWithIDString(idStr), + //Strategy: New(true), + Engine: NewEngine(context.TODO(), + blockstore.NewBlockstore(sync.MutexWrap(ds.NewMapDatastore()))), + } +} + +func TestConsistentAccounting(t *testing.T) { + sender := newPeerAndLedgermanager("Ernie") + receiver := newPeerAndLedgermanager("Bert") + + // Send messages from Ernie to Bert + for i := 0; i < 1000; i++ { + + m := message.New() + content := []string{"this", "is", "message", "i"} + m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) + + sender.Engine.MessageSent(receiver.Peer, m) + receiver.Engine.MessageReceived(sender.Peer, m) + } + + // Ensure sender records the change + if sender.Engine.numBytesSentTo(receiver.Peer) == 0 { + t.Fatal("Sent bytes were not recorded") + } + + // Ensure sender and receiver have the same values + if sender.Engine.numBytesSentTo(receiver.Peer) != receiver.Engine.numBytesReceivedFrom(sender.Peer) { + t.Fatal("Inconsistent book-keeping. Strategies don't agree") + } + + // Ensure sender didn't record receving anything. And that the receiver + // didn't record sending anything + if receiver.Engine.numBytesSentTo(sender.Peer) != 0 || sender.Engine.numBytesReceivedFrom(receiver.Peer) != 0 { + t.Fatal("Bert didn't send bytes to Ernie") + } +} + +func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { + + sanfrancisco := newPeerAndLedgermanager("sf") + seattle := newPeerAndLedgermanager("sea") + + m := message.New() + + sanfrancisco.Engine.MessageSent(seattle.Peer, m) + seattle.Engine.MessageReceived(sanfrancisco.Peer, m) + + if seattle.Peer.Key() == sanfrancisco.Peer.Key() { + t.Fatal("Sanity Check: Peers have same Key!") + } + + if !peerIsPartner(seattle.Peer, sanfrancisco.Engine) { + t.Fatal("Peer wasn't added as a Partner") + } + + if !peerIsPartner(sanfrancisco.Peer, seattle.Engine) { + t.Fatal("Peer wasn't added as a Partner") + } +} + +func peerIsPartner(p peer.Peer, e *Engine) bool { + for _, partner := range e.Peers() { + if partner.Key() == p.Key() { + return true + } + } + return false +} diff --git a/exchange/bitswap/strategy/ledger.go b/exchange/bitswap/decision/ledger.go similarity index 74% rename from exchange/bitswap/strategy/ledger.go rename to exchange/bitswap/decision/ledger.go index 84e92d0356e..eea87af1fcf 100644 --- a/exchange/bitswap/strategy/ledger.go +++ b/exchange/bitswap/decision/ledger.go @@ -1,8 +1,9 @@ -package strategy +package decision import ( "time" + wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" ) @@ -11,10 +12,9 @@ import ( // access/lookups. type keySet map[u.Key]struct{} -func newLedger(p peer.Peer, strategy strategyFunc) *ledger { +func newLedger(p peer.Peer) *ledger { return &ledger{ - wantList: keySet{}, - Strategy: strategy, + wantList: wl.New(), Partner: p, sentToPeer: make(map[u.Key]time.Time), } @@ -39,17 +39,20 @@ type ledger struct { exchangeCount uint64 // wantList is a (bounded, small) set of keys that Partner desires. - wantList keySet + wantList *wl.Wantlist // sentToPeer is a set of keys to ensure we dont send duplicate blocks // to a given peer sentToPeer map[u.Key]time.Time +} - Strategy strategyFunc +type debtRatio struct { + BytesSent uint64 + BytesRecv uint64 } -func (l *ledger) ShouldSend() bool { - return l.Strategy(l) +func (dr *debtRatio) Value() float64 { + return float64(dr.BytesSent) / float64(dr.BytesRecv+1) } func (l *ledger) SentBytes(n int) { @@ -65,14 +68,17 @@ func (l *ledger) ReceivedBytes(n int) { } // TODO: this needs to be different. We need timeouts. -func (l *ledger) Wants(k u.Key) { +func (l *ledger) Wants(k u.Key, priority int) { log.Debugf("peer %s wants %s", l.Partner, k) - l.wantList[k] = struct{}{} + l.wantList.Add(k, priority) +} + +func (l *ledger) CancelWant(k u.Key) { + l.wantList.Remove(k) } func (l *ledger) WantListContains(k u.Key) bool { - _, ok := l.wantList[k] - return ok + return l.wantList.Contains(k) } func (l *ledger) ExchangeCount() uint64 { diff --git a/exchange/bitswap/decision/taskqueue.go b/exchange/bitswap/decision/taskqueue.go new file mode 100644 index 00000000000..a76c56e9ba3 --- /dev/null +++ b/exchange/bitswap/decision/taskqueue.go @@ -0,0 +1,84 @@ +package decision + +import ( + "sync" + + wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" + peer "github.com/jbenet/go-ipfs/peer" + u "github.com/jbenet/go-ipfs/util" +) + +// TODO: at some point, the strategy needs to plug in here +// to help decide how to sort tasks (on add) and how to select +// tasks (on getnext). For now, we are assuming a dumb/nice strategy. +type taskQueue struct { + // TODO: make this into a priority queue + lock sync.Mutex + tasks []*task + taskmap map[string]*task +} + +func newTaskQueue() *taskQueue { + return &taskQueue{ + taskmap: make(map[string]*task), + } +} + +type task struct { + Entry wantlist.Entry + Target peer.Peer + Trash bool +} + +// Push currently adds a new task to the end of the list +func (tl *taskQueue) Push(entry wantlist.Entry, to peer.Peer) { + tl.lock.Lock() + defer tl.lock.Unlock() + if task, ok := tl.taskmap[taskKey(to, entry.Key)]; ok { + // TODO: when priority queue is implemented, + // rearrange this task + task.Entry.Priority = entry.Priority + return + } + task := &task{ + Entry: entry, + Target: to, + } + tl.tasks = append(tl.tasks, task) + tl.taskmap[taskKey(to, entry.Key)] = task +} + +// Pop 'pops' the next task to be performed. Returns nil no task exists. +func (tl *taskQueue) Pop() *task { + tl.lock.Lock() + defer tl.lock.Unlock() + var out *task + for len(tl.tasks) > 0 { + // TODO: instead of zero, use exponential distribution + // it will help reduce the chance of receiving + // the same block from multiple peers + out = tl.tasks[0] + tl.tasks = tl.tasks[1:] + delete(tl.taskmap, taskKey(out.Target, out.Entry.Key)) + if out.Trash { + continue // discarding tasks that have been removed + } + break // and return |out| + } + return out +} + +// Remove lazily removes a task from the queue +func (tl *taskQueue) Remove(k u.Key, p peer.Peer) { + tl.lock.Lock() + t, ok := tl.taskmap[taskKey(p, k)] + if ok { + t.Trash = true + } + tl.lock.Unlock() +} + +// taskKey returns a key that uniquely identifies a task. +func taskKey(p peer.Peer, k u.Key) string { + return string(p.Key() + k) +} diff --git a/exchange/bitswap/message/internal/pb/message.pb.go b/exchange/bitswap/message/internal/pb/message.pb.go index f6f8a9bbc35..4ddfc56f756 100644 --- a/exchange/bitswap/message/internal/pb/message.pb.go +++ b/exchange/bitswap/message/internal/pb/message.pb.go @@ -21,16 +21,16 @@ var _ = proto.Marshal var _ = math.Inf type Message struct { - Wantlist []string `protobuf:"bytes,1,rep,name=wantlist" json:"wantlist,omitempty"` - Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` - XXX_unrecognized []byte `json:"-"` + Wantlist *Message_Wantlist `protobuf:"bytes,1,opt,name=wantlist" json:"wantlist,omitempty"` + Blocks [][]byte `protobuf:"bytes,2,rep,name=blocks" json:"blocks,omitempty"` + XXX_unrecognized []byte `json:"-"` } func (m *Message) Reset() { *m = Message{} } func (m *Message) String() string { return proto.CompactTextString(m) } func (*Message) ProtoMessage() {} -func (m *Message) GetWantlist() []string { +func (m *Message) GetWantlist() *Message_Wantlist { if m != nil { return m.Wantlist } @@ -44,5 +44,61 @@ func (m *Message) GetBlocks() [][]byte { return nil } +type Message_Wantlist struct { + Entries []*Message_Wantlist_Entry `protobuf:"bytes,1,rep,name=entries" json:"entries,omitempty"` + Full *bool `protobuf:"varint,2,opt,name=full" json:"full,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message_Wantlist) Reset() { *m = Message_Wantlist{} } +func (m *Message_Wantlist) String() string { return proto.CompactTextString(m) } +func (*Message_Wantlist) ProtoMessage() {} + +func (m *Message_Wantlist) GetEntries() []*Message_Wantlist_Entry { + if m != nil { + return m.Entries + } + return nil +} + +func (m *Message_Wantlist) GetFull() bool { + if m != nil && m.Full != nil { + return *m.Full + } + return false +} + +type Message_Wantlist_Entry struct { + Block *string `protobuf:"bytes,1,opt,name=block" json:"block,omitempty"` + Priority *int32 `protobuf:"varint,2,opt,name=priority" json:"priority,omitempty"` + Cancel *bool `protobuf:"varint,3,opt,name=cancel" json:"cancel,omitempty"` + XXX_unrecognized []byte `json:"-"` +} + +func (m *Message_Wantlist_Entry) Reset() { *m = Message_Wantlist_Entry{} } +func (m *Message_Wantlist_Entry) String() string { return proto.CompactTextString(m) } +func (*Message_Wantlist_Entry) ProtoMessage() {} + +func (m *Message_Wantlist_Entry) GetBlock() string { + if m != nil && m.Block != nil { + return *m.Block + } + return "" +} + +func (m *Message_Wantlist_Entry) GetPriority() int32 { + if m != nil && m.Priority != nil { + return *m.Priority + } + return 0 +} + +func (m *Message_Wantlist_Entry) GetCancel() bool { + if m != nil && m.Cancel != nil { + return *m.Cancel + } + return false +} + func init() { } diff --git a/exchange/bitswap/message/internal/pb/message.proto b/exchange/bitswap/message/internal/pb/message.proto index a8c6c72520b..7c44f3a6b59 100644 --- a/exchange/bitswap/message/internal/pb/message.proto +++ b/exchange/bitswap/message/internal/pb/message.proto @@ -1,6 +1,19 @@ package bitswap.message.pb; message Message { - repeated string wantlist = 1; - repeated bytes blocks = 2; + + message Wantlist { + + message Entry { + optional string block = 1; // the block key + optional int32 priority = 2; // the priority (normalized). default to 1 + optional bool cancel = 3; // whether this revokes an entry + } + + repeated Entry entries = 1; // a list of wantlist entries + optional bool full = 2; // whether this is the full wantlist. default to false + } + + optional Wantlist wantlist = 1; + repeated bytes blocks = 2; } diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 62a39be91b0..7f7f1d08e35 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -5,10 +5,12 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" + wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" inet "github.com/jbenet/go-ipfs/net" u "github.com/jbenet/go-ipfs/util" ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" ) // TODO move message.go into the bitswap package @@ -17,21 +19,23 @@ import ( type BitSwapMessage interface { // Wantlist returns a slice of unique keys that represent data wanted by // the sender. - Wantlist() []u.Key + Wantlist() []Entry // Blocks returns a slice of unique blocks Blocks() []*blocks.Block - // AddWanted adds the key to the Wantlist. - // - // Insertion order determines priority. That is, earlier insertions are - // deemed higher priority than keys inserted later. - // - // t = 0, msg.AddWanted(A) - // t = 1, msg.AddWanted(B) - // - // implies Priority(A) > Priority(B) - AddWanted(u.Key) + // AddEntry adds an entry to the Wantlist. + AddEntry(key u.Key, priority int) + + Cancel(key u.Key) + + // Sets whether or not the contained wantlist represents the entire wantlist + // true = full wantlist + // false = wantlist 'patch' + // default: true + SetFull(isFull bool) + + Full() bool AddBlock(*blocks.Block) Exportable @@ -43,23 +47,33 @@ type Exportable interface { } type impl struct { - existsInWantlist map[u.Key]struct{} // map to detect duplicates - wantlist []u.Key // slice to preserve ordering - blocks map[u.Key]*blocks.Block // map to detect duplicates + full bool + wantlist map[u.Key]Entry + blocks map[u.Key]*blocks.Block // map to detect duplicates } func New() BitSwapMessage { + return newMsg() +} + +func newMsg() *impl { return &impl{ - blocks: make(map[u.Key]*blocks.Block), - existsInWantlist: make(map[u.Key]struct{}), - wantlist: make([]u.Key, 0), + blocks: make(map[u.Key]*blocks.Block), + wantlist: make(map[u.Key]Entry), + full: true, } } +type Entry struct { + wantlist.Entry + Cancel bool +} + func newMessageFromProto(pbm pb.Message) BitSwapMessage { - m := New() - for _, s := range pbm.GetWantlist() { - m.AddWanted(u.Key(s)) + m := newMsg() + m.SetFull(pbm.GetWantlist().GetFull()) + for _, e := range pbm.GetWantlist().GetEntries() { + m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) @@ -68,8 +82,20 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { return m } -func (m *impl) Wantlist() []u.Key { - return m.wantlist +func (m *impl) SetFull(full bool) { + m.full = full +} + +func (m *impl) Full() bool { + return m.full +} + +func (m *impl) Wantlist() []Entry { + var out []Entry + for _, e := range m.wantlist { + out = append(out, e) + } + return out } func (m *impl) Blocks() []*blocks.Block { @@ -80,13 +106,28 @@ func (m *impl) Blocks() []*blocks.Block { return bs } -func (m *impl) AddWanted(k u.Key) { - _, exists := m.existsInWantlist[k] +func (m *impl) Cancel(k u.Key) { + m.addEntry(k, 0, true) +} + +func (m *impl) AddEntry(k u.Key, priority int) { + m.addEntry(k, priority, false) +} + +func (m *impl) addEntry(k u.Key, priority int, cancel bool) { + e, exists := m.wantlist[k] if exists { - return + e.Priority = priority + e.Cancel = cancel + } else { + m.wantlist[k] = Entry{ + Entry: wantlist.Entry{ + Key: k, + Priority: priority, + }, + Cancel: cancel, + } } - m.existsInWantlist[k] = struct{}{} - m.wantlist = append(m.wantlist, k) } func (m *impl) AddBlock(b *blocks.Block) { @@ -106,14 +147,19 @@ func FromNet(r io.Reader) (BitSwapMessage, error) { } func (m *impl) ToProto() *pb.Message { - pb := new(pb.Message) - for _, k := range m.Wantlist() { - pb.Wantlist = append(pb.Wantlist, string(k)) + pbm := new(pb.Message) + pbm.Wantlist = new(pb.Message_Wantlist) + for _, e := range m.wantlist { + pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{ + Block: proto.String(string(e.Key)), + Priority: proto.Int32(int32(e.Priority)), + Cancel: &e.Cancel, + }) } for _, b := range m.Blocks() { - pb.Blocks = append(pb.Blocks, b.Data) + pbm.Blocks = append(pbm.Blocks, b.Data) } - return pb + return pbm } func (m *impl) ToNet(w io.Writer) error { diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 681b60a6f30..a0df38c0b21 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -4,6 +4,8 @@ import ( "bytes" "testing" + proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" + blocks "github.com/jbenet/go-ipfs/blocks" pb "github.com/jbenet/go-ipfs/exchange/bitswap/message/internal/pb" u "github.com/jbenet/go-ipfs/util" @@ -12,22 +14,26 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" m := New() - m.AddWanted(u.Key(str)) + m.AddEntry(u.Key(str), 1) - if !contains(m.ToProto().GetWantlist(), str) { + if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() } + m.ToProto().GetWantlist().GetEntries() } func TestNewMessageFromProto(t *testing.T) { const str = "a_key" protoMessage := new(pb.Message) - protoMessage.Wantlist = []string{string(str)} - if !contains(protoMessage.Wantlist, str) { + protoMessage.Wantlist = new(pb.Message_Wantlist) + protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{ + &pb.Message_Wantlist_Entry{Block: proto.String(str)}, + } + if !wantlistContains(protoMessage.Wantlist, str) { t.Fail() } m := newMessageFromProto(*protoMessage) - if !contains(m.ToProto().GetWantlist(), str) { + if !wantlistContains(m.ToProto().GetWantlist(), str) { t.Fail() } } @@ -57,7 +63,7 @@ func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} m := New() for _, s := range keystrs { - m.AddWanted(u.Key(s)) + m.AddEntry(u.Key(s), 1) } exported := m.Wantlist() @@ -65,12 +71,12 @@ func TestWantlist(t *testing.T) { present := false for _, s := range keystrs { - if s == string(k) { + if s == string(k.Key) { present = true } } if !present { - t.Logf("%v isn't in original list", string(k)) + t.Logf("%v isn't in original list", k.Key) t.Fail() } } @@ -80,19 +86,19 @@ func TestCopyProtoByValue(t *testing.T) { const str = "foo" m := New() protoBeforeAppend := m.ToProto() - m.AddWanted(u.Key(str)) - if contains(protoBeforeAppend.GetWantlist(), str) { + m.AddEntry(u.Key(str), 1) + if wantlistContains(protoBeforeAppend.GetWantlist(), str) { t.Fail() } } func TestToNetFromNetPreservesWantList(t *testing.T) { original := New() - original.AddWanted(u.Key("M")) - original.AddWanted(u.Key("B")) - original.AddWanted(u.Key("D")) - original.AddWanted(u.Key("T")) - original.AddWanted(u.Key("F")) + original.AddEntry(u.Key("M"), 1) + original.AddEntry(u.Key("B"), 1) + original.AddEntry(u.Key("D"), 1) + original.AddEntry(u.Key("T"), 1) + original.AddEntry(u.Key("F"), 1) var buf bytes.Buffer if err := original.ToNet(&buf); err != nil { @@ -106,11 +112,11 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { keys := make(map[u.Key]bool) for _, k := range copied.Wantlist() { - keys[k] = true + keys[k.Key] = true } for _, k := range original.Wantlist() { - if _, ok := keys[k]; !ok { + if _, ok := keys[k.Key]; !ok { t.Fatalf("Key Missing: \"%v\"", k) } } @@ -146,9 +152,18 @@ func TestToAndFromNetMessage(t *testing.T) { } } -func contains(s []string, x string) bool { - for _, a := range s { - if a == x { +func wantlistContains(wantlist *pb.Message_Wantlist, x string) bool { + for _, e := range wantlist.GetEntries() { + if e.GetBlock() == x { + return true + } + } + return false +} + +func contains(strs []string, x string) bool { + for _, s := range strs { + if s == x { return true } } @@ -159,8 +174,8 @@ func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) msg := New() - msg.AddWanted(b.Key()) - msg.AddWanted(b.Key()) + msg.AddEntry(b.Key(), 1) + msg.AddEntry(b.Key(), 1) if len(msg.Wantlist()) != 1 { t.Fatal("Duplicate in BitSwapMessage") } diff --git a/exchange/bitswap/strategy/interface.go b/exchange/bitswap/strategy/interface.go deleted file mode 100644 index 58385f5b76c..00000000000 --- a/exchange/bitswap/strategy/interface.go +++ /dev/null @@ -1,40 +0,0 @@ -package strategy - -import ( - "time" - - bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -type Strategy interface { - // Returns a slice of Peers with whom the local node has active sessions - Peers() []peer.Peer - - // BlockIsWantedByPeer returns true if peer wants the block given by this - // key - BlockIsWantedByPeer(u.Key, peer.Peer) bool - - // ShouldSendTo(Peer) decides whether to send data to this Peer - ShouldSendBlockToPeer(u.Key, peer.Peer) bool - - // Seed initializes the decider to a deterministic state - Seed(int64) - - // MessageReceived records receipt of message for accounting purposes - MessageReceived(peer.Peer, bsmsg.BitSwapMessage) error - - // MessageSent records sending of message for accounting purposes - MessageSent(peer.Peer, bsmsg.BitSwapMessage) error - - NumBytesSentTo(peer.Peer) uint64 - - NumBytesReceivedFrom(peer.Peer) uint64 - - BlockSentToPeer(u.Key, peer.Peer) - - // Values determining bitswap behavioural patterns - GetBatchSize() int - GetRebroadcastDelay() time.Duration -} diff --git a/exchange/bitswap/strategy/ledger_test.go b/exchange/bitswap/strategy/ledger_test.go deleted file mode 100644 index 4271d525c20..00000000000 --- a/exchange/bitswap/strategy/ledger_test.go +++ /dev/null @@ -1 +0,0 @@ -package strategy diff --git a/exchange/bitswap/strategy/math.go b/exchange/bitswap/strategy/math.go deleted file mode 100644 index c5339e5b357..00000000000 --- a/exchange/bitswap/strategy/math.go +++ /dev/null @@ -1,34 +0,0 @@ -package strategy - -import ( - "math" - "math/rand" -) - -type strategyFunc func(*ledger) bool - -// TODO avoid using rand.Float64 method. it uses a singleton lock and may cause -// performance issues. Instead, instantiate a rand struct and use that to call -// Float64() -func standardStrategy(l *ledger) bool { - return rand.Float64() <= probabilitySend(l.Accounting.Value()) -} - -func yesManStrategy(l *ledger) bool { - return true -} - -func probabilitySend(ratio float64) float64 { - x := 1 + math.Exp(6-3*ratio) - y := 1 / x - return 1 - y -} - -type debtRatio struct { - BytesSent uint64 - BytesRecv uint64 -} - -func (dr *debtRatio) Value() float64 { - return float64(dr.BytesSent) / float64(dr.BytesRecv+1) -} diff --git a/exchange/bitswap/strategy/math_test.go b/exchange/bitswap/strategy/math_test.go deleted file mode 100644 index 58092bc09df..00000000000 --- a/exchange/bitswap/strategy/math_test.go +++ /dev/null @@ -1,17 +0,0 @@ -package strategy - -import ( - "testing" -) - -func TestProbabilitySendDecreasesAsRatioIncreases(t *testing.T) { - grateful := debtRatio{BytesSent: 0, BytesRecv: 10000} - pWhenGrateful := probabilitySend(grateful.Value()) - - abused := debtRatio{BytesSent: 10000, BytesRecv: 0} - pWhenAbused := probabilitySend(abused.Value()) - - if pWhenGrateful < pWhenAbused { - t.Fail() - } -} diff --git a/exchange/bitswap/strategy/strategy.go b/exchange/bitswap/strategy/strategy.go deleted file mode 100644 index fe7414caa73..00000000000 --- a/exchange/bitswap/strategy/strategy.go +++ /dev/null @@ -1,169 +0,0 @@ -package strategy - -import ( - "errors" - "sync" - "time" - - bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" - peer "github.com/jbenet/go-ipfs/peer" - u "github.com/jbenet/go-ipfs/util" -) - -const resendTimeoutPeriod = time.Minute - -var log = u.Logger("strategy") - -// TODO niceness should be on a per-peer basis. Use-case: Certain peers are -// "trusted" and/or controlled by a single human user. The user may want for -// these peers to exchange data freely -func New(nice bool) Strategy { - var stratFunc strategyFunc - if nice { - stratFunc = yesManStrategy - } else { - stratFunc = standardStrategy - } - return &strategist{ - ledgerMap: ledgerMap{}, - strategyFunc: stratFunc, - } -} - -type strategist struct { - lock sync.RWMutex - ledgerMap - strategyFunc -} - -// LedgerMap lists Ledgers by their Partner key. -type ledgerMap map[peerKey]*ledger - -// FIXME share this externally -type peerKey u.Key - -// Peers returns a list of peers -func (s *strategist) Peers() []peer.Peer { - s.lock.RLock() - defer s.lock.RUnlock() - - response := make([]peer.Peer, 0) - for _, ledger := range s.ledgerMap { - response = append(response, ledger.Partner) - } - return response -} - -func (s *strategist) BlockIsWantedByPeer(k u.Key, p peer.Peer) bool { - s.lock.RLock() - defer s.lock.RUnlock() - - ledger := s.ledger(p) - return ledger.WantListContains(k) -} - -func (s *strategist) ShouldSendBlockToPeer(k u.Key, p peer.Peer) bool { - s.lock.RLock() - defer s.lock.RUnlock() - - ledger := s.ledger(p) - - // Dont resend blocks within a certain time period - t, ok := ledger.sentToPeer[k] - if ok && t.Add(resendTimeoutPeriod).After(time.Now()) { - return false - } - - return ledger.ShouldSend() -} - -func (s *strategist) BlockSentToPeer(k u.Key, p peer.Peer) { - s.lock.Lock() - defer s.lock.Unlock() - - ledger := s.ledger(p) - ledger.sentToPeer[k] = time.Now() -} - -func (s *strategist) Seed(int64) { - s.lock.Lock() - defer s.lock.Unlock() - - // TODO -} - -// MessageReceived performs book-keeping. Returns error if passed invalid -// arguments. -func (s *strategist) MessageReceived(p peer.Peer, m bsmsg.BitSwapMessage) error { - s.lock.Lock() - defer s.lock.Unlock() - - // TODO find a more elegant way to handle this check - if p == nil { - return errors.New("Strategy received nil peer") - } - if m == nil { - return errors.New("Strategy received nil message") - } - l := s.ledger(p) - for _, key := range m.Wantlist() { - l.Wants(key) - } - for _, block := range m.Blocks() { - // FIXME extract blocks.NumBytes(block) or block.NumBytes() method - l.ReceivedBytes(len(block.Data)) - } - return nil -} - -// TODO add contents of m.WantList() to my local wantlist? NB: could introduce -// race conditions where I send a message, but MessageSent gets handled after -// MessageReceived. The information in the local wantlist could become -// inconsistent. Would need to ensure that Sends and acknowledgement of the -// send happen atomically - -func (s *strategist) MessageSent(p peer.Peer, m bsmsg.BitSwapMessage) error { - s.lock.Lock() - defer s.lock.Unlock() - - l := s.ledger(p) - for _, block := range m.Blocks() { - l.SentBytes(len(block.Data)) - } - - // TODO remove these blocks from peer's want list - - return nil -} - -func (s *strategist) NumBytesSentTo(p peer.Peer) uint64 { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.ledger(p).Accounting.BytesSent -} - -func (s *strategist) NumBytesReceivedFrom(p peer.Peer) uint64 { - s.lock.RLock() - defer s.lock.RUnlock() - - return s.ledger(p).Accounting.BytesRecv -} - -// ledger lazily instantiates a ledger -func (s *strategist) ledger(p peer.Peer) *ledger { - l, ok := s.ledgerMap[peerKey(p.Key())] - if !ok { - l = newLedger(p, s.strategyFunc) - s.ledgerMap[peerKey(p.Key())] = l - } - return l -} - -func (s *strategist) GetBatchSize() int { - return 10 -} - -func (s *strategist) GetRebroadcastDelay() time.Duration { - return time.Second * 5 -} diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go deleted file mode 100644 index e063dff688e..00000000000 --- a/exchange/bitswap/strategy/strategy_test.go +++ /dev/null @@ -1,104 +0,0 @@ -package strategy - -import ( - "strings" - "testing" - - blocks "github.com/jbenet/go-ipfs/blocks" - message "github.com/jbenet/go-ipfs/exchange/bitswap/message" - peer "github.com/jbenet/go-ipfs/peer" - testutil "github.com/jbenet/go-ipfs/util/testutil" -) - -type peerAndStrategist struct { - peer.Peer - Strategy -} - -func newPeerAndStrategist(idStr string) peerAndStrategist { - return peerAndStrategist{ - Peer: testutil.NewPeerWithIDString(idStr), - Strategy: New(true), - } -} - -func TestConsistentAccounting(t *testing.T) { - sender := newPeerAndStrategist("Ernie") - receiver := newPeerAndStrategist("Bert") - - // Send messages from Ernie to Bert - for i := 0; i < 1000; i++ { - - m := message.New() - content := []string{"this", "is", "message", "i"} - m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) - - sender.MessageSent(receiver.Peer, m) - receiver.MessageReceived(sender.Peer, m) - } - - // Ensure sender records the change - if sender.NumBytesSentTo(receiver.Peer) == 0 { - t.Fatal("Sent bytes were not recorded") - } - - // Ensure sender and receiver have the same values - if sender.NumBytesSentTo(receiver.Peer) != receiver.NumBytesReceivedFrom(sender.Peer) { - t.Fatal("Inconsistent book-keeping. Strategies don't agree") - } - - // Ensure sender didn't record receving anything. And that the receiver - // didn't record sending anything - if receiver.NumBytesSentTo(sender.Peer) != 0 || sender.NumBytesReceivedFrom(receiver.Peer) != 0 { - t.Fatal("Bert didn't send bytes to Ernie") - } -} - -func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) { - beggar := newPeerAndStrategist("can't be chooser") - chooser := newPeerAndStrategist("chooses JIF") - - block := blocks.NewBlock([]byte("data wanted by beggar")) - - messageFromBeggarToChooser := message.New() - messageFromBeggarToChooser.AddWanted(block.Key()) - - chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser) - // for this test, doesn't matter if you record that beggar sent - - if !chooser.BlockIsWantedByPeer(block.Key(), beggar.Peer) { - t.Fatal("chooser failed to record that beggar wants block") - } -} - -func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { - - sanfrancisco := newPeerAndStrategist("sf") - seattle := newPeerAndStrategist("sea") - - m := message.New() - - sanfrancisco.MessageSent(seattle.Peer, m) - seattle.MessageReceived(sanfrancisco.Peer, m) - - if seattle.Peer.Key() == sanfrancisco.Peer.Key() { - t.Fatal("Sanity Check: Peers have same Key!") - } - - if !peerIsPartner(seattle.Peer, sanfrancisco.Strategy) { - t.Fatal("Peer wasn't added as a Partner") - } - - if !peerIsPartner(sanfrancisco.Peer, seattle.Strategy) { - t.Fatal("Peer wasn't added as a Partner") - } -} - -func peerIsPartner(p peer.Peer, s Strategy) bool { - for _, partner := range s.Peers() { - if partner.Key() == p.Key() { - return true - } - } - return false -} diff --git a/exchange/bitswap/wantlist/wantlist.go b/exchange/bitswap/wantlist/wantlist.go new file mode 100644 index 00000000000..aa58ee155f5 --- /dev/null +++ b/exchange/bitswap/wantlist/wantlist.go @@ -0,0 +1,111 @@ +package wantlist + +import ( + u "github.com/jbenet/go-ipfs/util" + "sort" + "sync" +) + +type ThreadSafe struct { + lk sync.RWMutex + Wantlist +} + +// not threadsafe +type Wantlist struct { + set map[u.Key]Entry +} + +type Entry struct { + // TODO consider making entries immutable so they can be shared safely and + // slices can be copied efficiently. + Key u.Key + Priority int +} + +type entrySlice []Entry + +func (es entrySlice) Len() int { return len(es) } +func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority } + +func NewThreadSafe() *ThreadSafe { + return &ThreadSafe{ + Wantlist: *New(), + } +} + +func New() *Wantlist { + return &Wantlist{ + set: make(map[u.Key]Entry), + } +} + +func (w *ThreadSafe) Add(k u.Key, priority int) { + // TODO rm defer for perf + w.lk.Lock() + defer w.lk.Unlock() + w.Wantlist.Add(k, priority) +} + +func (w *ThreadSafe) Remove(k u.Key) { + // TODO rm defer for perf + w.lk.Lock() + defer w.lk.Unlock() + w.Wantlist.Remove(k) +} + +func (w *ThreadSafe) Contains(k u.Key) bool { + // TODO rm defer for perf + w.lk.RLock() + defer w.lk.RUnlock() + return w.Wantlist.Contains(k) +} + +func (w *ThreadSafe) Entries() []Entry { + w.lk.RLock() + defer w.lk.RUnlock() + return w.Wantlist.Entries() +} + +func (w *ThreadSafe) SortedEntries() []Entry { + w.lk.RLock() + defer w.lk.RUnlock() + return w.Wantlist.SortedEntries() +} + +func (w *Wantlist) Add(k u.Key, priority int) { + if _, ok := w.set[k]; ok { + return + } + w.set[k] = Entry{ + Key: k, + Priority: priority, + } +} + +func (w *Wantlist) Remove(k u.Key) { + delete(w.set, k) +} + +func (w *Wantlist) Contains(k u.Key) bool { + _, ok := w.set[k] + return ok +} + +func (w *Wantlist) Entries() []Entry { + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + return es +} + +func (w *Wantlist) SortedEntries() []Entry { + var es entrySlice + for _, e := range w.set { + es = append(es, e) + } + sort.Sort(es) + return es +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index d22e3a39698..3d8916b034f 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -2,6 +2,7 @@ package merkledag import ( + "bytes" "fmt" "sync" "time" @@ -294,8 +295,9 @@ func FetchGraph(ctx context.Context, root *Node, serv DAGService) chan struct{} // returns the indexes of any links pointing to it func FindLinks(n *Node, k u.Key) []int { var out []int + keybytes := []byte(k) for i, lnk := range n.Links { - if u.Key(lnk.Hash) == k { + if bytes.Equal([]byte(lnk.Hash), keybytes) { out = append(out, i) } } diff --git a/net/backpressure/backpressure.go b/net/backpressure/backpressure.go new file mode 100644 index 00000000000..19950a72801 --- /dev/null +++ b/net/backpressure/backpressure.go @@ -0,0 +1 @@ +package backpressure_tests diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 5a68bd75925..6e49c84cfda 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -120,15 +120,12 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er // add self as the provider pmes.ProviderPeers = pb.PeersToPBPeers(dht.network, []peer.Peer{dht.self}) - rpmes, err := dht.sendRequest(ctx, p, pmes) + err := dht.sendMessage(ctx, p, pmes) if err != nil { return err } log.Debugf("%s putProvider: %s for %s", dht.self, p, u.Key(key)) - if rpmes.GetKey() != pmes.GetKey() { - return errors.New("provider not added correctly") - } return nil } diff --git a/routing/dht/dht_net.go b/routing/dht/dht_net.go index d1898f15c37..6e46b4de63b 100644 --- a/routing/dht/dht_net.go +++ b/routing/dht/dht_net.go @@ -8,8 +8,8 @@ import ( peer "github.com/jbenet/go-ipfs/peer" pb "github.com/jbenet/go-ipfs/routing/dht/pb" - ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io" ) // handleNewStream implements the inet.StreamHandler @@ -102,3 +102,24 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes) return rpmes, nil } + +// sendMessage sends out a message +func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.Peer, pmes *pb.Message) error { + + log.Debugf("%s dht starting stream", dht.self) + s, err := dht.network.NewStream(inet.ProtocolDHT, p) + if err != nil { + return err + } + defer s.Close() + + w := ggio.NewDelimitedWriter(s) + + log.Debugf("%s writing", dht.self) + if err := w.WriteMsg(pmes); err != nil { + return err + } + log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) + log.Debugf("%s done", dht.self) + return nil +} diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 76260e71085..51f15ff21be 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -11,6 +11,7 @@ import ( pb "github.com/jbenet/go-ipfs/routing/dht/pb" kb "github.com/jbenet/go-ipfs/routing/kbucket" u "github.com/jbenet/go-ipfs/util" + pset "github.com/jbenet/go-ipfs/util/peerset" ) // asyncQueryBuffer is the size of buffered channels in async queries. This @@ -140,11 +141,11 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) { defer close(peerOut) - ps := newPeerSet() + ps := pset.NewLimited(count) provs := dht.providers.GetProviders(ctx, key) for _, p := range provs { // NOTE: assuming that this list of peers is unique - if ps.AddIfSmallerThan(p, count) { + if ps.TryAdd(p) { select { case peerOut <- p: case <-ctx.Done(): @@ -175,7 +176,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Add unique providers from request, up to 'count' for _, prov := range provs { - if ps.AddIfSmallerThan(prov, count) { + if ps.TryAdd(prov) { select { case peerOut <- prov: case <-ctx.Done(): @@ -207,7 +208,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } -func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) { +func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.Peer) { var wg sync.WaitGroup for _, pbp := range peers { wg.Add(1) @@ -225,7 +226,7 @@ func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.M } dht.providers.AddProvider(k, p) - if ps.AddIfSmallerThan(p, count) { + if ps.TryAdd(p) { select { case out <- p: case <-ctx.Done(): diff --git a/routing/dht/util.go b/routing/dht/util.go index 00ac38dbc7e..2b0c1e2a2a7 100644 --- a/routing/dht/util.go +++ b/routing/dht/util.go @@ -2,8 +2,6 @@ package dht import ( "sync" - - peer "github.com/jbenet/go-ipfs/peer" ) // Pool size is the number of nodes used for group find/set RPC calls @@ -39,45 +37,3 @@ func (c *counter) Size() (s int) { c.mut.Unlock() return } - -// peerSet is a threadsafe set of peers -type peerSet struct { - ps map[string]bool - lk sync.RWMutex -} - -func newPeerSet() *peerSet { - ps := new(peerSet) - ps.ps = make(map[string]bool) - return ps -} - -func (ps *peerSet) Add(p peer.Peer) { - ps.lk.Lock() - ps.ps[string(p.ID())] = true - ps.lk.Unlock() -} - -func (ps *peerSet) Contains(p peer.Peer) bool { - ps.lk.RLock() - _, ok := ps.ps[string(p.ID())] - ps.lk.RUnlock() - return ok -} - -func (ps *peerSet) Size() int { - ps.lk.RLock() - defer ps.lk.RUnlock() - return len(ps.ps) -} - -func (ps *peerSet) AddIfSmallerThan(p peer.Peer, maxsize int) bool { - var success bool - ps.lk.Lock() - if _, ok := ps.ps[string(p.ID())]; !ok && len(ps.ps) < maxsize { - success = true - ps.ps[string(p.ID())] = true - } - ps.lk.Unlock() - return success -} diff --git a/routing/mock/mockrouting_test.go b/routing/mock/mockrouting_test.go index 6700cd8eded..44b1b52bd64 100644 --- a/routing/mock/mockrouting_test.go +++ b/routing/mock/mockrouting_test.go @@ -36,6 +36,9 @@ func TestClientFindProviders(t *testing.T) { if err != nil { t.Fatal(err) } + + // This is bad... but simulating networks is hard + time.Sleep(time.Millisecond * 300) max := 100 providersFromHashTable, err := rs.Client(peer).FindProviders(context.Background(), k) @@ -160,6 +163,7 @@ func TestValidAfter(t *testing.T) { if err != nil { t.Fatal(err) } + t.Log("providers", providers) if len(providers) != 1 { t.Fail() } diff --git a/util/peerset/peerset.go b/util/peerset/peerset.go new file mode 100644 index 00000000000..5dcf266822c --- /dev/null +++ b/util/peerset/peerset.go @@ -0,0 +1,61 @@ +package peerset + +import ( + peer "github.com/jbenet/go-ipfs/peer" + "sync" +) + +// PeerSet is a threadsafe set of peers +type PeerSet struct { + ps map[string]bool // FIXME can be map[string]struct{} + lk sync.RWMutex + size int +} + +func New() *PeerSet { + ps := new(PeerSet) + ps.ps = make(map[string]bool) + ps.size = -1 + return ps +} + +func NewLimited(size int) *PeerSet { + ps := new(PeerSet) + ps.ps = make(map[string]bool) + ps.size = size + return ps +} + +func (ps *PeerSet) Add(p peer.Peer) { + ps.lk.Lock() + ps.ps[string(p.ID())] = true + ps.lk.Unlock() +} + +func (ps *PeerSet) Contains(p peer.Peer) bool { + ps.lk.RLock() + _, ok := ps.ps[string(p.ID())] + ps.lk.RUnlock() + return ok +} + +func (ps *PeerSet) Size() int { + ps.lk.RLock() + defer ps.lk.RUnlock() + return len(ps.ps) +} + +// TryAdd Attempts to add the given peer into the set. +// This operation can fail for one of two reasons: +// 1) The given peer is already in the set +// 2) The number of peers in the set is equal to size +func (ps *PeerSet) TryAdd(p peer.Peer) bool { + var success bool + ps.lk.Lock() + if _, ok := ps.ps[string(p.ID())]; !ok && (len(ps.ps) < ps.size || ps.size == -1) { + success = true + ps.ps[string(p.ID())] = true + } + ps.lk.Unlock() + return success +}