Skip to content

Commit

Permalink
track broadcasted wantlist entries
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed May 20, 2017
1 parent 11e52e4 commit 4908e8d
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions exchange/bitswap/wantmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type WantManager struct {
// synchronized by Run loop, only touch inside there
peers map[peer.ID]*msgQueue
wl *wantlist.ThreadSafe
bcwl *wantlist.ThreadSafe

network bsnet.BitSwapNetwork
ctx context.Context
Expand All @@ -47,6 +48,7 @@ func NewWantManager(ctx context.Context, network bsnet.BitSwapNetwork) *WantMana
peerReqs: make(chan chan []peer.ID),
peers: make(map[peer.ID]*msgQueue),
wl: wantlist.NewThreadSafe(),
bcwl: wantlist.NewThreadSafe(),
network: network,
ctx: ctx,
cancel: cancel,
Expand All @@ -71,7 +73,7 @@ type msgQueue struct {
outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
wl *wantlist.Wantlist
wl *wantlist.ThreadSafe

sender bsnet.MessageSender

Expand Down Expand Up @@ -144,9 +146,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue {

// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range pm.wl.Entries() {
ne := *e
mq.wl.AddEntry(&ne)
for _, e := range pm.bcwl.Entries() {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
Expand Down Expand Up @@ -294,13 +297,23 @@ func (pm *WantManager) Run() {
select {
case ws := <-pm.incoming:

// is this a broadcast or not?
brdc := len(ws.targets) == 0

// add changes to our wantlist
for _, e := range ws.entries {
if e.Cancel {
if brdc {
pm.bcwl.Remove(e.Cid, ws.from)
}

if pm.wl.Remove(e.Cid, ws.from) {
pm.wantlistGauge.Dec()
}
} else {
if brdc {
pm.bcwl.AddEntry(e.Entry, ws.from)
}
if pm.wl.AddEntry(e.Entry, ws.from) {
pm.wantlistGauge.Inc()
}
Expand All @@ -310,7 +323,7 @@ func (pm *WantManager) Run() {
// broadcast those wantlist changes
if len(ws.targets) == 0 {
for _, p := range pm.peers {
p.addMessage(ws.entries)
p.addMessage(ws.entries, ws.from)
}
} else {
for _, t := range ws.targets {
Expand All @@ -319,24 +332,10 @@ func (pm *WantManager) Run() {
log.Warning("tried sending wantlist change to non-partner peer")
continue
}
p.addMessage(ws.entries)
p.addMessage(ws.entries, ws.from)
}
}

case <-tock.C:
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() {
es = append(es, &bsmsg.Entry{Entry: e})
}

for _, p := range pm.peers {
p.outlk.Lock()
p.out = bsmsg.New(true)
p.outlk.Unlock()

p.addMessage(es)
}
case p := <-pm.connect:
pm.startPeerHandler(p)
case p := <-pm.disconnect:
Expand All @@ -357,14 +356,14 @@ func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.New(),
wl: wantlist.NewThreadSafe(),
network: wm.network,
p: p,
refcnt: 1,
}
}

func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
func (mq *msgQueue) addMessage(entries []*bsmsg.Entry, ses uint64) {
var work bool
mq.outlk.Lock()
defer func() {
Expand All @@ -388,12 +387,12 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
// one passed in
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid) {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority) {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
Expand Down

0 comments on commit 4908e8d

Please sign in to comment.