Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

reduce allocations #12

Merged
merged 3 commits into from
Oct 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}

Expand Down Expand Up @@ -257,9 +257,9 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}
log.Error(err)
} else {
// we have the block
// we have the block
newWorkExists = true
if msgSize + blockSize > maxMessageSize {
if msgSize+blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []*wl.Entry{}
msgSize = 0
Expand Down
19 changes: 10 additions & 9 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type peerRequestQueue interface {

func newPRQ() *prq {
return &prq{
taskMap: make(map[string]*peerRequestTask),
taskMap: make(map[taskEntryKey]*peerRequestTask),
partners: make(map[peer.ID]*activePartner),
frozen: make(map[peer.ID]*activePartner),
pQueue: pq.New(partnerCompare),
Expand All @@ -39,7 +39,7 @@ var _ peerRequestQueue = &prq{}
type prq struct {
lock sync.Mutex
pQueue pq.PQ
taskMap map[string]*peerRequestTask
taskMap map[taskEntryKey]*peerRequestTask
partners map[peer.ID]*activePartner

frozen map[peer.ID]*activePartner
Expand All @@ -65,7 +65,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if partner.activeBlocks.Has(entry.Cid) {
continue
}
if task, ok := tl.taskMap[taskEntryKey(to, entry.Cid)]; ok {
if task, ok := tl.taskMap[taskEntryKey{to, entry.Cid}]; ok {
if entry.Priority > task.Priority {
task.Priority = entry.Priority
partner.taskQueue.Update(task.index)
Expand Down Expand Up @@ -98,7 +98,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
task.Priority = priority
partner.taskQueue.Push(task)
for _, entry := range newEntries {
tl.taskMap[taskEntryKey(to, entry.Cid)] = task
tl.taskMap[taskEntryKey{to, entry.Cid}] = task
}
partner.requests += len(newEntries)
tl.pQueue.Update(partner.Index())
Expand All @@ -119,7 +119,7 @@ func (tl *prq) Pop() *peerRequestTask {

newEntries := make([]*wantlist.Entry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey(out.Target, entry.Cid))
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.Trash {
continue
}
Expand All @@ -143,7 +143,7 @@ func (tl *prq) Pop() *peerRequestTask {
// Remove removes a task from the queue
func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskEntryKey(p, k)]
t, ok := tl.taskMap[taskEntryKey{p, k}]
if ok {
for _, entry := range t.Entries {
if entry.Cid.Equals(k) {
Expand Down Expand Up @@ -220,9 +220,10 @@ func (t *peerRequestTask) SetIndex(i int) {
t.index = i
}

// taskEntryKey returns a key that uniquely identifies a task.
func taskEntryKey(p peer.ID, k cid.Cid) string {
return string(p) + k.KeyString()
// taskEntryKey is a key identifying a task.
type taskEntryKey struct {
p peer.ID
k cid.Cid
}

// FIFO is a basic task comparator that returns tasks in the order created.
Expand Down
44 changes: 20 additions & 24 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type Exportable interface {

type impl struct {
full bool
wantlist map[string]*Entry
blocks map[string]blocks.Block
wantlist map[cid.Cid]*Entry
blocks map[cid.Cid]blocks.Block
}

func New(full bool) BitSwapMessage {
Expand All @@ -59,8 +59,8 @@ func New(full bool) BitSwapMessage {

func newMsg(full bool) *impl {
return &impl{
blocks: make(map[string]blocks.Block),
wantlist: make(map[string]*Entry),
blocks: make(map[cid.Cid]blocks.Block),
wantlist: make(map[cid.Cid]*Entry),
full: full,
}
}
Expand All @@ -71,17 +71,17 @@ type Entry struct {
}

func newMessageFromProto(pbm pb.Message) (BitSwapMessage, error) {
m := newMsg(pbm.GetWantlist().GetFull())
for _, e := range pbm.GetWantlist().GetEntries() {
c, err := cid.Cast([]byte(e.GetBlock()))
m := newMsg(pbm.Wantlist.Full)
for _, e := range pbm.Wantlist.Entries {
c, err := cid.Cast([]byte(e.Block))
if err != nil {
return nil, fmt.Errorf("incorrectly formatted cid in wantlist: %s", err)
}
m.addEntry(c, int(e.GetPriority()), e.GetCancel())
m.addEntry(c, int(e.Priority), e.Cancel)
}

// deprecated
for _, d := range pbm.GetBlocks() {
for _, d := range pbm.Blocks {
// CIDv0, sha256, protobuf only
b := blocks.NewBlock(d)
m.AddBlock(b)
Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *impl) Blocks() []blocks.Block {
}

func (m *impl) Cancel(k cid.Cid) {
delete(m.wantlist, k.KeyString())
delete(m.wantlist, k)
m.addEntry(k, 0, true)
}

Expand All @@ -144,13 +144,12 @@ func (m *impl) AddEntry(k cid.Cid, priority int) {
}

func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
k := c.KeyString()
e, exists := m.wantlist[k]
e, exists := m.wantlist[c]
if exists {
e.Priority = priority
e.Cancel = cancel
} else {
m.wantlist[k] = &Entry{
m.wantlist[c] = &Entry{
Entry: &wantlist.Entry{
Cid: c,
Priority: priority,
Expand All @@ -161,7 +160,7 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
}

func (m *impl) AddBlock(b blocks.Block) {
m.blocks[b.Cid().KeyString()] = b
m.blocks[b.Cid()] = b
}

func FromNet(r io.Reader) (BitSwapMessage, error) {
Expand All @@ -180,10 +179,9 @@ func FromPBReader(pbr ggio.Reader) (BitSwapMessage, error) {

func (m *impl) ToProtoV0() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist = new(pb.Message_Wantlist)
pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Block: e.Cid.Bytes(),
Priority: int32(e.Priority),
Cancel: e.Cancel,
Expand All @@ -201,10 +199,9 @@ func (m *impl) ToProtoV0() *pb.Message {

func (m *impl) ToProtoV1() *pb.Message {
pbm := new(pb.Message)
pbm.Wantlist = new(pb.Message_Wantlist)
pbm.Wantlist.Entries = make([]*pb.Message_Wantlist_Entry, 0, len(m.wantlist))
pbm.Wantlist.Entries = make([]pb.Message_Wantlist_Entry, 0, len(m.wantlist))
for _, e := range m.wantlist {
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, &pb.Message_Wantlist_Entry{
pbm.Wantlist.Entries = append(pbm.Wantlist.Entries, pb.Message_Wantlist_Entry{
Block: e.Cid.Bytes(),
Priority: int32(e.Priority),
Cancel: e.Cancel,
Expand All @@ -213,13 +210,12 @@ func (m *impl) ToProtoV1() *pb.Message {
pbm.Wantlist.Full = m.full

blocks := m.Blocks()
pbm.Payload = make([]*pb.Message_Block, 0, len(blocks))
pbm.Payload = make([]pb.Message_Block, 0, len(blocks))
for _, b := range blocks {
blk := &pb.Message_Block{
pbm.Payload = append(pbm.Payload, pb.Message_Block{
Data: b.RawData(),
Prefix: b.Cid().Prefix().Bytes(),
}
pbm.Payload = append(pbm.Payload, blk)
})
}
return pbm
}
Expand Down
23 changes: 11 additions & 12 deletions message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,26 @@ func TestAppendWanted(t *testing.T) {
m := New(true)
m.AddEntry(str, 1)

if !wantlistContains(m.ToProtoV0().GetWantlist(), str) {
if !wantlistContains(&m.ToProtoV0().Wantlist, str) {
t.Fail()
}
}

func TestNewMessageFromProto(t *testing.T) {
str := mkFakeCid("a_key")
protoMessage := new(pb.Message)
protoMessage.Wantlist = new(pb.Message_Wantlist)
protoMessage.Wantlist.Entries = []*pb.Message_Wantlist_Entry{
protoMessage.Wantlist.Entries = []pb.Message_Wantlist_Entry{
{Block: str.Bytes()},
}
if !wantlistContains(protoMessage.Wantlist, str) {
if !wantlistContains(&protoMessage.Wantlist, str) {
t.Fail()
}
m, err := newMessageFromProto(*protoMessage)
if err != nil {
t.Fatal(err)
}

if !wantlistContains(m.ToProtoV0().GetWantlist(), str) {
if !wantlistContains(&m.ToProtoV0().Wantlist, str) {
t.Fail()
}
}
Expand Down Expand Up @@ -94,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) {
m := New(true)
protoBeforeAppend := m.ToProtoV0()
m.AddEntry(str, 1)
if wantlistContains(protoBeforeAppend.GetWantlist(), str) {
if wantlistContains(&protoBeforeAppend.Wantlist, str) {
t.Fail()
}
}
Expand All @@ -121,13 +120,13 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
t.Fatal("fullness attribute got dropped on marshal")
}

keys := make(map[string]bool)
keys := make(map[cid.Cid]bool)
for _, k := range copied.Wantlist() {
keys[k.Cid.KeyString()] = true
keys[k.Cid] = true
}

for _, k := range original.Wantlist() {
if _, ok := keys[k.Cid.KeyString()]; !ok {
if _, ok := keys[k.Cid]; !ok {
t.Fatalf("Key Missing: \"%v\"", k)
}
}
Expand All @@ -151,13 +150,13 @@ func TestToAndFromNetMessage(t *testing.T) {
t.Fatal(err)
}

keys := make(map[string]bool)
keys := make(map[cid.Cid]bool)
for _, b := range m2.Blocks() {
keys[b.Cid().KeyString()] = true
keys[b.Cid()] = true
}

for _, b := range original.Blocks() {
if _, ok := keys[b.Cid().KeyString()]; !ok {
if _, ok := keys[b.Cid()]; !ok {
t.Fail()
}
}
Expand Down
2 changes: 1 addition & 1 deletion message/pb/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ GO = $(PB:.proto=.pb.go)
all: $(GO)

%.pb.go: %.proto
protoc --proto_path=$(GOPATH)/src:. --gogofast_out=. $<
protoc --proto_path=$(GOPATH)/src:. --gogofaster_out=. $<

clean:
rm -f *.pb.go
Expand Down
Loading