Skip to content

Commit

Permalink
sweep: remove block subscription in UtxoSweeper and TxPublisher
Browse files Browse the repository at this point in the history
This commit removes the independent block subscriptions in `UtxoSweeper`
and `TxPublisher`. These subsystems now listen to the `BlockbeatChan`
for new blocks.
  • Loading branch information
yyforyongyu committed Nov 18, 2024
1 parent 1c5eded commit c07e1ee
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 55 deletions.
31 changes: 9 additions & 22 deletions sweep/fee_bumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,13 +799,8 @@ func (t *TxPublisher) Start() error {
return fmt.Errorf("TxPublisher started more than once")
}

blockEvent, err := t.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}

t.wg.Add(1)
go t.monitor(blockEvent)
go t.monitor()

log.Debugf("TxPublisher started")

Expand Down Expand Up @@ -833,33 +828,25 @@ func (t *TxPublisher) Stop() error {
// to be bumped. If so, it will attempt to bump the fee of the tx.
//
// NOTE: Must be run as a goroutine.
func (t *TxPublisher) monitor(blockEvent *chainntnfs.BlockEpochEvent) {
defer blockEvent.Cancel()
func (t *TxPublisher) monitor() {
defer t.wg.Done()

for {
select {
case epoch, ok := <-blockEvent.Epochs:
if !ok {
// We should stop the publisher before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed, exit " +
"monitor")

return
}

log.Debugf("TxPublisher received new block: %v",
epoch.Height)
case beat := <-t.BlockbeatChan:
height := beat.Height()
log.Debugf("TxPublisher received new block: %v", height)

// Update the best known height for the publisher.
t.currentHeight.Store(epoch.Height)
t.currentHeight.Store(height)

// Check all monitored txns to see if any of them needs
// to be bumped.
t.processRecords()

// Notify we've processed the block.
t.NotifyBlockProcessed(beat, nil)

case <-t.quit:
log.Debug("Fee bumper stopped, exit monitor")
return
Expand Down
42 changes: 9 additions & 33 deletions sweep/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,21 +452,12 @@ func (s *UtxoSweeper) Start() error {
// not change from here on.
s.relayFeeRate = s.cfg.FeeEstimator.RelayFeePerKW()

// We need to register for block epochs and retry sweeping every block.
// We should get a notification with the current best block immediately
// if we don't provide any epoch. We'll wait for that in the collector.
blockEpochs, err := s.cfg.Notifier.RegisterBlockEpochNtfn(nil)
if err != nil {
return fmt.Errorf("register block epoch ntfn: %w", err)
}

// Start sweeper main loop.
s.wg.Add(1)
go func() {
defer blockEpochs.Cancel()
defer s.wg.Done()

s.collector(blockEpochs.Epochs)
s.collector()

// The collector exited and won't longer handle incoming
// requests. This can happen on shutdown, when the block
Expand Down Expand Up @@ -657,17 +648,8 @@ func (s *UtxoSweeper) removeConflictSweepDescendants(

// collector is the sweeper main loop. It processes new inputs, spend
// notifications and counts down to publication of the sweep tx.
func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// We registered for the block epochs with a nil request. The notifier
// should send us the current best block immediately. So we need to wait
// for it here because we need to know the current best height.
select {
case bestBlock := <-blockEpochs:
s.currentHeight = bestBlock.Height

case <-s.quit:
return
}
func (s *UtxoSweeper) collector() {
defer s.wg.Done()

for {
// Clean inputs, which will remove inputs that are swept,
Expand Down Expand Up @@ -737,25 +719,16 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {

// A new block comes in, update the bestHeight, perform a check
// over all pending inputs and publish sweeping txns if needed.
case epoch, ok := <-blockEpochs:
if !ok {
// We should stop the sweeper before stopping
// the chain service. Otherwise it indicates an
// error.
log.Error("Block epoch channel closed")

return
}

case beat := <-s.BlockbeatChan:
// Update the sweeper to the best height.
s.currentHeight = epoch.Height
s.currentHeight = beat.Height()

// Update the inputs with the latest height.
inputs := s.updateSweeperInputs()

log.Debugf("Received new block: height=%v, attempt "+
"sweeping %d inputs:\n%s",
epoch.Height, len(inputs),
s.currentHeight, len(inputs),
lnutils.NewLogClosure(func() string {
inps := make(
[]input.Input, 0, len(inputs),
Expand All @@ -770,6 +743,9 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// Attempt to sweep any pending inputs.
s.sweepPendingInputs(inputs)

// Notify we've processed the block.
s.NotifyBlockProcessed(beat, nil)

case <-s.quit:
return
}
Expand Down

0 comments on commit c07e1ee

Please sign in to comment.