-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Bitswap sessions #3867
Bitswap sessions #3867
Changes from 9 commits
47479b6
bda8c3a
e43d131
b680f49
b1247d3
579fd46
1ffb44c
eab2024
124afdb
3be5c91
dd7589b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,7 +64,11 @@ var unwantCmd = &cmds.Command{ | |
ks = append(ks, c) | ||
} | ||
|
||
bs.CancelWants(ks) | ||
// TODO: This should maybe find *all* sessions for this request and cancel them? | ||
// (why): in reality, I think this command should be removed. It's | ||
// messing with the internal state of bitswap. You should cancel wants | ||
// by killing the command that caused the want. | ||
bs.CancelWants(ks, 0) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does canceling the context not cancel the associated wants? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, cancelling the context of any request command cancels the associated wants. So this command really should just get removed, it has no real purpose |
||
}, | ||
} | ||
|
||
|
@@ -107,6 +111,11 @@ Print out all blocks currently on the bitswap wantlist for the local peer.`, | |
res.SetError(err, cmds.ErrNormal) | ||
return | ||
} | ||
if pid == nd.Identity { | ||
res.SetOutput(&KeyList{bs.GetWantlist()}) | ||
return | ||
} | ||
|
||
res.SetOutput(&KeyList{bs.WantlistForPeer(pid)}) | ||
} else { | ||
res.SetOutput(&KeyList{bs.GetWantlist()}) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ import ( | |
"errors" | ||
"math" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" | ||
|
@@ -17,13 +18,12 @@ import ( | |
notifications "github.com/ipfs/go-ipfs/exchange/bitswap/notifications" | ||
flags "github.com/ipfs/go-ipfs/flags" | ||
"github.com/ipfs/go-ipfs/thirdparty/delay" | ||
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" | ||
|
||
metrics "gx/ipfs/QmRg1gKTHzc3CZXSKzem8aR4E3TubFhbgXwfVuWnSK5CC5/go-metrics-interface" | ||
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" | ||
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" | ||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" | ||
loggables "gx/ipfs/QmVesPmqbPp7xRGyY96tnBwzDtVV1nqv4SCVxo5zCqKyH8/go-libp2p-loggables" | ||
blocks "gx/ipfs/QmXxGS5QsUxpR3iqL5DjmsYPHR1Yz74siRQ4ChJqWFosMh/go-block-format" | ||
cid "gx/ipfs/Qma4RJSuh7mMeJQYCqMbKzekn6EwBo7HEs5AQYjVRMQATB/go-cid" | ||
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer" | ||
) | ||
|
@@ -99,6 +99,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, | |
newBlocks: make(chan *cid.Cid, HasBlockBufferSize), | ||
provideKeys: make(chan *cid.Cid, provideKeysBufferSize), | ||
wm: NewWantManager(ctx, network), | ||
counters: new(counters), | ||
|
||
dupMetric: dupHist, | ||
allMetric: allHist, | ||
|
@@ -152,17 +153,29 @@ type Bitswap struct { | |
process process.Process | ||
|
||
// Counters for various statistics | ||
counterLk sync.Mutex | ||
blocksRecvd int | ||
dupBlocksRecvd int | ||
dupDataRecvd uint64 | ||
blocksSent int | ||
dataSent uint64 | ||
dataRecvd uint64 | ||
counterLk sync.Mutex | ||
counters *counters | ||
|
||
// Metrics interface metrics | ||
dupMetric metrics.Histogram | ||
allMetric metrics.Histogram | ||
|
||
// Sessions | ||
sessions []*Session | ||
sessLk sync.Mutex | ||
|
||
sessID uint64 | ||
sessIDLk sync.Mutex | ||
} | ||
|
||
type counters struct { | ||
blocksRecvd uint64 | ||
dupBlocksRecvd uint64 | ||
dupDataRecvd uint64 | ||
blocksSent uint64 | ||
dataSent uint64 | ||
dataRecvd uint64 | ||
messagesRecvd uint64 | ||
} | ||
|
||
type blockRequest struct { | ||
|
@@ -173,45 +186,7 @@ type blockRequest struct { | |
// GetBlock attempts to retrieve a particular block from peers within the | ||
// deadline enforced by the context. | ||
func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, error) { | ||
if k == nil { | ||
log.Error("nil cid in GetBlock") | ||
return nil, blockstore.ErrNotFound | ||
} | ||
|
||
// Any async work initiated by this function must end when this function | ||
// returns. To ensure this, derive a new context. Note that it is okay to | ||
// listen on parent in this scope, but NOT okay to pass |parent| to | ||
// functions called by this one. Otherwise those functions won't return | ||
// when this context's cancel func is executed. This is difficult to | ||
// enforce. May this comment keep you safe. | ||
ctx, cancelFunc := context.WithCancel(parent) | ||
|
||
// TODO: this request ID should come in from a higher layer so we can track | ||
// across multiple 'GetBlock' invocations | ||
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest")) | ||
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) | ||
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k) | ||
defer cancelFunc() | ||
|
||
promise, err := bs.GetBlocks(ctx, []*cid.Cid{k}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
select { | ||
case block, ok := <-promise: | ||
if !ok { | ||
select { | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
default: | ||
return nil, errors.New("promise channel was closed") | ||
} | ||
} | ||
return block, nil | ||
case <-parent.Done(): | ||
return nil, parent.Err() | ||
} | ||
return getBlock(parent, k, bs.GetBlocks) | ||
} | ||
|
||
func (bs *Bitswap) WantlistForPeer(p peer.ID) []*cid.Cid { | ||
|
@@ -251,7 +226,9 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block | |
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) | ||
} | ||
|
||
bs.wm.WantBlocks(ctx, keys) | ||
mses := bs.getNextSessionID() | ||
|
||
bs.wm.WantBlocks(ctx, keys, nil, mses) | ||
|
||
// NB: Optimization. Assumes that providers of key[0] are likely to | ||
// be able to provide for all keys. This currently holds true in most | ||
|
@@ -273,7 +250,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block | |
defer close(out) | ||
defer func() { | ||
// can't just defer this call on its own, arguments are resolved *when* the defer is created | ||
bs.CancelWants(remaining.Keys()) | ||
bs.CancelWants(remaining.Keys(), mses) | ||
}() | ||
for { | ||
select { | ||
|
@@ -282,6 +259,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block | |
return | ||
} | ||
|
||
bs.CancelWants([]*cid.Cid{blk.Cid()}, mses) | ||
remaining.Remove(blk.Cid()) | ||
select { | ||
case out <- blk: | ||
|
@@ -302,9 +280,19 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []*cid.Cid) (<-chan block | |
} | ||
} | ||
|
||
func (bs *Bitswap) getNextSessionID() uint64 { | ||
bs.sessIDLk.Lock() | ||
defer bs.sessIDLk.Unlock() | ||
bs.sessID++ | ||
return bs.sessID | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could use an atomic. However, is there any reason this uses session IDs instead of just pointers to the sessions themselves? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doing it this way because other bits of code need to be able to reference which sessions things belong to. Otherwise we might end up with circular dependencies since bitswap imports wantlists and wantlists reference session IDs. |
||
|
||
// CancelWants removes the given keys from the wantlist | ||
func (bs *Bitswap) CancelWants(cids []*cid.Cid) { | ||
bs.wm.CancelWants(cids) | ||
func (bs *Bitswap) CancelWants(cids []*cid.Cid, ses uint64) { | ||
if len(cids) == 0 { | ||
return | ||
} | ||
bs.wm.CancelWants(context.Background(), cids, nil, ses) | ||
} | ||
|
||
// HasBlock announces the existence of a block to this bitswap service. The | ||
|
@@ -340,7 +328,23 @@ func (bs *Bitswap) HasBlock(blk blocks.Block) error { | |
return nil | ||
} | ||
|
||
// SessionsForBlock returns a slice of all sessions that may be interested in the given cid | ||
func (bs *Bitswap) SessionsForBlock(c *cid.Cid) []*Session { | ||
bs.sessLk.Lock() | ||
defer bs.sessLk.Unlock() | ||
|
||
var out []*Session | ||
for _, s := range bs.sessions { | ||
if s.interestedIn(c) { | ||
out = append(out, s) | ||
} | ||
} | ||
return out | ||
} | ||
|
||
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { | ||
atomic.AddUint64(&bs.counters.messagesRecvd, 1) | ||
|
||
// This call records changes to wantlists, blocks received, | ||
// and number of bytes transferred. | ||
bs.engine.MessageReceived(p, incoming) | ||
|
@@ -362,7 +366,6 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg | |
} | ||
keys = append(keys, block.Cid()) | ||
} | ||
bs.wm.CancelWants(keys) | ||
|
||
wg := sync.WaitGroup{} | ||
for _, block := range iblocks { | ||
|
@@ -375,6 +378,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg | |
k := b.Cid() | ||
log.Event(ctx, "Bitswap.GetBlockRequest.End", k) | ||
|
||
for _, ses := range bs.SessionsForBlock(k) { | ||
ses.receiveBlockFrom(p, b) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unless I'm mistaken, this is the only place that feeds blocks into the session. If a block gets evicted from the interested set before we find it, we'll never forward it to the session. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oooh, good catch. the 'InterestedIn' check should also check the liveWants set. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, what happens if the user adds the block to the local blockstore via the commandline while we're trying to fetch it from the network? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All blocks added to the datastore get added through either the blockservice or the dagservice (which wraps the blockservice). The blockservice calls "exchange.HasBlock" for each block being added to it, then the blocks get pushed through the notifications channel. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't that a bit racy? That is, what happens if the user adds the block after the call to I'm asking because there's also a race condition in this PR when a block is neither in the interest cache (we're interested in too many blocks) nor in the liveWants set (we haven't started fetching it yet) but gets fetched by another session (or manually added by the user). In this case, we'll try to fetch the block from the network anyway even if we already have it.
So, I'm wondering if it's worth adding a notification framework to Blockstores where one can wait for blocks to be added. An alternative is to bitswap with ourself (but that seems kind of hacky). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The first one already exists but the second one is a bit different and has a much larger window. To reproduce:
The operation will never complete because, when added, the 100th node was neither in the interest set nor in the liveWants set. However, all race conditions like this could be fixed by some centralized notification system (doesn't have to be built into the blockstore). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure that a notification system would actually eliminate race conditions like this, it seems very tricky to do without taking a lock around all 'put block' calls that gets released once the notification has finished propagating. Will think about this more after some sleep. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should be fine as long as we always notify when putting a block into the blockstore. Yes, we may end up starting a bitswap search for a block that you just got but we can cancel it immediately and don't have to wait for it to finish. My primary worry here is that the bitswap search may never finish if the block was retrieved on some side-channel and can't be found on the network. The way I see this working is as follows:
The main event loop for the BlockService will:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Note: this is very much "future work" in case that wasn't clear.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Stebalien I pushed a fix that at least doesn't lose track of blocks that may be added locally, and left some TODO notes for future work. I think that covers the major outstanding concerns, there will definitely be followup PRs, but this one should be good to go |
||
bs.CancelWants([]*cid.Cid{k}, ses.id) | ||
} | ||
log.Debugf("got block %s from %s", b, p) | ||
if err := bs.HasBlock(b); err != nil { | ||
log.Warningf("ReceiveMessage HasBlock error: %s", err) | ||
|
@@ -401,12 +408,13 @@ func (bs *Bitswap) updateReceiveCounters(b blocks.Block) { | |
|
||
bs.counterLk.Lock() | ||
defer bs.counterLk.Unlock() | ||
c := bs.counters | ||
|
||
bs.blocksRecvd++ | ||
bs.dataRecvd += uint64(len(b.RawData())) | ||
c.blocksRecvd++ | ||
c.dataRecvd += uint64(len(b.RawData())) | ||
if has { | ||
bs.dupBlocksRecvd++ | ||
bs.dupDataRecvd += uint64(blkLen) | ||
c.dupBlocksRecvd++ | ||
c.dupDataRecvd += uint64(blkLen) | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this needed, can't we just pass
s.exchange
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, because then the response wouldn't be
nil
(it would be a typed nil, which in interface form is != nil)