Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
perf(BUX-000): refactorize P2P - send tx to the receiver only once
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 27, 2023
1 parent 1c5c50b commit 291a7fb
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 48 deletions.
2 changes: 1 addition & 1 deletion record_tx_strategy_outgoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func _outgoingNotifyP2p(ctx context.Context, logger *zerolog.Logger, tx *Transac
Str("txID", tx.ID).
Msg("start p2p")

if err := processP2PTransaction(ctx, tx.syncTransaction, tx); err != nil {
if err := processP2PTransaction(ctx, tx); err != nil {
logger.Error().
Str("txID", tx.ID).
Msgf("processP2PTransaction failed. Reason: %s", err)
Expand Down
87 changes: 40 additions & 47 deletions sync_tx_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,11 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact
}

// processP2PTransaction will process the sync transaction record, or save the failure
func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error {
func processP2PTransaction(ctx context.Context, tx *Transaction) error {
// Successfully capture any panics, convert to readable string and log the error
defer recoverAndLog(syncTx.Client().Logger())
defer recoverAndLog(tx.Client().Logger())

syncTx := tx.syncTransaction
// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
ctx, fmt.Sprintf(lockKeyProcessP2PTx, syncTx.GetID()), syncTx.Client().Cachestore(),
Expand All @@ -286,17 +287,8 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac
return err
}

// Get the transaction
if transaction == nil {
if transaction, err = getTransactionByID(
ctx, "", syncTx.ID, syncTx.GetOptions(false)...,
); err != nil {
return err
}
}

// No draft?
if len(transaction.DraftID) == 0 {
if len(tx.DraftID) == 0 {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusComplete, syncActionP2P, "all", "no draft found, cannot complete p2p",
)
Expand All @@ -305,7 +297,7 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac

// Notify any P2P paymail providers associated to the transaction
var results []*SyncResult
if results, err = _notifyPaymailProviders(ctx, transaction); err != nil {
if results, err = _notifyPaymailProviders(ctx, tx); err != nil {
_bailAndSaveSyncTransaction(
ctx, syncTx, SyncStatusReady, syncActionP2P, "", err.Error(),
)
Expand Down Expand Up @@ -339,45 +331,46 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac

// _notifyPaymailProviders will notify any associated Paymail providers
func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*SyncResult, error) {
// First get the draft tx
draftTx, err := getDraftTransactionID(
ctx,
transaction.XPubID,
transaction.DraftID,
transaction.GetOptions(false)...,
)
if err != nil {
return nil, err
} else if draftTx == nil {
return nil, errors.New("draft not found: " + transaction.DraftID)
}

// Loop each output looking for paymail outputs
var attempts []*SyncResult
pm := transaction.Client().PaymailClient()
outputs := transaction.draftTransaction.Configuration.Outputs

notifiedReceivers := make([]string, 0)
results := make([]*SyncResult, len(outputs))

var payload *paymail.P2PTransactionPayload
var err error

for _, out := range draftTx.Configuration.Outputs {
if out.PaymailP4 != nil && out.PaymailP4.ResolutionType == ResolutionTypeP2P {

// Notify each provider with the transaction
if payload, err = finalizeP2PTransaction(
ctx,
pm,
out.PaymailP4,
transaction,
); err != nil {
return nil, err
}
attempts = append(attempts, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: out.PaymailP4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})
for _, out := range outputs {
p4 := out.PaymailP4

if p4 == nil || p4.ResolutionType != ResolutionTypeP2P {
continue
}

receiver := fmt.Sprintf("%s@%s", p4.Alias, p4.Domain)
if contains(notifiedReceivers, func(x string) bool { return x == receiver }) {
continue // no need to send the same transaction to the same receiver second time
}

if payload, err = finalizeP2PTransaction(
ctx,
pm,
p4,
transaction,
); err != nil {
return nil, err
}

notifiedReceivers = append(notifiedReceivers, receiver)
results = append(results, &SyncResult{
Action: syncActionP2P,
ExecutedAt: time.Now().UTC(),
Provider: p4.ReceiveEndpoint,
StatusMessage: "success: " + payload.TxID,
})

}
return attempts, nil
return results, nil
}

// utils
Expand Down
14 changes: 14 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,17 @@ func recoverAndLog(log *zerolog.Logger) {
)
}
}

// finds the first element in a collection that satisfies a specified condition.
func find[E any](collection []E, predicate func(E) bool) *E {
for _, v := range collection {
if predicate(v) {
return &v
}
}
return nil
}

func contains[E any](collection []E, predicate func(E) bool) bool{
return find(collection, predicate) != nil
}

0 comments on commit 291a7fb

Please sign in to comment.