Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #79 from ipfs/feat/fewer-allocations
Browse files Browse the repository at this point in the history
Avoid allocating for wantlist entries
  • Loading branch information
Stebalien authored Feb 21, 2019
2 parents 8b713c4 + 5257505 commit 39b96ac
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 50 deletions.
2 changes: 1 addition & 1 deletion decision/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ func BenchmarkTaskQueuePush(b *testing.B) {
for i := 0; i < b.N; i++ {
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))

q.Push(peers[i%len(peers)], &wantlist.Entry{Cid: c, Priority: math.MaxInt32})
q.Push(peers[i%len(peers)], wantlist.Entry{Cid: c, Priority: math.MaxInt32})
}
}
6 changes: 3 additions & 3 deletions decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
return e
}

func (e *Engine) WantlistForPeer(p peer.ID) (out []*wl.Entry) {
func (e *Engine) WantlistForPeer(p peer.ID) (out []wl.Entry) {
partner := e.findOrCreate(p)
partner.lk.Lock()
defer partner.lk.Unlock()
Expand Down Expand Up @@ -241,7 +241,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
}

var msgSize int
var activeEntries []*wl.Entry
var activeEntries []wl.Entry
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debugf("%s cancel %s", p, entry.Cid)
Expand All @@ -261,7 +261,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
newWorkExists = true
if msgSize+blockSize > maxMessageSize {
e.peerRequestQueue.Push(p, activeEntries...)
activeEntries = []*wl.Entry{}
activeEntries = []wl.Entry{}
msgSize = 0
}
activeEntries = append(activeEntries, entry.Entry)
Expand Down
2 changes: 1 addition & 1 deletion decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (l *ledger) CancelWant(k cid.Cid) {
l.wantList.Remove(k)
}

func (l *ledger) WantListContains(k cid.Cid) (*wl.Entry, bool) {
func (l *ledger) WantListContains(k cid.Cid) (wl.Entry, bool) {
return l.wantList.Contains(k)
}

Expand Down
24 changes: 12 additions & 12 deletions decision/peer_request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
type peerRequestQueue interface {
// Pop returns the next peerRequestTask. Returns nil if the peerRequestQueue is empty.
Pop() *peerRequestTask
Push(to peer.ID, entries ...*wantlist.Entry)
Push(to peer.ID, entries ...wantlist.Entry)
Remove(k cid.Cid, p peer.ID)

// NB: cannot expose simply expose taskQueue.Len because trashed elements
Expand Down Expand Up @@ -46,7 +46,7 @@ type prq struct {
}

// Push currently adds a new peerRequestTask to the end of the list.
func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
func (tl *prq) Push(to peer.ID, entries ...wantlist.Entry) {
tl.lock.Lock()
defer tl.lock.Unlock()
partner, ok := tl.partners[to]
Expand All @@ -60,7 +60,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
defer partner.activelk.Unlock()

var priority int
newEntries := make([]*peerRequestTaskEntry, 0, len(entries))
newEntries := make([]peerRequestTaskEntry, 0, len(entries))
for _, entry := range entries {
if partner.activeBlocks.Has(entry.Cid) {
continue
Expand All @@ -75,7 +75,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
if entry.Priority > priority {
priority = entry.Priority
}
newEntries = append(newEntries, &peerRequestTaskEntry{entry, false})
newEntries = append(newEntries, peerRequestTaskEntry{entry, false})
}

if len(newEntries) == 0 {
Expand All @@ -86,7 +86,7 @@ func (tl *prq) Push(to peer.ID, entries ...*wantlist.Entry) {
Entries: newEntries,
Target: to,
created: time.Now(),
Done: func(e []*peerRequestTaskEntry) {
Done: func(e []peerRequestTaskEntry) {
tl.lock.Lock()
for _, entry := range e {
partner.TaskDone(entry.Cid)
Expand Down Expand Up @@ -117,7 +117,7 @@ func (tl *prq) Pop() *peerRequestTask {
for partner.taskQueue.Len() > 0 && partner.freezeVal == 0 {
out = partner.taskQueue.Pop().(*peerRequestTask)

newEntries := make([]*peerRequestTaskEntry, 0, len(out.Entries))
newEntries := make([]peerRequestTaskEntry, 0, len(out.Entries))
for _, entry := range out.Entries {
delete(tl.taskMap, taskEntryKey{out.Target, entry.Cid})
if entry.trash {
Expand Down Expand Up @@ -145,12 +145,12 @@ func (tl *prq) Remove(k cid.Cid, p peer.ID) {
tl.lock.Lock()
t, ok := tl.taskMap[taskEntryKey{p, k}]
if ok {
for _, entry := range t.Entries {
if entry.Cid.Equals(k) {
for i := range t.Entries {
if t.Entries[i].Cid.Equals(k) {
// remove the task "lazily"
// simply mark it as trash, so it'll be dropped when popped off the
// queue.
entry.trash = true
t.Entries[i].trash = true
break
}
}
Expand Down Expand Up @@ -198,17 +198,17 @@ func (tl *prq) thawRound() {
}

type peerRequestTaskEntry struct {
*wantlist.Entry
wantlist.Entry
// trash in a book-keeping field
trash bool
}
type peerRequestTask struct {
Entries []*peerRequestTaskEntry
Entries []peerRequestTaskEntry
Priority int
Target peer.ID

// A callback to signal that this task has been completed
Done func([]*peerRequestTaskEntry)
Done func([]peerRequestTaskEntry)

// created marks the time that the task was added to the queue
created time.Time
Expand Down
10 changes: 5 additions & 5 deletions decision/peer_request_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestPushPop(t *testing.T) {
t.Log(partner.String())

c := cid.NewCidV0(u.Hash([]byte(letter)))
prq.Push(partner, &wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index})
prq.Push(partner, wantlist.Entry{Cid: c, Priority: math.MaxInt32 - index})
}
for _, consonant := range consonants {
c := cid.NewCidV0(u.Hash([]byte(consonant)))
Expand Down Expand Up @@ -87,10 +87,10 @@ func TestPeerRepeats(t *testing.T) {

for i := 0; i < 5; i++ {
elcid := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
prq.Push(a, &wantlist.Entry{Cid: elcid})
prq.Push(b, &wantlist.Entry{Cid: elcid})
prq.Push(c, &wantlist.Entry{Cid: elcid})
prq.Push(d, &wantlist.Entry{Cid: elcid})
prq.Push(a, wantlist.Entry{Cid: elcid})
prq.Push(b, wantlist.Entry{Cid: elcid})
prq.Push(c, wantlist.Entry{Cid: elcid})
prq.Push(d, wantlist.Entry{Cid: elcid})
}

// now, pop off four entries, there should be one from each
Expand Down
4 changes: 2 additions & 2 deletions message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newMsg(full bool) *impl {
}

type Entry struct {
*wantlist.Entry
wantlist.Entry
Cancel bool
}

Expand Down Expand Up @@ -150,7 +150,7 @@ func (m *impl) addEntry(c cid.Cid, priority int, cancel bool) {
e.Cancel = cancel
} else {
m.wantlist[c] = &Entry{
Entry: &wantlist.Entry{
Entry: wantlist.Entry{
Cid: c,
Priority: priority,
},
Expand Down
38 changes: 19 additions & 19 deletions wantlist/wantlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type SessionTrackedWantlist struct {
}

type Wantlist struct {
set map[cid.Cid]*Entry
set map[cid.Cid]Entry
}

type Entry struct {
Expand All @@ -22,19 +22,19 @@ type Entry struct {
}

type sessionTrackedEntry struct {
*Entry
Entry
sesTrk map[uint64]struct{}
}

// NewRefEntry creates a new reference tracked wantlist entry.
func NewRefEntry(c cid.Cid, p int) *Entry {
return &Entry{
func NewRefEntry(c cid.Cid, p int) Entry {
return Entry{
Cid: c,
Priority: p,
}
}

type entrySlice []*Entry
type entrySlice []Entry

func (es entrySlice) Len() int { return len(es) }
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
Expand All @@ -48,7 +48,7 @@ func NewSessionTrackedWantlist() *SessionTrackedWantlist {

func New() *Wantlist {
return &Wantlist{
set: make(map[cid.Cid]*Entry),
set: make(map[cid.Cid]Entry),
}
}

Expand All @@ -68,15 +68,15 @@ func (w *SessionTrackedWantlist) Add(c cid.Cid, priority int, ses uint64) bool {
}

w.set[c] = &sessionTrackedEntry{
Entry: &Entry{Cid: c, Priority: priority},
Entry: Entry{Cid: c, Priority: priority},
sesTrk: map[uint64]struct{}{ses: struct{}{}},
}

return true
}

// AddEntry adds given Entry to the wantlist. For more information see Add method.
func (w *SessionTrackedWantlist) AddEntry(e *Entry, ses uint64) bool {
func (w *SessionTrackedWantlist) AddEntry(e Entry, ses uint64) bool {
if ex, ok := w.set[e.Cid]; ok {
ex.sesTrk[ses] = struct{}{}
return false
Expand Down Expand Up @@ -108,23 +108,23 @@ func (w *SessionTrackedWantlist) Remove(c cid.Cid, ses uint64) bool {

// Contains returns true if the given cid is in the wantlist tracked by one or
// more sessions.
func (w *SessionTrackedWantlist) Contains(k cid.Cid) (*Entry, bool) {
func (w *SessionTrackedWantlist) Contains(k cid.Cid) (Entry, bool) {
e, ok := w.set[k]
if !ok {
return nil, false
return Entry{}, false
}
return e.Entry, true
}

func (w *SessionTrackedWantlist) Entries() []*Entry {
es := make([]*Entry, 0, len(w.set))
func (w *SessionTrackedWantlist) Entries() []Entry {
es := make([]Entry, 0, len(w.set))
for _, e := range w.set {
es = append(es, e.Entry)
}
return es
}

func (w *SessionTrackedWantlist) SortedEntries() []*Entry {
func (w *SessionTrackedWantlist) SortedEntries() []Entry {
es := w.Entries()
sort.Sort(entrySlice(es))
return es
Expand All @@ -151,15 +151,15 @@ func (w *Wantlist) Add(c cid.Cid, priority int) bool {
return false
}

w.set[c] = &Entry{
w.set[c] = Entry{
Cid: c,
Priority: priority,
}

return true
}

func (w *Wantlist) AddEntry(e *Entry) bool {
func (w *Wantlist) AddEntry(e Entry) bool {
if _, ok := w.set[e.Cid]; ok {
return false
}
Expand All @@ -177,20 +177,20 @@ func (w *Wantlist) Remove(c cid.Cid) bool {
return true
}

func (w *Wantlist) Contains(c cid.Cid) (*Entry, bool) {
func (w *Wantlist) Contains(c cid.Cid) (Entry, bool) {
e, ok := w.set[c]
return e, ok
}

func (w *Wantlist) Entries() []*Entry {
es := make([]*Entry, 0, len(w.set))
func (w *Wantlist) Entries() []Entry {
es := make([]Entry, 0, len(w.set))
for _, e := range w.set {
es = append(es, e)
}
return es
}

func (w *Wantlist) SortedEntries() []*Entry {
func (w *Wantlist) SortedEntries() []Entry {
es := w.Entries()
sort.Sort(entrySlice(es))
return es
Expand Down
2 changes: 1 addition & 1 deletion wantlist/wantlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func init() {
}

type wli interface {
Contains(cid.Cid) (*Entry, bool)
Contains(cid.Cid) (Entry, bool)
}

func assertHasCid(t *testing.T, w wli, c cid.Cid) {
Expand Down
12 changes: 6 additions & 6 deletions wantmanager/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ func (wm *WantManager) IsWanted(c cid.Cid) bool {
}

// CurrentWants returns the list of current wants.
func (wm *WantManager) CurrentWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry, 1)
func (wm *WantManager) CurrentWants() []wantlist.Entry {
resp := make(chan []wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentWantsMessage{resp}:
case <-wm.ctx.Done():
Expand All @@ -116,8 +116,8 @@ func (wm *WantManager) CurrentWants() []*wantlist.Entry {
}

// CurrentBroadcastWants returns the current list of wants that are broadcasts.
func (wm *WantManager) CurrentBroadcastWants() []*wantlist.Entry {
resp := make(chan []*wantlist.Entry, 1)
func (wm *WantManager) CurrentBroadcastWants() []wantlist.Entry {
resp := make(chan []wantlist.Entry, 1)
select {
case wm.wantMessages <- &currentBroadcastWantsMessage{resp}:
case <-wm.ctx.Done():
Expand Down Expand Up @@ -246,15 +246,15 @@ func (iwm *isWantedMessage) handle(wm *WantManager) {
}

type currentWantsMessage struct {
resp chan<- []*wantlist.Entry
resp chan<- []wantlist.Entry
}

func (cwm *currentWantsMessage) handle(wm *WantManager) {
cwm.resp <- wm.wl.Entries()
}

type currentBroadcastWantsMessage struct {
resp chan<- []*wantlist.Entry
resp chan<- []wantlist.Entry
}

func (cbcwm *currentBroadcastWantsMessage) handle(wm *WantManager) {
Expand Down

0 comments on commit 39b96ac

Please sign in to comment.