diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 91f66551ddc..dc5dcafe3f5 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -82,7 +82,6 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, }) bs := &Bitswap{ - self: p, blockstore: bstore, notifications: notif, engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method @@ -112,34 +111,36 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, // Bitswap instances implement the bitswap protocol. type Bitswap struct { + // the peermanager manages sending messages to peers in a way that + // wont block bitswap operation + wm *WantManager - // the ID of the peer to act on behalf of - self peer.ID + // the engine is the bit of logic that decides who to send which blocks to + engine *decision.Engine // network delivers messages on behalf of the session network bsnet.BitSwapNetwork - // the peermanager manages sending messages to peers in a way that - // wont block bitswap operation - wm *WantManager - // blockstore is the local database // NB: ensure threadsafety blockstore blockstore.Blockstore + // notifications engine for receiving new blocks and routing them to the + // appropriate user requests notifications notifications.PubSub - // send keys to a worker to find and connect to providers for them + // findKeys sends keys to a worker to find and connect to providers for them findKeys chan *blockRequest - - engine *decision.Engine - - process process.Process - + // newBlocks is a channel for newly added blocks to be provided to the + // network. blocks pushed down this channel get buffered and fed to the + // provideKeys channel later on to avoid too much network activity newBlocks chan *cid.Cid - + // provideKeys directly feeds provide workers provideKeys chan *cid.Cid + process process.Process + + // Counters for various statistics counterLk sync.Mutex blocksRecvd int dupBlocksRecvd int @@ -167,13 +168,12 @@ func (bs *Bitswap) GetBlock(parent context.Context, k *cid.Cid) (blocks.Block, e // enforce. May this comment keep you safe. ctx, cancelFunc := context.WithCancel(parent) + // TODO: this request ID should come in from a higher layer so we can track + // across multiple 'GetBlock' invocations ctx = logging.ContextWithLoggable(ctx, loggables.Uuid("GetBlockRequest")) log.Event(ctx, "Bitswap.GetBlockRequest.Start", k) defer log.Event(ctx, "Bitswap.GetBlockRequest.End", k) - - defer func() { - cancelFunc() - }() + defer cancelFunc() promise, err := bs.GetBlocks(ctx, []*cid.Cid{k}) if err != nil { diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index c0eeb2b5c9f..28d4690dd1f 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -175,28 +175,13 @@ func (mq *msgQueue) runQueue(ctx context.Context) { } func (mq *msgQueue) doWork(ctx context.Context) { - // allow ten minutes for connections - // this includes looking them up in the dht - // dialing them, and handshaking if mq.sender == nil { - conctx, cancel := context.WithTimeout(ctx, time.Minute*10) - defer cancel() - - err := mq.network.ConnectTo(conctx, mq.p) + err := mq.openSender(ctx) if err != nil { - log.Infof("cant connect to peer %s: %s", mq.p, err) + log.Infof("cant open message sender to peer %s: %s", mq.p, err) // TODO: cant connect, what now? return } - - nsender, err := mq.network.NewMessageSender(ctx, mq.p) - if err != nil { - log.Infof("cant open new stream to peer %s: %s", mq.p, err) - // TODO: cant open stream, what now? - return - } - - mq.sender = nsender } // grab outgoing message @@ -210,14 +195,64 @@ func (mq *msgQueue) doWork(ctx context.Context) { mq.outlk.Unlock() // send wantlist updates - err := mq.sender.SendMsg(wlm) - if err != nil { + for { // try to send this message until we fail. + err := mq.sender.SendMsg(wlm) + if err == nil { + return + } + log.Infof("bitswap send error: %s", err) mq.sender.Close() mq.sender = nil - // TODO: what do we do if this fails? - return + + select { + case <-mq.done: + return + case <-ctx.Done(): + return + case <-time.After(time.Millisecond * 100): + // wait 100ms in case disconnect notifications are still propogating + log.Warning("SendMsg errored but neither 'done' nor context.Done() were set") + } + + err = mq.openSender(ctx) + if err != nil { + log.Error("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err) + // TODO(why): what do we do now? + // I think the *right* answer is to probably put the message we're + // trying to send back, and then return to waiting for new work or + // a disconnect. + return + } + + // TODO: Is this the same instance for the remote peer? + // If its not, we should resend our entire wantlist to them + /* + if mq.sender.InstanceID() != mq.lastSeenInstanceID { + wlm = mq.getFullWantlistMessage() + } + */ + } +} + +func (mq *msgQueue) openSender(ctx context.Context) error { + // allow ten minutes for connections this includes looking them up in the + // dht dialing them, and handshaking + conctx, cancel := context.WithTimeout(ctx, time.Minute*10) + defer cancel() + + err := mq.network.ConnectTo(conctx, mq.p) + if err != nil { + return err + } + + nsender, err := mq.network.NewMessageSender(ctx, mq.p) + if err != nil { + return err } + + mq.sender = nsender + return nil } func (pm *WantManager) Connected(p peer.ID) { @@ -292,14 +327,13 @@ func (pm *WantManager) Run() { } func (wm *WantManager) newMsgQueue(p peer.ID) *msgQueue { - mq := new(msgQueue) - mq.done = make(chan struct{}) - mq.work = make(chan struct{}, 1) - mq.network = wm.network - mq.p = p - mq.refcnt = 1 - - return mq + return &msgQueue{ + done: make(chan struct{}), + work: make(chan struct{}, 1), + network: wm.network, + p: p, + refcnt: 1, + } } func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { @@ -312,8 +346,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { } }() - // if we have no message held, or the one we are given is full - // overwrite the one we are holding + // if we have no message held allocate a new one if mq.out == nil { mq.out = bsmsg.New(false) } diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 942c37ba830..5e06447821d 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -197,6 +197,12 @@ func (bs *Bitswap) providerQueryManager(ctx context.Context) { for { select { case e := <-bs.findKeys: + select { // make sure its not already cancelled + case <-e.Ctx.Done(): + continue + default: + } + activeLk.Lock() if kset.Has(e.Cid) { activeLk.Unlock()