Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: debounce wants manually #255

Merged
merged 2 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module github.com/ipfs/go-bitswap

require (
github.com/bep/debounce v1.2.0
github.com/cskr/pubsub v1.0.2
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2 // indirect
Expand Down
6 changes: 0 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo=
github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78=
github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8=
github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q=
Expand Down Expand Up @@ -94,8 +92,6 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8=
github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M=
github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU=
github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog=
github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms=
Expand Down Expand Up @@ -344,8 +340,6 @@ github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg=
github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.10 h1:lMoNbh2Ssd9PUF74Nz008KGzGPlfeV6wH3rit5IIGCM=
github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew=
github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc=
github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc=
github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ=
Expand Down
61 changes: 47 additions & 14 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

debounce "github.com/bep/debounce"

bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
bsnet "github.com/ipfs/go-bitswap/network"
Expand All @@ -34,6 +32,11 @@ const (
maxPriority = math.MaxInt32
// sendMessageDebounce is the debounce duration when calling sendMessage()
sendMessageDebounce = time.Millisecond
// when we reach sendMessaageCuttoff wants/cancels, we'll send the message immediately.
sendMessageCuttoff = 100
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
// when we debounce for more than sendMessageMaxDelay, we'll send the
// message immediately.
sendMessageMaxDelay = 100 * time.Millisecond
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
)

// MessageNetwork is any network that can connect peers and generate a message
Expand All @@ -54,9 +57,8 @@ type MessageQueue struct {
maxMessageSize int
sendErrorBackoff time.Duration

signalWorkReady func()
outgoingWork chan struct{}
done chan struct{}
outgoingWork chan time.Time
done chan struct{}

// Take lock whenever any of these variables are modified
wllock sync.Mutex
Expand Down Expand Up @@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
bcstWants: newRecallWantList(),
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan struct{}, 1),
outgoingWork: make(chan time.Time, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
}

// Apply debounce to the work ready signal (which triggers sending a message)
debounced := debounce.New(sendMessageDebounce)
mq.signalWorkReady = func() { debounced(mq.onWorkReady) }

return mq
}

Expand Down Expand Up @@ -285,11 +283,42 @@ func (mq *MessageQueue) onShutdown() {
func (mq *MessageQueue) runQueue() {
defer mq.onShutdown()

// Create a timer for debouncing scheduled work.
scheduleWork := time.NewTimer(0)
if !scheduleWork.Stop() {
<-scheduleWork.C
}

var workScheduled time.Time
for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
case <-mq.outgoingWork:
case when := <-mq.outgoingWork:
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
// If we have work scheduled, cancel the timer. If we
// don't, record when the work was scheduled.
// We send the time on the channel so we accurately
// track delay.
if workScheduled.IsZero() {
workScheduled = when
} else if !scheduleWork.Stop() {
<-scheduleWork.C
}

// If we have too many updates and/or we've waited too
// long, send immediately.
if mq.pendingWorkCount() > sendMessageCuttoff ||
time.Since(workScheduled) >= sendMessageMaxDelay {
mq.sendIfReady()
workScheduled = time.Time{}
} else {
// Otherwise, extend the timer.
scheduleWork.Reset(sendMessageDebounce)
}
case <-scheduleWork.C:
// We have work scheduled and haven't seen any updates
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
Expand Down Expand Up @@ -335,9 +364,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool {
return true
}

func (mq *MessageQueue) onWorkReady() {
func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- struct{}{}:
case mq.outgoingWork <- time.Now():
default:
}
}
Expand Down Expand Up @@ -443,10 +472,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
// }

func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}

func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0
return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}

func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
Expand Down