Skip to content

Commit

Permalink
chore: improve notification spend (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
gusin13 authored Jan 23, 2025
1 parent 1877fec commit 12a09ec
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 32 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,6 @@
# Go workspace file
go.work

rabbitmq_data
rabbitmq_data

.vscode
46 changes: 35 additions & 11 deletions internal/services/pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,41 @@ func (s *Service) processBTCSubscriber(ctx context.Context) error {
continue
}

if err := s.registerStakingSpendNotification(
delegation.StakingTxHashHex,
delegation.StakingTx.TxHex,
uint32(delegation.StakingTx.OutputIndex),
uint32(delegation.StakingTx.StartHeight),
); err != nil {
log.Error().
Err(err).
Str("stakingTxHash", delegation.StakingTxHashHex).
Msg("Failed to register staking spend notification")
return fmt.Errorf("failed to register staking spend notification: %w", err)
if delegation.State == types.Unbonded && delegation.UnbondingTx != nil {
// For early unbonded delegations i.e. state is Unbonded and Unbonding Tx is present:
// 1. Staking output is already spent by the unbonding tx
// 2. Track unbonding output to detect withdrawal tx

// we are certain the unbonding tx will be spent after the timelock expires
unbondingSpendHeightHint := delegation.UnbondingTx.StartHeight + delegation.UnbondingTx.TimeLock - 1
if err := s.registerUnbondingSpendNotification(
delegation.StakingTxHashHex,
delegation.UnbondingTx.TxHex,
uint32(unbondingSpendHeightHint),
); err != nil {
log.Error().
Err(err).
Str("stakingTxHash", delegation.StakingTxHashHex).
Msg("Failed to register unbonding spend notification")
return fmt.Errorf("failed to register unbonding spend notification: %w", err)
}
} else {
// For all other cases, we track the staking transaction output:
// 1. Natural unbonding: Need to detect withdrawal tx
// 2. Unbonding requested: Need to monitor staking output
// until the unbonding transaction is found.
if err := s.registerStakingSpendNotification(
delegation.StakingTxHashHex,
delegation.StakingTx.TxHex,
uint32(delegation.StakingTx.OutputIndex),
uint32(delegation.StakingTx.StartHeight),
); err != nil {
log.Error().
Err(err).
Str("stakingTxHash", delegation.StakingTxHashHex).
Msg("Failed to register staking spend notification")
return fmt.Errorf("failed to register staking spend notification: %w", err)
}
}

s.trackedSubs.AddSubscription(delegation.StakingTxHashHex)
Expand Down
55 changes: 35 additions & 20 deletions internal/services/watch_btc_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (s *Service) watchForSpendStakingTx(
)
if err != nil {
log.Error().
Interface("error", err).
Err(err).
Stack().
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Expand All @@ -65,7 +65,6 @@ func (s *Service) watchForSpendUnbondingTx(
spendEvent *notifier.SpendEvent,
stakingTxHashHex string,
) {
defer s.wg.Done()
quitCtx, cancel := s.quitContext()
defer cancel()

Expand All @@ -83,7 +82,7 @@ func (s *Service) watchForSpendUnbondingTx(
)
if err != nil {
log.Error().
Interface("error", err).
Err(err).
Stack().
Str("staking_tx", stakingTxHashHex).
Str("spending_tx", spendDetail.SpendingTx.TxHash().String()).
Expand Down Expand Up @@ -166,7 +165,8 @@ func (s *Service) handleSpendingStakingTransaction(
utils.PushOrQuit(s.unbondingDelegationChan, unbondingEvent, s.quit)

// Register unbonding spend notification
return s.registerUnbondingSpendNotification(stakingTxHashHex, unbondingTxHex, unbondingStartHeight)
unbondingSpendHeightHint := unbondingStartHeight + delegation.UnbondingTx.TimeLock - 1
return s.registerUnbondingSpendNotification(stakingTxHashHex, unbondingTxHex, uint32(unbondingSpendHeightHint))
}

// Try to validate as withdrawal transaction
Expand Down Expand Up @@ -545,7 +545,7 @@ func (s *Service) registerStakingSpendNotification(

log.Debug().
Str("staking_tx", stakingTxHashHex).
Msg("registered spend notification")
Msg("registered staking spend notification")

// Watch in the same goroutine
s.watchForSpendStakingTx(spendEv, stakingTxHashHex)
Expand All @@ -557,7 +557,7 @@ func (s *Service) registerStakingSpendNotification(
func (s *Service) registerUnbondingSpendNotification(
stakingTxHashHex string,
unbondingTxHex string,
unbondingStartHeight uint64,
spendHeightHint uint32,
) error {
unbondingTxBytes, parseErr := hex.DecodeString(unbondingTxHex)
if parseErr != nil {
Expand All @@ -574,22 +574,37 @@ func (s *Service) registerUnbondingSpendNotification(
Index: 0, // unbonding tx has only 1 output
}

spendEv, btcErr := s.btcNotifier.RegisterSpendNtfn(
&unbondingOutpoint,
unbondingTx.TxOut[0].PkScript,
uint32(unbondingStartHeight),
)
if btcErr != nil {
return fmt.Errorf("failed to register spend ntfn for unbonding tx %s: %w", stakingTxHashHex, btcErr)
}
// Launch both registration and watching in a single goroutine
// to save time for caller
s.wg.Add(1)
go func() {
defer s.wg.Done()

spendEv, btcErr := s.btcNotifier.RegisterSpendNtfn(
&unbondingOutpoint,
unbondingTx.TxOut[0].PkScript,
spendHeightHint,
)
if btcErr != nil {
// TODO: Handle the error in a better way such as retrying immediately
// If continue to fail, we could retry by sending to queue and processing
// later again to make sure we don't miss any spend
// Will leave it as it is for now with alerts on log
log.Error().
Err(btcErr).
Str("staking_tx", stakingTxHashHex).
Msg("failed to register early unbonding spend notification")
return
}

log.Debug().
Str("staking_tx", stakingTxHashHex).
Str("unbonding_tx", unbondingTx.TxHash().String()).
Msg("registered early unbonding spend notification")
log.Debug().
Str("staking_tx", stakingTxHashHex).
Str("unbonding_tx", unbondingTx.TxHash().String()).
Msg("registered early unbonding spend notification")

s.wg.Add(1)
go s.watchForSpendUnbondingTx(spendEv, stakingTxHashHex)
// Watch in the same goroutine
s.watchForSpendUnbondingTx(spendEv, stakingTxHashHex)
}()

return nil
}

0 comments on commit 12a09ec

Please sign in to comment.