Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat(BUX-291): make strategies more readable
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Nov 6, 2023
1 parent 7f51d26 commit 4cafaf3
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 51 deletions.
30 changes: 17 additions & 13 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"

"github.com/libsv/go-bt/v2"
zLogger "github.com/mrz1836/go-logger"
)

type externalIncomingTx struct {
Expand All @@ -21,24 +22,14 @@ func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, op
}

transaction, err := _createExternalTxToRecord(ctx, tx, c, opts)

logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID))

if err != nil {
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): creation of external incoming tx failed. Reason: %w", err)
}

logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start without ITC, TxID: %s", transaction.ID))

if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start broadcast, TxID: %s", transaction.ID))

if err = broadcastSyncTransaction(ctx, transaction.syncTransaction); err != nil {
// ignore error, transaction will be broadcaset in a cron task
logger.
Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID))
} else {
logger.
Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID))
}
_externalIncomingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcaset in a cron task
}

// record
Expand Down Expand Up @@ -125,3 +116,16 @@ func _hydrateExternalWithSync(tx *Transaction) {
sync.transaction = tx
tx.syncTransaction = sync
}

func _externalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) {
logger.Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): start broadcast, TxID: %s", tx.ID))

if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil {
// ignore error, transaction will be broadcaset in a cron task
logger.
Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID))
} else {
logger.
Info(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcast complete, TxID: %s", tx.ID))
}
}
49 changes: 29 additions & 20 deletions record_tx_strategy_internal_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"

zLogger "github.com/mrz1836/go-logger"
)

type internalIncomingTx struct {
Expand All @@ -23,28 +25,10 @@ func (tx *internalIncomingTx) Execute(ctx context.Context, c ClientInterface, op
}

if tx.BroadcastNow || syncTx.BroadcastStatus == SyncStatusReady {
logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start broadcast, TxID: %s", transaction.ID))

syncTx.transaction = transaction
err := broadcastSyncTransaction(ctx, syncTx)
if err != nil {
logger.
Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID))

if syncTx.BroadcastStatus == SyncStatusSkipped { // revert status to ready after fail to re-run broadcasting, this can happen when we received internal BEEF tx
syncTx.BroadcastStatus = SyncStatusReady

if err = syncTx.Save(ctx); err != nil {
logger.
Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Pending to Ready failed. Reason: %s, TxID: %s", err, transaction.ID))
}
}
transaction.syncTransaction = syncTx

// ignore broadcast error - will be repeted by task manager
} else {
logger.
Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID))
}
_internalIncomingBroadcast(ctx, logger, transaction) // ignore broadcast error - will be repeted by task manager
}

logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): complete, TxID: %s", transaction.ID))
Expand All @@ -66,3 +50,28 @@ func (tx *internalIncomingTx) TxID() string {
func (tx *internalIncomingTx) ForceBroadcast(force bool) {
tx.BroadcastNow = force
}

func _internalIncomingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, transaction *Transaction) {
logger.Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): start broadcast, TxID: %s", transaction.ID))

syncTx := transaction.syncTransaction
err := broadcastSyncTransaction(ctx, syncTx)
if err != nil {
logger.
Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID))

if syncTx.BroadcastStatus == SyncStatusSkipped { // revert status to ready after fail to re-run broadcasting, this can happen when we received internal BEEF tx
syncTx.BroadcastStatus = SyncStatusReady

if err = syncTx.Save(ctx); err != nil {
logger.
Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Pending to Ready failed. Reason: %s, TxID: %s", err, transaction.ID))
}
}

// ignore broadcast error - will be repeted by task manager
} else {
logger.
Info(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcast complete, TxID: %s", transaction.ID))
}
}
48 changes: 30 additions & 18 deletions record_tx_strategy_outgoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"

"github.com/libsv/go-bt/v2"
zLogger "github.com/mrz1836/go-logger"
)

type outgoingTx struct {
Expand All @@ -27,29 +28,13 @@ func (tx *outgoingTx) Execute(ctx context.Context, c ClientInterface, opts []Mod
}

if transaction.syncTransaction.P2PStatus == SyncStatusReady {
logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start p2p, TxID: %s", transaction.ID))

if err = processP2PTransaction(ctx, transaction.syncTransaction, transaction); err != nil {
logger.
Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): processP2PTransaction failed. Reason: %s, TxID: %s", err, transaction.ID))

if err = _outgoingNotifyP2p(ctx, logger, transaction); err != nil {
return nil, err // reject transaction if P2P notification failed
}

logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): p2p complete, TxID: %s", transaction.ID))
}

if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start broadcast, TxID: %s", transaction.ID))

if err = broadcastSyncTransaction(ctx, transaction.syncTransaction); err != nil {
// ignore error, transaction will be broadcasted by cron task
logger.
Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, transaction.ID))
} else {
logger.
Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcast complete, TxID: %s", transaction.ID))
}
_outgoingBroadcast(ctx, logger, transaction) // ignore error, transaction will be broadcasted by cron task
}

// record
Expand Down Expand Up @@ -180,3 +165,30 @@ func _getP2pSyncStatus(tx *Transaction) SyncStatus {

return p2pStatus
}

func _outgoingNotifyP2p(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) error {
logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start p2p, TxID: %s", tx.ID))

if err := processP2PTransaction(ctx, tx.syncTransaction, tx); err != nil {
logger.
Error(ctx, fmt.Sprintf("OutgoingTx.Execute(): processP2PTransaction failed. Reason: %s, TxID: %s", err, tx.ID))

return err
}

logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): p2p complete, TxID: %s", tx.ID))
return nil
}

func _outgoingBroadcast(ctx context.Context, logger zLogger.GormLoggerInterface, tx *Transaction) {
logger.Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): start broadcast, TxID: %s", tx.ID))

if err := broadcastSyncTransaction(ctx, tx.syncTransaction); err != nil {
// ignore error, transaction will be broadcasted by cron task
logger.
Warn(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcasting failed. Reason: %s, TxID: %s", err, tx.ID))
} else {
logger.
Info(ctx, fmt.Sprintf("OutgoingTx.Execute(): broadcast complete, TxID: %s", tx.ID))
}
}

0 comments on commit 4cafaf3

Please sign in to comment.