diff --git a/go.mod b/go.mod index f05ca787..d180b371 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,6 @@ module github.com/ipfs/go-bitswap require ( - github.com/bep/debounce v1.2.0 github.com/cskr/pubsub v1.0.2 github.com/gogo/protobuf v1.3.1 github.com/golang/protobuf v1.3.2 // indirect diff --git a/go.sum b/go.sum index b7159a75..15736678 100644 --- a/go.sum +++ b/go.sum @@ -5,8 +5,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/bep/debounce v1.2.0 h1:wXds8Kq8qRfwAOpAxHrJDbCXgC5aHSzgQb/0gKsHQqo= -github.com/bep/debounce v1.2.0/go.mod h1:H8yggRPQKLUhUoqrJC1bO2xNya7vanpDl7xR3ISbCJ0= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190523000118-16327141da8c h1:aEbSeNALREWXk0G7UdNhR3ayBV7tZ4M2PNmnrCAph6Q= @@ -94,8 +92,6 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3 h1:UIAh32wymBpStoe83YCzwVQQ5Oy/H0FdxvUS6DJDzms= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= -github.com/ipfs/go-cid v0.0.4 h1:UlfXKrZx1DjZoBhQHmNHLC1fK1dUJDN20Y28A7s+gJ8= -github.com/ipfs/go-cid v0.0.4/go.mod h1:4LLaPOQwmk5z9LBgQnpkivrx8BJjUyGwTXCd5Xfj6+M= github.com/ipfs/go-cid v0.0.5 h1:o0Ix8e/ql7Zb5UVUJEUfjsWCIY8t48++9lR8qi6oiJU= github.com/ipfs/go-cid v0.0.5/go.mod h1:plgt+Y5MnOey4vO4UlUazGqdbEXuFYitED67FexhXog= github.com/ipfs/go-datastore v0.0.1 h1:AW/KZCScnBWlSb5JbnEnLKFWXL224LBEh/9KXXOrUms= @@ -344,8 +340,6 @@ github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= github.com/multiformats/go-multihash v0.0.8 h1:wrYcW5yxSi3dU07n5jnuS5PrNwyHy0zRHGVoUugWvXg= github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= -github.com/multiformats/go-multihash v0.0.10 h1:lMoNbh2Ssd9PUF74Nz008KGzGPlfeV6wH3rit5IIGCM= -github.com/multiformats/go-multihash v0.0.10/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.0.13 h1:06x+mk/zj1FoMsgNejLpy6QTvJqlSt/BhLEy87zidlc= github.com/multiformats/go-multihash v0.0.13/go.mod h1:VdAWLKTwram9oKAatUcLxBNUjdtcVwxObEQBtRfuyjc= github.com/multiformats/go-multistream v0.1.0 h1:UpO6jrsjqs46mqAK3n6wKRYFhugss9ArzbyUzU+4wkQ= diff --git a/internal/messagequeue/messagequeue.go b/internal/messagequeue/messagequeue.go index 15f8100d..e60d52c3 100644 --- a/internal/messagequeue/messagequeue.go +++ b/internal/messagequeue/messagequeue.go @@ -6,8 +6,6 @@ import ( "sync" "time" - debounce "github.com/bep/debounce" - bsmsg "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" bsnet "github.com/ipfs/go-bitswap/network" @@ -34,6 +32,11 @@ const ( maxPriority = math.MaxInt32 // sendMessageDebounce is the debounce duration when calling sendMessage() sendMessageDebounce = time.Millisecond + // when we reach sendMessageCutoff wants/cancels, we'll send the message immediately. + sendMessageCutoff = 256 + // when we debounce for more than sendMessageMaxDelay, we'll send the + // message immediately. + sendMessageMaxDelay = 20 * time.Millisecond ) // MessageNetwork is any network that can connect peers and generate a message @@ -54,9 +57,8 @@ type MessageQueue struct { maxMessageSize int sendErrorBackoff time.Duration - signalWorkReady func() - outgoingWork chan struct{} - done chan struct{} + outgoingWork chan time.Time + done chan struct{} // Take lock whenever any of these variables are modified wllock sync.Mutex @@ -165,17 +167,13 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork, bcstWants: newRecallWantList(), peerWants: newRecallWantList(), cancels: cid.NewSet(), - outgoingWork: make(chan struct{}, 1), + outgoingWork: make(chan time.Time, 1), done: make(chan struct{}), rebroadcastInterval: defaultRebroadcastInterval, sendErrorBackoff: sendErrorBackoff, priority: maxPriority, } - // Apply debounce to the work ready signal (which triggers sending a message) - debounced := debounce.New(sendMessageDebounce) - mq.signalWorkReady = func() { debounced(mq.onWorkReady) } - return mq } @@ -285,11 +283,45 @@ func (mq *MessageQueue) onShutdown() { func (mq *MessageQueue) runQueue() { defer mq.onShutdown() + // Create a timer for debouncing scheduled work. + scheduleWork := time.NewTimer(0) + if !scheduleWork.Stop() { + // Need to drain the timer if Stop() returns false + // See: https://golang.org/pkg/time/#Timer.Stop + <-scheduleWork.C + } + + var workScheduled time.Time for { select { case <-mq.rebroadcastTimer.C: mq.rebroadcastWantlist() - case <-mq.outgoingWork: + case when := <-mq.outgoingWork: + // If we have work scheduled, cancel the timer. If we + // don't, record when the work was scheduled. + // We send the time on the channel so we accurately + // track delay. + if workScheduled.IsZero() { + workScheduled = when + } else if !scheduleWork.Stop() { + // Need to drain the timer if Stop() returns false + <-scheduleWork.C + } + + // If we have too many updates and/or we've waited too + // long, send immediately. + if mq.pendingWorkCount() > sendMessageCutoff || + time.Since(workScheduled) >= sendMessageMaxDelay { + mq.sendIfReady() + workScheduled = time.Time{} + } else { + // Otherwise, extend the timer. + scheduleWork.Reset(sendMessageDebounce) + } + case <-scheduleWork.C: + // We have work scheduled and haven't seen any updates + // in sendMessageDebounce. Send immediately. + workScheduled = time.Time{} mq.sendIfReady() case <-mq.done: if mq.sender != nil { @@ -335,9 +367,9 @@ func (mq *MessageQueue) transferRebroadcastWants() bool { return true } -func (mq *MessageQueue) onWorkReady() { +func (mq *MessageQueue) signalWorkReady() { select { - case mq.outgoingWork <- struct{}{}: + case mq.outgoingWork <- time.Now(): default: } } @@ -443,10 +475,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) { // } func (mq *MessageQueue) hasPendingWork() bool { + return mq.pendingWorkCount() > 0 +} + +func (mq *MessageQueue) pendingWorkCount() int { mq.wllock.Lock() defer mq.wllock.Unlock() - return mq.bcstWants.pending.Len() > 0 || mq.peerWants.pending.Len() > 0 || mq.cancels.Len() > 0 + return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len() } func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {