Skip to content

Commit

Permalink
Merge pull request #4851 from halseth/sweeper-cnct-deadlock
Browse files Browse the repository at this point in the history
sweeper: avoid deadlock on shutdown
  • Loading branch information
halseth authored Dec 10, 2020
2 parents f6f3ab5 + 77daa3d commit 08bb8ab
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
32 changes: 32 additions & 0 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ type inputCluster struct {
// attempting to sweep.
type pendingSweepsReq struct {
respChan chan map[wire.OutPoint]*PendingInput
errChan chan error
}

// PendingInput contains information about an input that is currently being
Expand Down Expand Up @@ -381,6 +382,33 @@ func (s *UtxoSweeper) Start() error {
defer s.wg.Done()

s.collector(blockEpochs.Epochs)

// The collector exited and won't longer handle incoming
// requests. This can happen on shutdown, when the block
// notifier shuts down before the sweeper and its clients. In
// order to not deadlock the clients waiting for their requests
// being handled, we handle them here and immediately return an
// error. When the sweeper finally is shut down we can exit as
// the clients will be notified.
for {
select {
case inp := <-s.newInputs:
inp.resultChan <- Result{
Err: ErrSweeperShuttingDown,
}

case req := <-s.pendingSweepsReqs:
req.errChan <- ErrSweeperShuttingDown

case req := <-s.updateReqs:
req.responseChan <- &updateResp{
err: ErrSweeperShuttingDown,
}

case <-s.quit:
return
}
}
}()

return nil
Expand Down Expand Up @@ -1290,9 +1318,11 @@ func (s *UtxoSweeper) waitForSpend(outpoint wire.OutPoint,
// attempting to sweep.
func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
respChan := make(chan map[wire.OutPoint]*PendingInput, 1)
errChan := make(chan error, 1)
select {
case s.pendingSweepsReqs <- &pendingSweepsReq{
respChan: respChan,
errChan: errChan,
}:
case <-s.quit:
return nil, ErrSweeperShuttingDown
Expand All @@ -1301,6 +1331,8 @@ func (s *UtxoSweeper) PendingInputs() (map[wire.OutPoint]*PendingInput, error) {
select {
case pendingSweeps := <-respChan:
return pendingSweeps, nil
case err := <-errChan:
return nil, err
case <-s.quit:
return nil, ErrSweeperShuttingDown
}
Expand Down
41 changes: 41 additions & 0 deletions sweep/sweeper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2218,3 +2218,44 @@ func TestRequiredTxOuts(t *testing.T) {
})
}
}

// TestSweeperShutdownHandling tests that we notify callers when the sweeper
// cannot handle requests since it's in the process of shutting down.
func TestSweeperShutdownHandling(t *testing.T) {
ctx := createSweeperTestContext(t)

// Make the backing notifier break down. This is what happens during
// lnd shut down, since the notifier is stopped before the sweeper.
require.Len(t, ctx.notifier.epochChan, 1)
for epochChan := range ctx.notifier.epochChan {
close(epochChan)
}

// Give the collector some time to exit.
time.Sleep(50 * time.Millisecond)

// Now trying to sweep inputs should return an error on the error
// channel.
resultChan, err := ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
require.NoError(t, err)

select {
case res := <-resultChan:
require.Equal(t, ErrSweeperShuttingDown, res.Err)

case <-time.After(defaultTestTimeout):
t.Fatalf("no result arrived")
}

// Stop the sweeper properly.
err = ctx.sweeper.Stop()
require.NoError(t, err)

// Now attempting to sweep an input should error out immediately.
_, err = ctx.sweeper.SweepInput(
spendableInputs[0], defaultFeePref,
)
require.Error(t, err)
}

0 comments on commit 08bb8ab

Please sign in to comment.