diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 5e00a5888a2..a785b15dc4c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -78,9 +78,8 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) go func() { message := bsmsg.New() for _, wanted := range bs.wantlist.Keys() { - message.AppendWanted(wanted) + message.AddWanted(wanted) } - message.AppendWanted(k) for peerToQuery := range peersToQuery { log.Debugf("bitswap got peersToQuery: %s", peerToQuery) go func(p peer.Peer) { @@ -168,7 +167,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm message := bsmsg.New() for _, wanted := range bs.wantlist.Keys() { - message.AppendWanted(wanted) + message.AddWanted(wanted) } for _, key := range incoming.Wantlist() { // TODO: might be better to check if we have the block before checking @@ -177,7 +176,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil { continue } else { - message.AppendBlock(*block) + message.AddBlock(*block) } } } @@ -207,9 +206,9 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) log.Debugf("%v wants %v", p, block.Key()) if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) { message := bsmsg.New() - message.AppendBlock(block) + message.AddBlock(block) for _, wanted := range bs.wantlist.Keys() { - message.AppendWanted(wanted) + message.AddWanted(wanted) } go bs.send(ctx, p, message) } diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index b7216b02468..e0aea227d40 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -14,10 +14,25 @@ import ( // TODO move bs/msg/internal/pb to bs/internal/pb and rename pb package to bitswap_pb type BitSwapMessage interface { + // Wantlist returns a slice of unique keys that represent data wanted by + // the sender. Wantlist() []u.Key + + // Blocks returns a slice of unique blocks Blocks() []blocks.Block - AppendWanted(k u.Key) - AppendBlock(b blocks.Block) + + // AddWanted adds the key to the Wantlist. + // + // Insertion order determines priority. That is, earlier insertions are + // deemed higher priority than keys inserted later. + // + // t = 0, msg.AddWanted(A) + // t = 1, msg.AddWanted(B) + // + // implies Priority(A) > Priority(B) + AddWanted(u.Key) + + AddBlock(blocks.Block) Exportable } @@ -26,44 +41,55 @@ type Exportable interface { ToNet(p peer.Peer) (nm.NetMessage, error) } -// message wraps a proto message for convenience -type message struct { - wantlist []u.Key - blocks []blocks.Block +type impl struct { + existsInWantlist map[u.Key]struct{} // map to detect duplicates + wantlist []u.Key // slice to preserve ordering + blocks map[u.Key]blocks.Block // map to detect duplicates } -func New() *message { - return new(message) +func New() BitSwapMessage { + return &impl{ + blocks: make(map[u.Key]blocks.Block), + existsInWantlist: make(map[u.Key]struct{}), + wantlist: make([]u.Key, 0), + } } func newMessageFromProto(pbm pb.Message) BitSwapMessage { m := New() for _, s := range pbm.GetWantlist() { - m.AppendWanted(u.Key(s)) + m.AddWanted(u.Key(s)) } for _, d := range pbm.GetBlocks() { b := blocks.NewBlock(d) - m.AppendBlock(*b) + m.AddBlock(*b) } return m } -// TODO(brian): convert these into keys -func (m *message) Wantlist() []u.Key { +func (m *impl) Wantlist() []u.Key { return m.wantlist } -// TODO(brian): convert these into blocks -func (m *message) Blocks() []blocks.Block { - return m.blocks +func (m *impl) Blocks() []blocks.Block { + bs := make([]blocks.Block, 0) + for _, block := range m.blocks { + bs = append(bs, block) + } + return bs } -func (m *message) AppendWanted(k u.Key) { +func (m *impl) AddWanted(k u.Key) { + _, exists := m.existsInWantlist[k] + if exists { + return + } + m.existsInWantlist[k] = struct{}{} m.wantlist = append(m.wantlist, k) } -func (m *message) AppendBlock(b blocks.Block) { - m.blocks = append(m.blocks, b) +func (m *impl) AddBlock(b blocks.Block) { + m.blocks[b.Key()] = b } func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) { @@ -75,7 +101,7 @@ func FromNet(nmsg netmsg.NetMessage) (BitSwapMessage, error) { return m, nil } -func (m *message) ToProto() *pb.Message { +func (m *impl) ToProto() *pb.Message { pb := new(pb.Message) for _, k := range m.Wantlist() { pb.Wantlist = append(pb.Wantlist, string(k)) @@ -86,6 +112,6 @@ func (m *message) ToProto() *pb.Message { return pb } -func (m *message) ToNet(p peer.Peer) (nm.NetMessage, error) { +func (m *impl) ToNet(p peer.Peer) (nm.NetMessage, error) { return nm.FromObject(p, m.ToProto()) } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index 932c14e9bdb..9c69136cd7c 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -13,7 +13,7 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" m := New() - m.AppendWanted(u.Key(str)) + m.AddWanted(u.Key(str)) if !contains(m.ToProto().GetWantlist(), str) { t.Fail() @@ -42,7 +42,7 @@ func TestAppendBlock(t *testing.T) { m := New() for _, str := range strs { block := blocks.NewBlock([]byte(str)) - m.AppendBlock(*block) + m.AddBlock(*block) } // assert strings are in proto message @@ -58,7 +58,7 @@ func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} m := New() for _, s := range keystrs { - m.AppendWanted(u.Key(s)) + m.AddWanted(u.Key(s)) } exported := m.Wantlist() @@ -81,7 +81,7 @@ func TestCopyProtoByValue(t *testing.T) { const str = "foo" m := New() protoBeforeAppend := m.ToProto() - m.AppendWanted(u.Key(str)) + m.AddWanted(u.Key(str)) if contains(protoBeforeAppend.GetWantlist(), str) { t.Fail() } @@ -101,11 +101,11 @@ func TestToNetMethodSetsPeer(t *testing.T) { func TestToNetFromNetPreservesWantList(t *testing.T) { original := New() - original.AppendWanted(u.Key("M")) - original.AppendWanted(u.Key("B")) - original.AppendWanted(u.Key("D")) - original.AppendWanted(u.Key("T")) - original.AppendWanted(u.Key("F")) + original.AddWanted(u.Key("M")) + original.AddWanted(u.Key("B")) + original.AddWanted(u.Key("D")) + original.AddWanted(u.Key("T")) + original.AddWanted(u.Key("F")) p := peer.WithIDString("X") netmsg, err := original.ToNet(p) @@ -133,10 +133,10 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) { original := New() - original.AppendBlock(*blocks.NewBlock([]byte("W"))) - original.AppendBlock(*blocks.NewBlock([]byte("E"))) - original.AppendBlock(*blocks.NewBlock([]byte("F"))) - original.AppendBlock(*blocks.NewBlock([]byte("M"))) + original.AddBlock(*blocks.NewBlock([]byte("W"))) + original.AddBlock(*blocks.NewBlock([]byte("E"))) + original.AddBlock(*blocks.NewBlock([]byte("F"))) + original.AddBlock(*blocks.NewBlock([]byte("M"))) p := peer.WithIDString("X") netmsg, err := original.ToNet(p) @@ -169,3 +169,20 @@ func contains(s []string, x string) bool { } return false } + +func TestDuplicates(t *testing.T) { + b := blocks.NewBlock([]byte("foo")) + msg := New() + + msg.AddWanted(b.Key()) + msg.AddWanted(b.Key()) + if len(msg.Wantlist()) != 1 { + t.Fatal("Duplicate in BitSwapMessage") + } + + msg.AddBlock(*b) + msg.AddBlock(*b) + if len(msg.Blocks()) != 1 { + t.Fatal("Duplicate in BitSwapMessage") + } +} diff --git a/exchange/bitswap/strategy/strategy_test.go b/exchange/bitswap/strategy/strategy_test.go index e3ffc05ea81..ef93d98275e 100644 --- a/exchange/bitswap/strategy/strategy_test.go +++ b/exchange/bitswap/strategy/strategy_test.go @@ -30,7 +30,7 @@ func TestConsistentAccounting(t *testing.T) { m := message.New() content := []string{"this", "is", "message", "i"} - m.AppendBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) + m.AddBlock(*blocks.NewBlock([]byte(strings.Join(content, " ")))) sender.MessageSent(receiver.Peer, m) receiver.MessageReceived(sender.Peer, m) @@ -60,7 +60,7 @@ func TestBlockRecordedAsWantedAfterMessageReceived(t *testing.T) { block := blocks.NewBlock([]byte("data wanted by beggar")) messageFromBeggarToChooser := message.New() - messageFromBeggarToChooser.AppendWanted(block.Key()) + messageFromBeggarToChooser.AddWanted(block.Key()) chooser.MessageReceived(beggar.Peer, messageFromBeggarToChooser) // for this test, doesn't matter if you record that beggar sent diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index c2cc28f8d02..3930c2a8cb4 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -33,7 +33,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { // TODO test contents of incoming message m := bsmsg.New() - m.AppendBlock(*blocks.NewBlock([]byte(expectedStr))) + m.AddBlock(*blocks.NewBlock([]byte(expectedStr))) return from, m })) @@ -41,7 +41,7 @@ func TestSendRequestToCooperativePeer(t *testing.T) { t.Log("Build a message and send a synchronous request to recipient") message := bsmsg.New() - message.AppendBlock(*blocks.NewBlock([]byte("data"))) + message.AddBlock(*blocks.NewBlock([]byte("data"))) response, err := initiator.SendRequest( context.Background(), peer.WithID(idOfRecipient), message) if err != nil { @@ -77,7 +77,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { peer.Peer, bsmsg.BitSwapMessage) { msgToWaiter := bsmsg.New() - msgToWaiter.AppendBlock(*blocks.NewBlock([]byte(expectedStr))) + msgToWaiter.AddBlock(*blocks.NewBlock([]byte(expectedStr))) return fromWaiter, msgToWaiter })) @@ -105,7 +105,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { })) messageSentAsync := bsmsg.New() - messageSentAsync.AppendBlock(*blocks.NewBlock([]byte("data"))) + messageSentAsync.AddBlock(*blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), peer.WithID(idOfResponder), messageSentAsync) if errSending != nil {