Skip to content

Commit

Permalink
Fix wantlist overflow handling to select newer entries.
Browse files Browse the repository at this point in the history
wantlist overflow handling now cancels existing entries to make room for newer requests. This fix prevents the wantlist from filling up with CIDs that the server does not have.

Fixes #527
  • Loading branch information
gammazero committed Jun 23, 2024
1 parent dfd4a53 commit 713faee
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 41 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ The following emojis are used to highlight certain changes:
### Changed

- `boxo/gateway` is now tested against [gateway-conformance v6](https://github.com/ipfs/gateway-conformance/releases/tag/v0.6.0)
- `bitswap/client` supports additional tracing
- `bitswap/client` supports additional tracing

### Removed

Expand All @@ -39,6 +39,7 @@ The following emojis are used to highlight certain changes:

- `routing/http`: the `FindPeer` now returns `routing.ErrNotFound` when no addresses are found
- `routing/http`: the `FindProvidersAsync` no longer causes a goroutine buildup
- bitswap wantlist overflow handling now cancels existing entries to make room for newer entries. This fix prevents the wantlist from filling up with CIDs that the server does not have.

## [v0.20.0]

Expand Down
12 changes: 8 additions & 4 deletions bitswap/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,21 @@ func (m *impl) Empty() bool {
}

func (m *impl) Wantlist() []Entry {
out := make([]Entry, 0, len(m.wantlist))
out := make([]Entry, len(m.wantlist))
var i int
for _, e := range m.wantlist {
out = append(out, *e)
out[i] = *e
i++
}
return out
}

func (m *impl) Blocks() []blocks.Block {
bs := make([]blocks.Block, 0, len(m.blocks))
bs := make([]blocks.Block, len(m.blocks))
var i int
for _, block := range m.blocks {
bs = append(bs, block)
bs[i] = block
i++
}
return bs
}
Expand Down
106 changes: 71 additions & 35 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package decision
import (
"context"
"fmt"
"math/bits"
"sync"
"time"

Expand Down Expand Up @@ -134,7 +133,7 @@ type PeerLedger interface {
// Wants informs the ledger that [peer.ID] wants [wl.Entry].
Wants(p peer.ID, e wl.Entry)

// CancelWant returns true if the [cid.Cid] is present in the wantlist of [peer.ID].
// CancelWant returns true if the [cid.Cid] was removed from the wantlist of [peer.ID].
CancelWant(p peer.ID, k cid.Cid) bool

// CancelWantWithType will not cancel WantBlock if we sent a HAVE message.
Expand Down Expand Up @@ -668,8 +667,9 @@ func (e *Engine) Peers() []peer.ID {

// MessageReceived is called when a message is received from a remote peer.
// For each item in the wantlist, add a want-have or want-block entry to the
// request queue (this is later popped off by the workerTasks)
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) (mustKillConnection bool) {
// request queue (this is later popped off by the workerTasks). Returns true
// if the connection to the server must be closed.
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) bool {
entries := m.Wantlist()

if len(entries) > 0 {
Expand Down Expand Up @@ -708,7 +708,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
if err != nil {
log.Info("aborting message processing", err)
return
return false

Check warning on line 711 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L711

Added line #L711 was not covered by tests
}

e.lock.Lock()
Expand All @@ -717,39 +717,72 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
e.peerLedger.ClearPeerWantlist(p)
}

s := uint(e.peerLedger.WantlistSizeForPeer(p))
if wouldBe := s + uint(len(wants)); wouldBe > e.maxQueuedWantlistEntriesPerPeer {
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", wouldBe)
// truncate wantlist to avoid overflow
available, o := bits.Sub(e.maxQueuedWantlistEntriesPerPeer, s, 0)
if o != 0 {
available = 0
if len(wants) != 0 {
filteredWants := wants[:0] // shift inplace
for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
}
filteredWants = append(filteredWants, entry)
if len(filteredWants) == int(e.maxQueuedWantlistEntriesPerPeer) {
// filteredWants at limit, ignore remaining wants from request.
log.Debugw("requested wants exceeds max wantlist size", "local", e.self, "remote", p, "ignoring", len(wants)-len(filteredWants))
break
}
}
wants = wants[:available]
}

filteredWants := wants[:0] // shift inplace

for _, entry := range wants {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
e.lock.Unlock()
log.Warnw("peer wants an identity CID", "local", e.self, "remote", p)
return true
wants = wants[len(filteredWants):]
for i := range wants {
wants[i] = bsmsg.Entry{} // early GC
}
if e.maxCidSize != 0 && uint(entry.Cid.ByteLen()) > e.maxCidSize {
// Ignore requests about CIDs that big.
continue
wants = filteredWants

// Ensure sufficient space for new wants.
s := e.peerLedger.WantlistSizeForPeer(p)
available := int(e.maxQueuedWantlistEntriesPerPeer) - s
if len(wants) > available {
needSpace := len(wants) - available
log.Debugw("wantlist overflow", "local", e.self, "remote", p, "would be", s+len(wants), "canceling", needSpace)
// Cancel any wants that are being requested again. This makes room
// for new wants and minimizes that existing wants to cancel that
// are not in the new request.
for _, entry := range wants {
if e.peerLedger.CancelWant(p, entry.Cid) {
e.peerRequestQueue.Remove(entry.Cid, p)
needSpace--
if needSpace == 0 {
break

Check warning on line 760 in bitswap/server/internal/decision/engine.go

View check run for this annotation

Codecov / codecov/patch

bitswap/server/internal/decision/engine.go#L760

Added line #L760 was not covered by tests
}
}
}
// Cancel additional wants, that are not being replaced, to make
// room for new wants.
if needSpace != 0 {
wl := e.peerLedger.WantlistForPeer(p)
for i := range wl {
entCid := wl[i].Cid
if e.peerLedger.CancelWant(p, entCid) {
e.peerRequestQueue.Remove(entCid, p)
needSpace--
if needSpace == 0 {
break
}
}
}
}
}

e.peerLedger.Wants(p, entry.Entry)
filteredWants = append(filteredWants, entry)
}
clear := wants[len(filteredWants):]
for i := range clear {
clear[i] = bsmsg.Entry{} // early GC
for _, entry := range wants {
e.peerLedger.Wants(p, entry.Entry)
}
}
wants = filteredWants

for _, entry := range cancels {
if entry.Cid.Prefix().MhType == mh.IDENTITY {
// This is a truely broken client, let's kill the connection.
Expand Down Expand Up @@ -852,6 +885,9 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap

// Split the want-have / want-block entries from the cancel entries
func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if len(es) == 0 {
return nil, nil
}
wants := make([]bsmsg.Entry, 0, len(es))
cancels := make([]bsmsg.Entry, 0, len(es))
for _, et := range es {
Expand All @@ -866,12 +902,12 @@ func (e *Engine) splitWantsCancels(es []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Ent

// Split the want-have / want-block entries from the block that will be denied access
func (e *Engine) splitWantsDenials(p peer.ID, allWants []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
if e.peerBlockRequestFilter == nil {
if e.peerBlockRequestFilter == nil || len(allWants) == 0 {
return allWants, nil
}

wants := make([]bsmsg.Entry, 0, len(allWants))
denied := make([]bsmsg.Entry, 0, len(allWants))
var denied []bsmsg.Entry

for _, et := range allWants {
if e.peerBlockRequestFilter(p, et.Cid) {
Expand Down
58 changes: 58 additions & 0 deletions bitswap/server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,3 +1733,61 @@ func TestKillConnectionForInlineCid(t *testing.T) {
t.Fatal("connection was not killed when receiving inline in cancel")
}
}

func TestWantlistOverflow(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const limit = 32
warsaw := newTestEngine(ctx, "warsaw", WithMaxQueuedWantlistEntriesPerPeer(limit))
riga := newTestEngine(ctx, "riga")

m := message.New(false)
for i := 0; i < limit+(limit/2); i++ {
m.AddEntry(blocks.NewBlock([]byte(fmt.Sprint(i))).Cid(), 0, pb.Message_Wantlist_Block, true)
}
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)

if warsaw.Peer == riga.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
}

// Check that the wantlist is at the size limit, and limit/2 wants ignored.
wl := warsaw.Engine.WantlistForPeer(riga.Peer)
if len(wl) != limit {
t.Fatal("wantlist does not match limit", len(wl))
}

m = message.New(false)
blockCids := make([]cid.Cid, limit/2+4)
for i := 0; i < limit/2+4; i++ {
c := blocks.NewBlock([]byte(fmt.Sprint(i + limit))).Cid()
m.AddEntry(c, 0, pb.Message_Wantlist_Block, true)
blockCids[i] = c
}
warsaw.Engine.MessageReceived(ctx, riga.Peer, m)
wl = warsaw.Engine.WantlistForPeer(riga.Peer)

// Check that wantlist is still at size limit.
if len(wl) != limit {
t.Fatalf("wantlist size %d does not match limit %d", len(wl), limit)
}

// Check that all new blocks are in wantlist.
var missing int
for _, c := range blockCids {
var found bool
for i := range wl {
if wl[i].Cid == c {
found = true
break
}
}
if !found {
missing++
}
}
if missing != 0 {
t.Fatalf("Missing %d new wants expected in wantlist", missing)
}
}
3 changes: 2 additions & 1 deletion bitswap/server/internal/decision/peer_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ func (l *DefaultPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
if !ok {
return false
}
_, had := wants[k]
delete(wants, k)
if len(wants) == 0 {
delete(l.peers, p)
}

l.removePeerFromCid(p, k)
return true
return had
}

func (l *DefaultPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ pb.Message_Wantlist_WantType) {
Expand Down

0 comments on commit 713faee

Please sign in to comment.