diff --git a/record_tx_strategy_outgoing_tx.go b/record_tx_strategy_outgoing_tx.go index a9eaf2dd..edc7465a 100644 --- a/record_tx_strategy_outgoing_tx.go +++ b/record_tx_strategy_outgoing_tx.go @@ -192,7 +192,7 @@ func _outgoingNotifyP2p(ctx context.Context, logger *zerolog.Logger, tx *Transac Str("txID", tx.ID). Msg("start p2p") - if err := processP2PTransaction(ctx, tx.syncTransaction, tx); err != nil { + if err := processP2PTransaction(ctx, tx); err != nil { logger.Error(). Str("txID", tx.ID). Msgf("processP2PTransaction failed. Reason: %s", err) diff --git a/sync_tx_service.go b/sync_tx_service.go index 13a3a728..9f707d65 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -273,10 +273,11 @@ func _syncTxDataFromChain(ctx context.Context, syncTx *SyncTransaction, transact } // processP2PTransaction will process the sync transaction record, or save the failure -func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { +func processP2PTransaction(ctx context.Context, tx *Transaction) error { // Successfully capture any panics, convert to readable string and log the error - defer recoverAndLog(syncTx.Client().Logger()) + defer recoverAndLog(tx.Client().Logger()) + syncTx := tx.syncTransaction // Create the lock and set the release for after the function completes unlock, err := newWriteLock( ctx, fmt.Sprintf(lockKeyProcessP2PTx, syncTx.GetID()), syncTx.Client().Cachestore(), @@ -286,17 +287,8 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac return err } - // Get the transaction - if transaction == nil { - if transaction, err = getTransactionByID( - ctx, "", syncTx.ID, syncTx.GetOptions(false)..., - ); err != nil { - return err - } - } - // No draft? - if len(transaction.DraftID) == 0 { + if len(tx.DraftID) == 0 { _bailAndSaveSyncTransaction( ctx, syncTx, SyncStatusComplete, syncActionP2P, "all", "no draft found, cannot complete p2p", ) @@ -305,7 +297,7 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac // Notify any P2P paymail providers associated to the transaction var results []*SyncResult - if results, err = _notifyPaymailProviders(ctx, transaction); err != nil { + if results, err = _notifyPaymailProviders(ctx, tx); err != nil { _bailAndSaveSyncTransaction( ctx, syncTx, SyncStatusReady, syncActionP2P, "", err.Error(), ) @@ -339,45 +331,46 @@ func processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transac // _notifyPaymailProviders will notify any associated Paymail providers func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]*SyncResult, error) { - // First get the draft tx - draftTx, err := getDraftTransactionID( - ctx, - transaction.XPubID, - transaction.DraftID, - transaction.GetOptions(false)..., - ) - if err != nil { - return nil, err - } else if draftTx == nil { - return nil, errors.New("draft not found: " + transaction.DraftID) - } - - // Loop each output looking for paymail outputs - var attempts []*SyncResult pm := transaction.Client().PaymailClient() + outputs := transaction.draftTransaction.Configuration.Outputs + + notifiedReceivers := make([]string, 0) + results := make([]*SyncResult, len(outputs)) + var payload *paymail.P2PTransactionPayload + var err error - for _, out := range draftTx.Configuration.Outputs { - if out.PaymailP4 != nil && out.PaymailP4.ResolutionType == ResolutionTypeP2P { - - // Notify each provider with the transaction - if payload, err = finalizeP2PTransaction( - ctx, - pm, - out.PaymailP4, - transaction, - ); err != nil { - return nil, err - } - attempts = append(attempts, &SyncResult{ - Action: syncActionP2P, - ExecutedAt: time.Now().UTC(), - Provider: out.PaymailP4.ReceiveEndpoint, - StatusMessage: "success: " + payload.TxID, - }) + for _, out := range outputs { + p4 := out.PaymailP4 + + if p4 == nil || p4.ResolutionType != ResolutionTypeP2P { + continue } + + receiver := fmt.Sprintf("%s@%s", p4.Alias, p4.Domain) + if contains(notifiedReceivers, func(x string) bool { return x == receiver }) { + continue // no need to send the same transaction to the same receiver second time + } + + if payload, err = finalizeP2PTransaction( + ctx, + pm, + p4, + transaction, + ); err != nil { + return nil, err + } + + notifiedReceivers = append(notifiedReceivers, receiver) + results = append(results, &SyncResult{ + Action: syncActionP2P, + ExecutedAt: time.Now().UTC(), + Provider: p4.ReceiveEndpoint, + StatusMessage: "success: " + payload.TxID, + }) + } - return attempts, nil + return results, nil } // utils diff --git a/utils.go b/utils.go index b9961e17..0d287eb2 100644 --- a/utils.go +++ b/utils.go @@ -15,3 +15,17 @@ func recoverAndLog(log *zerolog.Logger) { ) } } + +// finds the first element in a collection that satisfies a specified condition. +func find[E any](collection []E, predicate func(E) bool) *E { + for _, v := range collection { + if predicate(v) { + return &v + } + } + return nil +} + +func contains[E any](collection []E, predicate func(E) bool) bool{ + return find(collection, predicate) != nil +}