Merge pull request #3408 from ipfs/feat/bitswap-cleanup
cleanup bitswap and handle message send failure slightly better
whyrusleeping committed Nov 27, 2016
2 parents 4511a4a + 42e6e54 commit 4e8015d
Showing 3 changed files with 88 additions and 49 deletions.
36 changes: 18 additions & 18 deletions exchange/bitswap/bitswap.go
@@ -82,7 +82,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
})

bs := &Bitswap{
self: p,
blockstore: bstore,
notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
@@ -112,34 +111,36 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,

// Bitswap instances implement the bitswap protocol.
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
wm *WantManager

// the ID of the peer to act on behalf of
self peer.ID
// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

// the peermanager manages sending messages to peers in a way that
// won't block bitswap operation
wm *WantManager

// blockstore is the local database
// NB: ensure threadsafety
blockstore blockstore.Blockstore

// notifications engine for receiving new blocks and routing them to the
// appropriate user requests
notifications notifications.PubSub

// send keys to a worker to find and connect to providers for them
// findKeys sends keys to a worker to find and connect to providers for them
findKeys chan *blockRequest

engine *decision.Engine

process process.Process

// newBlocks is a channel for newly added blocks to be provided to the
// network. blocks pushed down this channel get buffered and fed to the
// provideKeys channel later on to avoid too much network activity
newBlocks chan *cid.Cid

// provideKeys directly feeds provide workers
provideKeys chan *cid.Cid

process process.Process

// Counters for various statistics
counterLk sync.Mutex
blocksRecvd int
dupBlocksRecvd int
@@ -167,13 +168,12 @@ func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, e
// enforce. May this comment keep you safe.
ctx, cancelFunc := context.WithCancel(parent)

// TODO: this request ID should come in from a higher layer so we can track
// across multiple 'GetBlock' invocations
ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest"))
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k)

defer func() {
cancelFunc()
}()
defer cancelFunc()

promise, err := bs.GetBlocks(ctx, []*cid.Cid{k})
if err != nil {
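For context, a minimal caller-side sketch (not part of this diff) of fetching a single block through GetBlock with a deadline. The helper name fetchOne and the 30-second timeout are illustrative choices; only bs.GetBlock comes from the code above, and the context and time imports are assumed:

    func fetchOne(ctx context.Context, bs *Bitswap, c *cid.Cid) (blocks.Block, error) {
        ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel()
        // GetBlock blocks until the block arrives, the timeout fires, or the
        // caller's context is cancelled.
        return bs.GetBlock(ctx, c)
    }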
95 changes: 64 additions & 31 deletions exchange/bitswap/wantmanager.go
@@ -175,28 +175,13 @@ func (mq *msgQueue) runQueue(ctx context.Context) {
}

func (mq *msgQueue) doWork(ctx context.Context) {
// allow ten minutes for connections
// this includes looking them up in the dht
// dialing them, and handshaking
if mq.sender == nil {
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant connect to peer %s: %s", mq.p, err)
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
log.Infof("cant open new stream to peer %s: %s", mq.p, err)
// TODO: cant open stream, what now?
return
}

mq.sender = nsender
}

// grab outgoing message
@@ -210,14 +195,64 @@ func (mq *msgQueue) doWork(ctx context.Context) {
mq.outlk.Unlock()

// send wantlist updates
err := mq.sender.SendMsg(wlm)
if err != nil {
for { // try to send this message until we fail.
err := mq.sender.SendMsg(wlm)
if err == nil {
return
}

log.Infof("bitswap send error: %s", err)
mq.sender.Close()
mq.sender = nil
// TODO: what do we do if this fails?
return

select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propagating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.openSender(ctx)
if err != nil {
log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}

func (mq *msgQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections; this includes looking them up in the
// dht, dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}

mq.sender = nsender
return nil
}
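The new doWork/openSender pair amounts to a send-retry-reopen loop: try to send, and on failure close the stream, wait briefly, reopen, and try again until the context is cancelled (the real queue also watches its own done channel, which the sketch below omits). A standalone sketch of that pattern, assuming the context and time imports; the sender interface, openFn, and sendWithRetry names are illustrative stand-ins, not the actual bitswap types:

    type sender interface {
        SendMsg(msg []byte) error
        Close() error
    }

    type openFn func(ctx context.Context) (sender, error)

    func sendWithRetry(ctx context.Context, open openFn, msg []byte) error {
        s, err := open(ctx)
        if err != nil {
            return err
        }
        for {
            if err := s.SendMsg(msg); err == nil {
                return nil // sent successfully
            }
            s.Close()

            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(100 * time.Millisecond):
                // give disconnect notifications a moment to propagate before
                // trying to re-establish the stream
            }

            if s, err = open(ctx); err != nil {
                return err
            }
        }
    }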

func (pm *WantManager) Connected(p peer.ID) {
@@ -292,14 +327,13 @@ func (pm *WantManager) Run() {
}

func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue {
mq := new(msgQueue)
mq.done = make(chan struct{})
mq.work = make(chan struct{}, 1)
mq.network = wm.network
mq.p = p
mq.refcnt = 1

return mq
return &msgQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
network: wm.network,
p: p,
refcnt: 1,
}
}

func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
@@ -312,8 +346,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) {
}
}()

// if we have no message held, or the one we are given is full
// overwrite the one we are holding
// if we have no message held, allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}
6 changes: 6 additions & 0 deletions exchange/bitswap/workers.go
@@ -197,6 +197,12 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) {
for {
select {
case e := <-bs.findKeys:
select { // make sure its not already cancelled
case <-e.Ctx.Done():
continue
default:
}

activeLk.Lock()
if kset.Has(e.Cid) {
activeLk.Unlock()
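The added select with a default case is the standard non-blocking way to test whether a context has already been cancelled before committing work to it. A minimal sketch of the same check factored into a helper; alreadyCancelled is a hypothetical name, not part of this diff:

    func alreadyCancelled(ctx context.Context) bool {
        select {
        case <-ctx.Done():
            // the context was cancelled or timed out; skip this request
            return true
        default:
            // nothing pending on Done(), proceed with the work
            return false
        }
    }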
