Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
fix(BUX-250): fix intenral tx flow;; fix write lock
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Nov 13, 2023
1 parent 33f6587 commit 483fbf7
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 48 deletions.
6 changes: 3 additions & 3 deletions paymail_service_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ func (p *PaymailDefaultServiceProvider) RecordTransaction(ctx context.Context,
metadata[ReferenceIDField] = p2pTx.Reference

// Record the transaction
rts, err := getRecordTxStrategy(ctx, p.client, "", p2pTx.Hex, "")
rts, err := getIncomingTxRecordStrategy(ctx, p.client, p2pTx.Hex)
if err != nil {
return nil, err
}

rts.(recordIncomingTxStrategy).ForceBroadcast(true)
rts.ForceBroadcast(true)

if p2pTx.Beef != "" {
rts.(recordIncomingTxStrategy).FailOnBroadcastError(true)
rts.FailOnBroadcastError(true)
}

transaction, err := recordTransaction(ctx, p.client, rts, WithMetadatas(metadata))
Expand Down
25 changes: 20 additions & 5 deletions record_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import (

type recordTxStrategy interface {
TxID() string
LockKey() string
Validate() error
Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error)
}

type recordIncomingTxStrategy interface {
recordTxStrategy
ForceBroadcast(force bool)
FailOnBroadcastError(forceFail bool)
}

func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) {
unlock := waitForRecordTxWriteLock(ctx, c, strategy.TxID())
unlock := waitForRecordTxWriteLock(ctx, c, strategy.LockKey())
defer unlock()

return strategy.Execute(ctx, c, opts)
Expand Down Expand Up @@ -53,13 +55,13 @@ func getOutgoingTxRecordStrategy(xPubKey, txHex, draftID string) recordTxStrateg
}
}

func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordTxStrategy, error) {
func getIncomingTxRecordStrategy(ctx context.Context, c ClientInterface, txHex string) (recordIncomingTxStrategy, error) {
tx, err := getTransactionByHex(ctx, txHex, c.DefaultModelOptions()...)
if err != nil {
return nil, err
}

var rts recordTxStrategy
var rts recordIncomingTxStrategy

if tx != nil {
rts = &internalIncomingTx{
Expand All @@ -84,15 +86,28 @@ func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string
// Create the lock and set the release for after the function completes
// Waits for the moment when the transaction is unlocked and creates a new lock
// Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming

lockKey := fmt.Sprintf(lockKeyRecordTx, key)

// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("try add write lock %s", lockKey))

for {

unlock, err = newWriteLock(
ctx, fmt.Sprintf(lockKeyRecordTx, key), c.Cachestore(),
ctx, lockKey, c.Cachestore(),
)
if err == nil {
// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("added write lock %s", lockKey))
break
}
time.Sleep(time.Second * 1)
}

return unlock
return func() {
// TODO: change to DEBUG level log when we will support it
c.Logger().Info(ctx, fmt.Sprintf("unlock %s", lockKey))
unlock()
}
}
10 changes: 7 additions & 3 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (strategy *externalIncomingTx) Execute(ctx context.Context, c ClientInterfa
logger.
Error(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %s, TxID: %s", err, transaction.ID))

return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w, TxID: %s", err, transaction.ID)
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): broadcasting failed, transaction rejected! Reason: %w", err)
}
}

Expand All @@ -62,6 +62,10 @@ func (strategy *externalIncomingTx) TxID() string {
return btTx.TxID()
}

func (strategy *externalIncomingTx) LockKey() string {
return fmt.Sprintf("incoming-%s", strategy.TxID())
}

func (strategy *externalIncomingTx) ForceBroadcast(force bool) {
strategy.broadcastNow = force
}
Expand Down Expand Up @@ -120,8 +124,8 @@ func _hydrateExternalWithSync(tx *Transaction) {
// to simplfy: broadcast every external incoming txs
sync.BroadcastStatus = SyncStatusReady

sync.P2PStatus = SyncStatusSkipped // the owner of the Tx should have already notified paymail providers
//sync.SyncStatus = SyncStatusReady
sync.P2PStatus = SyncStatusSkipped // the sender of the Tx should have already notified us
sync.SyncStatus = SyncStatusPending // wait until transaciton will be broadcasted

// Use the same metadata
sync.Metadata = tx.Metadata
Expand Down
16 changes: 5 additions & 11 deletions record_tx_strategy_internal_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (strategy *internalIncomingTx) Execute(ctx context.Context, c ClientInterfa
// process
transaction := strategy.Tx
syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...)
if err != nil {
if err != nil || syncTx == nil {
return nil, fmt.Errorf("InternalIncomingTx.Execute(): getting syncTx failed. Reason: %w", err)
}

Expand Down Expand Up @@ -54,6 +54,10 @@ func (strategy *internalIncomingTx) TxID() string {
return strategy.Tx.ID
}

func (strategy *internalIncomingTx) LockKey() string {
return fmt.Sprintf("incoming-%s", strategy.Tx.ID)
}

func (strategy *internalIncomingTx) ForceBroadcast(force bool) {
strategy.broadcastNow = force
}
Expand All @@ -79,16 +83,6 @@ func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerIn
logger.
Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, transaction.ID))

// TODO: do I really need this?
if syncTx.BroadcastStatus == SyncStatusSkipped { // revert status to ready after fail to re-run broadcasting, this can happen when we received internal BEEF tx
syncTx.BroadcastStatus = SyncStatusReady

if err = syncTx.Save(ctx); err != nil {
logger.
Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Skipped to Ready failed. Reason: %s, TxID: %s", err, transaction.ID))
}
}

// ignore broadcast error - will be repeted by task manager
return nil
}
Expand Down
57 changes: 36 additions & 21 deletions record_tx_strategy_outgoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,58 +15,73 @@ type outgoingTx struct {
XPubKey string
}

func (tx *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
func (strategy *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
logger := c.Logger()
logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start, TxID: %s", strategy.TxID()))

// process
transaction, err := _createOutgoingTxToRecord(ctx, tx, c, opts)

logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start, TxID: %s", transaction.ID))

// create
transaction, err := _createOutgoingTxToRecord(ctx, strategy, c, opts)
if err != nil {
return nil, fmt.Errorf("OutgoingTx.Execute(): creation of outgoing tx failed. Reason: %w", err)
}

if err = transaction.Save(ctx); err != nil {
return nil, fmt.Errorf("OutgoingTx.Execute(): saving of Transaction failed. Reason: %w", err)
}

// process
if transaction.syncTransaction.P2PStatus == SyncStatusReady {
if err = _outgoingNotifyP2p(ctx, logger, transaction); err != nil {
return nil, err // reject transaction if P2P notification failed
// reject transaction if P2P notification failed
logger.Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): transaction rejected by P2P provider, try to revert transaction. Reason: %s", err))

if revertErr := c.RevertTransaction(ctx, transaction.ID); revertErr != nil {
logger.Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): FATAL! Reverting transaction after failed P2P notification failed. Reason: %s", err))
}

return nil, err
}
}

if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
_outgoingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcasted by cron task
// get newest syncTx from DB - if it's an internal tx it could be broadcasted by us already
syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...)
if err != nil || syncTx == nil {
return nil, fmt.Errorf("OutgoingTx.Execute(): getting syncTx failed. Reason: %w", err)
}

// record
if err = transaction.Save(ctx); err != nil {
return nil, fmt.Errorf("OutgoingTx.Execute(): saving of Transaction failed. Reason: %w", err)
if syncTx.BroadcastStatus == SyncStatusReady {
_outgoingBroadcast(ctx, logger, transaction) // ignore error
}

logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): complete, TxID: %s", transaction.ID))
return transaction, nil
}

func (tx outgoingTx) Validate() error {
if tx.Hex == "" {
func (strategy *outgoingTx) Validate() error {
if strategy.Hex == "" {
return ErrMissingFieldHex
}

if tx.RelatedDraftID == "" {
if strategy.RelatedDraftID == "" {
return errors.New("empty RelatedDraftID")
}

if tx.XPubKey == "" {
return errors.New("empty xPubKey") // is it required ?
if strategy.XPubKey == "" {
return errors.New("empty xPubKey")
}

return nil // is valid
}

func (tx outgoingTx) TxID() string {
btTx, _ := bt.NewTxFromString(tx.Hex)
func (strategy *outgoingTx) TxID() string {
btTx, _ := bt.NewTxFromString(strategy.Hex)
return btTx.TxID()
}

func (strategy *outgoingTx) LockKey() string {
return fmt.Sprintf("outgoing-%s", strategy.TxID())
}

func _createOutgoingTxToRecord(ctx context.Context, oTx *outgoingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) {
// Create NEW transaction model
newOpts := c.DefaultModelOptions(append(opts, WithXPub(oTx.XPubKey), New())...)
Expand Down Expand Up @@ -124,7 +139,7 @@ func _hydrateOutgoingWithSync(tx *Transaction) {
// setup synchronization
sync.BroadcastStatus = _getBroadcastSyncStatus(tx)
sync.P2PStatus = _getP2pSyncStatus(tx)
//sync.SyncStatus = SyncStatusReady
sync.SyncStatus = SyncStatusPending // wait until transaction is broadcasted or P2P provider is notified

sync.Metadata = tx.Metadata

Expand Down Expand Up @@ -186,7 +201,7 @@ func _outgoingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface,
if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil {
// ignore error, transaction will be broadcasted by cron task
logger.
Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID))
Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed, next try will be handled by task manager. Reason: %s, TxID: %s", err, tx.ID))
} else {
logger.
Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcast complete, TxID: %s", tx.ID))
Expand Down
11 changes: 6 additions & 5 deletions sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,6 @@ func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) erro
StatusMessage: message,
})

// Update the P2P status
if syncTx.P2PStatus == SyncStatusPending {
syncTx.P2PStatus = SyncStatusReady
}

// Update sync status to be ready now
if syncTx.SyncStatus == SyncStatusPending {
syncTx.SyncStatus = SyncStatusReady
Expand Down Expand Up @@ -357,6 +352,12 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac

// Save the record
syncTx.P2PStatus = SyncStatusComplete

// Update sync status to be ready now
if syncTx.SyncStatus == SyncStatusPending {
syncTx.SyncStatus = SyncStatusReady
}

if err = syncTx.Save(ctx); err != nil {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusError, syncActionP2P, "internal", err.Error(),
Expand Down

0 comments on commit 483fbf7

Please sign in to comment.