diff --git a/db_model_transactions.go b/db_model_transactions.go index 8ab4da8a..130edd07 100644 --- a/db_model_transactions.go +++ b/db_model_transactions.go @@ -59,80 +59,6 @@ func (m *Transaction) BeforeCreating(ctx context.Context) error { return err } - // m.xPubID is the xpub of the user registering the transaction - if len(m.XPubID) > 0 && len(m.DraftID) > 0 { - // Only get the draft if we haven't already - if m.draftTransaction == nil { - if m.draftTransaction, err = getDraftTransactionID( - ctx, m.XPubID, m.DraftID, m.GetOptions(false)..., - ); err != nil { - return err - } else if m.draftTransaction == nil { - return ErrDraftNotFound - } - } - } - - // Validations and broadcast config check - if m.draftTransaction != nil { - - // No config set? Use the default from the client - if m.draftTransaction.Configuration.Sync == nil { - m.draftTransaction.Configuration.Sync = m.Client().DefaultSyncConfig() - } - - // Create the sync transaction model - sync := newSyncTransaction( - m.GetID(), - m.draftTransaction.Configuration.Sync, - m.GetOptions(true)..., - ) - - // Found any p2p outputs? - p2pStatus := SyncStatusSkipped - if m.draftTransaction.Configuration.Outputs != nil { - for _, output := range m.draftTransaction.Configuration.Outputs { - if output.PaymailP4 != nil && output.PaymailP4.ResolutionType == ResolutionTypeP2P { - p2pStatus = SyncStatusPending - } - } - } - sync.P2PStatus = p2pStatus - - // Use the same metadata - sync.Metadata = m.Metadata - - // set this transaction on the sync transaction object. This is needed for the first broadcast - sync.transaction = m - - // If all the options are skipped, do not make a new model (ignore the record) - if !sync.isSkipped() { - m.syncTransaction = sync - } - } - - // If we are external and the user disabled incoming transaction checking, check outputs - if m.isExternal() && !m.Client().IsITCEnabled() { - // Check that the transaction has >= 1 known destination - if !m.TransactionBase.hasOneKnownDestination(ctx, m.Client(), m.GetOptions(false)...) { - return ErrNoMatchingOutputs - } - } - - // Process the UTXOs - if err = m.processUtxos(ctx); err != nil { - return err - } - - // Set the values from the inputs/outputs and draft tx - m.TotalValue, m.Fee = m.getValues() - - // Add values if found - if m.TransactionBase.parsedTx != nil { - m.NumberOfInputs = uint32(len(m.TransactionBase.parsedTx.Inputs)) - m.NumberOfOutputs = uint32(len(m.TransactionBase.parsedTx.Outputs)) - } - m.DebugLog("end: " + m.Name() + " BeforeCreating hook") m.beforeCreateCalled = true return nil diff --git a/model_incoming_transactions.go b/model_incoming_transactions.go index 72c7b39b..4e5e9b9d 100644 --- a/model_incoming_transactions.go +++ b/model_incoming_transactions.go @@ -2,10 +2,9 @@ package bux import ( "context" + "encoding/json" "errors" "fmt" - "runtime/debug" - "strings" "time" "github.com/BuxOrg/bux/chainstate" @@ -31,13 +30,12 @@ type IncomingTransaction struct { } // newIncomingTransaction will start a new model -func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTransaction) { +func newIncomingTransaction(hex string, opts ...ModelOps) (tx *IncomingTransaction) { // Create the model tx = &IncomingTransaction{ Model: *NewBaseModel(ModelIncomingTransaction, opts...), TransactionBase: TransactionBase{ - ID: txID, Hex: hex, }, Status: SyncStatusReady, @@ -45,7 +43,8 @@ func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTra // Attempt to parse if len(hex) > 0 { - tx.TransactionBase.parsedTx, _ = bt.NewTxFromString(hex) + tx.parsedTx, _ = bt.NewTxFromString(hex) + tx.ID = tx.parsedTx.TxID() } return @@ -54,7 +53,7 @@ func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTra // getIncomingTransactionByID will get the incoming transactions to process func getIncomingTransactionByID(ctx context.Context, id string, opts ...ModelOps) (*IncomingTransaction, error) { // Construct an empty tx - tx := newIncomingTransaction("", "", opts...) + tx := newIncomingTransaction("", opts...) tx.ID = id // Get the record @@ -128,6 +127,23 @@ func (m *IncomingTransaction) GetID() string { return m.ID } +func (m *IncomingTransaction) toTransactionDto() *Transaction { + t := Transaction{} + t.Hex = m.Hex + + // @arkadiusz: check if we need to set these fields here + t.parsedTx = m.parsedTx + t.rawXpubKey = m.rawXpubKey + t.setXPubID() + t.setID() + + t.Metadata = m.Metadata + t.NumberOfOutputs = uint32(len(m.parsedTx.Outputs)) + t.NumberOfInputs = uint32(len(m.parsedTx.Inputs)) + + return &t +} + // BeforeCreating will fire before the model is being inserted into the Datastore func (m *IncomingTransaction) BeforeCreating(ctx context.Context) error { m.DebugLog("starting: [" + m.name.String() + "] BeforeCreating hook...") @@ -189,7 +205,7 @@ func (m *IncomingTransaction) AfterCreated(ctx context.Context) error { // todo: this should be refactored into a task // go func(incomingTx *IncomingTransaction) { - if err := processIncomingTransaction(context.Background(), nil, m); err != nil { + if err := processIncomingTransaction(context.Background(), m.Client().Logger(), m); err != nil { m.Client().Logger().Error(ctx, "error processing incoming transaction: "+err.Error()) } // }(m) @@ -274,21 +290,14 @@ func processIncomingTransactions(ctx context.Context, logClient zLogger.GormLogg func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLoggerInterface, incomingTx *IncomingTransaction) error { - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("processing incoming transaction: %v", incomingTx)) + if logClient == nil { + logClient = incomingTx.client.Logger() } + logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): transaction: %v", incomingTx)) + // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - incomingTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, incomingTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -301,13 +310,11 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge // Find in mempool or on-chain var txInfo *chainstate.TransactionInfo - if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest( + if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest( // @arkadiusz: why QyeryTransactionFastest() here? In syncTx we use QueryTransaction() ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout, ); err != nil { - if logClient != nil { - logClient.Error(ctx, fmt.Sprintf("error finding transaction %s on chain: %s", incomingTx.ID, err.Error())) - } + logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): error finding transaction %s on chain. Reason: %s", incomingTx.ID, err)) // TX might not have been broadcast yet? (race condition, or it was never broadcast...) if errors.Is(err, chainstate.ErrTransactionNotFound) { @@ -322,42 +329,54 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge } // Broadcast was successful, so the transaction was accepted by the network, continue processing like before - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("broadcast of transaction was successful using %s", provider)) - } + logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): broadcast of transaction %s was successful using %s. Incoming tx will be processed again.", incomingTx.ID, provider)) + // allow propagation time.Sleep(3 * time.Second) - if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest( - ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout, - ); err != nil { - incomingTx.Status = statusReady - incomingTx.StatusMessage = "tx was not found on-chain, attempting to broadcast using provider: " + provider - _ = incomingTx.Save(ctx) - return err - } - } else { - // Actual error occurred - bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error()) - return err + return nil // reprocess it when triggering the task again } + + // Actual error occurred + bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error()) + return err } - if logClient != nil { - logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider)) + // validate txInfo + if txInfo.BlockHash == "" || txInfo.MerkleProof == nil || txInfo.MerkleProof.TxOrID == "" || len(txInfo.MerkleProof.Nodes) == 0 { + logClient.Warn(ctx, fmt.Sprintf("processIncomingTransaction(): txInfo for %s is invalid, will try again later", incomingTx.ID)) + + if incomingTx.client.IsDebug() { + txInfoJSON, _ := json.Marshal(txInfo) //nolint:nolintlint,nilerr // error is not needed + incomingTx.DebugLog(string(txInfoJSON)) + } + return nil } - // Create the new transaction model - transaction := newTransactionFromIncomingTransaction(incomingTx) + logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider)) - // Get the transaction by ID - if tx, _ := getTransactionByID( - ctx, transaction.rawXpubKey, transaction.TransactionBase.ID, transaction.client.DefaultModelOptions()..., - ); tx != nil { - transaction = tx + // Check if we have transaction in DB already (@arkadiusz: it always should be false - to confirm in next refactorization iteration) + transaction, _ := getTransactionByID( + ctx, incomingTx.rawXpubKey, incomingTx.ID, incomingTx.client.DefaultModelOptions()..., + ) + + if transaction == nil { + // Create the new transaction model + transaction = newTransactionFromIncomingTransaction(incomingTx) + + if err = transaction.processUtxos(ctx); err != nil { + logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): processUtxos() for %s failed. Reason: %s", incomingTx.ID, err)) + return err + } + + // Set the values from the inputs/outputs and draft tx // @arkadiusz: why it's not inside ctor? investigate it + transaction.TotalValue, transaction.Fee = transaction.getValues() + + // Set the fields // @arkadiusz: why it's not inside ctor? investigate it + transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs)) + transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs)) } - // Add additional information (if found on-chain) - transaction.BlockHeight = uint64(txInfo.BlockHeight) - transaction.BlockHash = txInfo.BlockHash + + transaction.updateChainInfo(txInfo) // Create status message onChain := len(transaction.BlockHash) > 0 || transaction.BlockHeight > 0 diff --git a/model_sync_transactions.go b/model_sync_transactions.go index ab77d261..5031f28f 100644 --- a/model_sync_transactions.go +++ b/model_sync_transactions.go @@ -49,7 +49,7 @@ func newSyncTransaction(txID string, config *SyncConfig, opts ...ModelOps) *Sync } // Sync - ss := SyncStatusReady + ss := SyncStatusReady // @arkadiusz: as I understand without this we won't be able to determine if transaction is mined or not if !config.SyncOnChain { ss = SyncStatusSkipped } @@ -108,17 +108,6 @@ func (m *SyncTransaction) BeforeCreating(_ context.Context) error { func (m *SyncTransaction) AfterCreated(ctx context.Context) error { m.DebugLog("starting: " + m.Name() + " AfterCreated hook...") - // Should we broadcast immediately? - if m.Configuration.Broadcast && - m.Configuration.BroadcastInstant { - if err := processBroadcastTransaction( // TODO: remove business logic - ctx, m, - ); err != nil { - // return err (do not return and fail the record creation) - m.Client().Logger().Error(ctx, "error running broadcast tx: "+err.Error()) - } - } - m.DebugLog("end: " + m.Name() + " AfterCreated hook") return nil } diff --git a/model_transactions.go b/model_transactions.go index 8bfef403..f6391ecc 100644 --- a/model_transactions.go +++ b/model_transactions.go @@ -3,6 +3,7 @@ package bux import ( "context" + "github.com/BuxOrg/bux/chainstate" "github.com/BuxOrg/bux/taskmanager" "github.com/BuxOrg/bux/utils" "github.com/libsv/go-bt/v2" @@ -122,11 +123,6 @@ func newTransactionFromIncomingTransaction(incomingTx *IncomingTransaction) *Tra _ = tx.setID() } - // Set the fields - tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) - tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) - tx.Status = statusProcessing - return tx } @@ -227,6 +223,20 @@ func (m *Transaction) isExternal() bool { return m.draftTransaction == nil } +func (m *Transaction) updateChainInfo(txInfo *chainstate.TransactionInfo) { + m.BlockHash = txInfo.BlockHash + m.BlockHeight = uint64(txInfo.BlockHeight) + + if txInfo.MerkleProof != nil { + mp := MerkleProof(*txInfo.MerkleProof) + m.MerkleProof = mp + + bump := mp.ToBUMP() + bump.BlockHeight = m.BlockHeight + m.BUMP = bump + } +} + // IsXpubAssociated will check if this key is associated to this transaction func (m *Transaction) IsXpubAssociated(rawXpubKey string) bool { // Hash the raw key diff --git a/record_tx_strategy_external_incoming_tx.go b/record_tx_strategy_external_incoming_tx.go index ab972d77..ddd0e7b3 100644 --- a/record_tx_strategy_external_incoming_tx.go +++ b/record_tx_strategy_external_incoming_tx.go @@ -64,9 +64,9 @@ func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterfac return nil, fmt.Errorf("ExternalIncomingTx.Execute(): addind new IncomingTx to check queue failed. Reason: %w", err) } - // TODO: ensure I don't need syncTransaction here - - return newTransactionFromIncomingTransaction(incomingTx), nil // TODO: change incoming processing + result := incomingTx.toTransactionDto() + result.Status = statusProcessing + return result, nil } func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) { @@ -79,15 +79,13 @@ func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c C return nil, ErrNoMatchingOutputs } - // Process the UTXOs if err := tx.processUtxos(ctx); err != nil { return nil, err } - // Set the values from the inputs/outputs and draft tx - tx.TotalValue, tx.Fee = tx.getValues() + tx.TotalValue, tx.Fee = tx.getValues() // @arkadiusz: why it's not inside ctor? investigate it - // Add values if found + // Add values if found // @arkadiusz: why it's not inside ctor? investigate it if tx.TransactionBase.parsedTx != nil { tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) @@ -113,7 +111,7 @@ func _hydrateExternalWithSync(tx *Transaction) { // Use the same metadata sync.Metadata = tx.Metadata - // @arkadiusz: my assumptium is we cannot skip sync here + // @arkadiusz: my assumption is we cannot skip sync here // // If all the options are skipped, do not make a new model (ignore the record) // if !sync.isSkipped() { // m.syncTransaction = sync diff --git a/record_tx_strategy_outgoing_tx.go b/record_tx_strategy_outgoing_tx.go index d5d7cf3e..937d9bf5 100644 --- a/record_tx_strategy_outgoing_tx.go +++ b/record_tx_strategy_outgoing_tx.go @@ -81,15 +81,13 @@ func _createOutgoingTxToRecord(ctx context.Context, oTx *outgoingTx, c ClientInt _hydrateOutgoingWithSync(tx) - // Process the UTXOs if err := tx.processUtxos(ctx); err != nil { return nil, err } - // Set the values from the inputs/outputs and draft tx - tx.TotalValue, tx.Fee = tx.getValues() + tx.TotalValue, tx.Fee = tx.getValues() // @arkadiusz: why it's not inside ctor? investigate it - // Add values if found + // Add values if found // @arkadiusz: why it's not inside ctor? investigate it if tx.TransactionBase.parsedTx != nil { tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs)) tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs)) @@ -137,7 +135,7 @@ func _hydrateOutgoingWithSync(tx *Transaction) { sync.transaction = tx tx.syncTransaction = sync - // @arkadiusz: my assumptium is we cannot skip sync here + // @arkadiusz: my assumption is we cannot skip sync here // // If all the options are skipped, do not make a new model (ignore the record) // if !sync.isSkipped() { // m.syncTransaction = sync diff --git a/sync_tx_repository.go b/sync_tx_repository.go index 67702325..6b6aee47 100644 --- a/sync_tx_repository.go +++ b/sync_tx_repository.go @@ -37,9 +37,9 @@ func GetSyncTransactionByID(ctx context.Context, id string, opts ...ModelOps) (* // getTransactionsToBroadcast will get the sync transactions to broadcast func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.QueryParams, opts ...ModelOps, -) (map[string][]*SyncTransaction, error) { +) ([]*SyncTransaction, error) { // Get the records by status - txs, err := _getSyncTransactionsByConditions( + scTxs, err := _getSyncTransactionsByConditions( ctx, map[string]interface{}{ broadcastStatusField: SyncStatusReady.String(), @@ -48,19 +48,25 @@ func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.Quer ) if err != nil { return nil, err + } else if len(scTxs) == 0 { + return nil, nil } - // group transactions by xpub and return including the tx itself - txsByXpub := make(map[string][]*SyncTransaction) - for _, tx := range txs { - if tx.transaction, err = getTransactionByID( - ctx, "", tx.ID, opts..., - ); err != nil { + // hydrate and see if it's ready to sync + res := make([]*SyncTransaction, 0, len(scTxs)) + + for _, sTx := range scTxs { + // hydrate + sTx.transaction, err = getTransactionByID( + ctx, "", sTx.ID, opts..., + ) + if err != nil { return nil, err + } else if sTx.transaction == nil { + return nil, ErrMissingTransaction // TODO: think about context in error } - var parentsBroadcast bool - parentsBroadcast, err = _areParentsBroadcast(ctx, tx, opts...) + parentsBroadcast, err := _areParentsBroadcasted(ctx, sTx.transaction, opts...) if err != nil { return nil, err } @@ -70,20 +76,10 @@ func getTransactionsToBroadcast(ctx context.Context, queryParams *datastore.Quer continue } - xPubID := "" // fallback if we have no input xpubs - if len(tx.transaction.XpubInIDs) > 0 { - // use the first xpub for the grouping - // in most cases when we are broadcasting, there should be only 1 xpub in - xPubID = tx.transaction.XpubInIDs[0] - } - - if txsByXpub[xPubID] == nil { - txsByXpub[xPubID] = make([]*SyncTransaction, 0) - } - txsByXpub[xPubID] = append(txsByXpub[xPubID], tx) + res = append(res, sTx) } - return txsByXpub, nil + return res, nil } // getTransactionsToSync will get the sync transactions to sync @@ -160,25 +156,15 @@ func _getSyncTransactionsByConditions(ctx context.Context, conditions map[string return txs, nil } -func _areParentsBroadcast(ctx context.Context, syncTx *SyncTransaction, opts ...ModelOps) (bool, error) { - tx, err := getTransactionByID(ctx, "", syncTx.ID, opts...) - if err != nil { - return false, err - } - - if tx == nil { - return false, ErrMissingTransaction - } - +func _areParentsBroadcasted(ctx context.Context, tx *Transaction, opts ...ModelOps) (bool, error) { // get the sync transaction of all inputs - var btTx *bt.Tx - btTx, err = bt.NewTxFromString(tx.Hex) + btTx, err := bt.NewTxFromString(tx.Hex) if err != nil { return false, err } // check that all inputs we handled have been broadcast, or are not handled by Bux - parentsBroadcast := true + parentsBroadcasted := true for _, input := range btTx.Inputs { var parentTx *SyncTransaction previousTxID := hex.EncodeToString(bt.ReverseBytes(input.PreviousTxID())) @@ -188,9 +174,9 @@ func _areParentsBroadcast(ctx context.Context, syncTx *SyncTransaction, opts ... } // if we have a sync transaction, and it is not complete, then we cannot broadcast if parentTx != nil && parentTx.BroadcastStatus != SyncStatusComplete { - parentsBroadcast = false + parentsBroadcasted = false } } - return parentsBroadcast, nil + return parentsBroadcasted, nil } diff --git a/sync_tx_service.go b/sync_tx_service.go index c4c9363b..437ce46f 100644 --- a/sync_tx_service.go +++ b/sync_tx_service.go @@ -7,8 +7,6 @@ import ( "errors" "fmt" "runtime" - "runtime/debug" - "strings" "sync" "time" @@ -60,20 +58,20 @@ func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts } // Get maxTransactions records, grouped by xpub - txsByXpub, err := getTransactionsToBroadcast( - ctx, queryParams, opts..., - ) + snTxs, err := getTransactionsToBroadcast(ctx, queryParams, opts...) if err != nil { return err - } else if len(txsByXpub) == 0 { + } else if len(snTxs) == 0 { return nil } - wg := new(sync.WaitGroup) + // Process the transactions per xpub, in parallel + txsByXpub := _groupByXpub(snTxs) + // we limit the number of concurrent broadcasts to the number of cpus*2, since there is lots of IO wait limit := make(chan bool, runtime.NumCPU()*2) + wg := new(sync.WaitGroup) - // Process the transactions per xpub, in parallel for xPubID := range txsByXpub { limit <- true // limit the number of routines running at the same time wg.Add(1) @@ -82,9 +80,10 @@ func processBroadcastTransactions(ctx context.Context, maxTransactions int, opts defer func() { <-limit }() for _, tx := range txsByXpub[xPubID] { - if err = processBroadcastTransaction( + if err = broadcastSyncTransaction( ctx, tx, ); err != nil { + // TODO: do not rely on "model" logger tx.Client().Logger().Error(ctx, fmt.Sprintf("error running broadcast tx for xpub %s, tx %s: %s", xPubID, tx.ID, err.Error()), ) @@ -129,19 +128,10 @@ func processP2PTransactions(ctx context.Context, maxTransactions int, opts ...Mo return nil } -// processBroadcastTransaction will process a sync transaction record and broadcast it -func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) error { +// broadcastSyncTransaction will process a sync transaction record and broadcast it +func broadcastSyncTransaction(ctx context.Context, syncTx *SyncTransaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -152,31 +142,26 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e return err } - // Get the transaction - var transaction *Transaction - var incomingTransaction *IncomingTransaction + // Get the transaction transaction HEX var txHex string if syncTx.transaction != nil && syncTx.transaction.Hex != "" { // the transaction has already been retrieved and added to the syncTx object, just use that - transaction = syncTx.transaction - txHex = transaction.Hex + txHex = syncTx.transaction.Hex } else { - if transaction, err = getTransactionByID( + // else get hex from DB + transaction, err := getTransactionByID( ctx, "", syncTx.ID, syncTx.GetOptions(false)..., - ); err != nil { + ) + + if err != nil { return err - } else if transaction == nil { - // maybe this is only an incoming transaction, let's try to find that and broadcast - // the processing of incoming transactions should then pick it up in the next job run - if incomingTransaction, err = getIncomingTransactionByID(ctx, syncTx.ID, syncTx.GetOptions(false)...); err != nil { - return err - } else if incomingTransaction == nil { - return errors.New("transaction was expected but not found, using ID: " + syncTx.ID) - } - txHex = incomingTransaction.Hex - } else { - txHex = transaction.Hex } + + if transaction == nil { + return errors.New("transaction was expected but not found, using ID: " + syncTx.ID) + } + + txHex = transaction.Hex } // Broadcast @@ -193,19 +178,6 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e // Create status message message := "broadcast success" - // process the incoming transaction before finishing the sync - if incomingTransaction != nil { - // give the transaction some time to propagate through the network - time.Sleep(3 * time.Second) - - // we don't need to handle the error here, this is only to speed up the processing - // job will pick it up later if needed - if err = processIncomingTransaction(ctx, nil, incomingTransaction); err == nil { - // again ignore the error, if an error occurs the transaction will be set and will be processed further - transaction, _ = getTransactionByID(ctx, "", syncTx.ID, syncTx.GetOptions(false)...) - } - } - // Update the sync information syncTx.BroadcastStatus = SyncStatusComplete syncTx.Results.LastMessage = message @@ -244,16 +216,6 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e // Fire a notification notify(notifications.EventTypeBroadcast, syncTx) - // Notify any P2P paymail providers associated to the transaction - // but only if we actually found the transaction in the transactions' collection, otherwise this was an incoming - // transaction that needed to be broadcast and was not successfully processed after the broadcast - if transaction != nil { - if syncTx.P2PStatus == SyncStatusReady { - return _processP2PTransaction(ctx, syncTx, transaction) - } else if syncTx.P2PStatus == SyncStatusSkipped && syncTx.SyncStatus == SyncStatusReady { - return _processSyncTransaction(ctx, syncTx, transaction) - } - } return nil } @@ -262,16 +224,7 @@ func processBroadcastTransaction(ctx context.Context, syncTx *SyncTransaction) e // _processSyncTransaction will process the sync transaction record, or save the failure func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -323,13 +276,7 @@ func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, trans return nil } - // Add additional information (if found on-chain) - transaction.BlockHash = txInfo.BlockHash - transaction.BlockHeight = uint64(txInfo.BlockHeight) - transaction.MerkleProof = MerkleProof(*txInfo.MerkleProof) - bump := transaction.MerkleProof.ToBUMP() - bump.BlockHeight = transaction.BlockHeight - transaction.BUMP = bump + transaction.updateChainInfo(txInfo) // Create status message message := "transaction was found on-chain by " + chainstate.ProviderBroadcastClient @@ -366,16 +313,7 @@ func _processSyncTransaction(ctx context.Context, syncTx *SyncTransaction, trans // _processP2PTransaction will process the sync transaction record, or save the failure func _processP2PTransaction(ctx context.Context, syncTx *SyncTransaction, transaction *Transaction) error { // Successfully capture any panics, convert to readable string and log the error - defer func() { - if err := recover(); err != nil { - syncTx.Client().Logger().Error(ctx, - fmt.Sprintf( - "panic: %v - stack trace: %v", err, - strings.ReplaceAll(string(debug.Stack()), "\n", ""), - ), - ) - } - }() + defer recoverAndLog(ctx, syncTx.client.Logger()) // Create the lock and set the release for after the function completes unlock, err := newWriteLock( @@ -476,6 +414,27 @@ func _notifyPaymailProviders(ctx context.Context, transaction *Transaction) ([]* // utils +func _groupByXpub(scTxs []*SyncTransaction) map[string][]*SyncTransaction { + txsByXpub := make(map[string][]*SyncTransaction) + + // group transactions by xpub and return including the tx itself + for _, tx := range scTxs { + xPubID := "" // fallback if we have no input xpubs + if len(tx.transaction.XpubInIDs) > 0 { + // use the first xpub for the grouping + // in most cases when we are broadcasting, there should be only 1 xpub in + xPubID = tx.transaction.XpubInIDs[0] + } + + if txsByXpub[xPubID] == nil { + txsByXpub[xPubID] = make([]*SyncTransaction, 0) + } + txsByXpub[xPubID] = append(txsByXpub[xPubID], tx) + } + + return txsByXpub +} + // _bailAndSaveSyncTransaction will save the error message for a sync tx func _bailAndSaveSyncTransaction(ctx context.Context, syncTx *SyncTransaction, status SyncStatus, action, provider, message string, diff --git a/tx_service.go b/tx_service.go index 259d84f8..6e814e6b 100644 --- a/tx_service.go +++ b/tx_service.go @@ -195,8 +195,7 @@ func _processTransaction(ctx context.Context, transaction *Transaction) error { return err } - transaction.BlockHash = txInfo.BlockHash - transaction.BlockHeight = uint64(txInfo.BlockHeight) + transaction.updateChainInfo(txInfo) return transaction.Save(ctx) } diff --git a/utils.go b/utils.go new file mode 100644 index 00000000..67be2791 --- /dev/null +++ b/utils.go @@ -0,0 +1,21 @@ +package bux + +import ( + "context" + "fmt" + "runtime/debug" + "strings" + + zLogger "github.com/mrz1836/go-logger" +) + +func recoverAndLog(ctx context.Context, log zLogger.GormLoggerInterface) { + if err := recover(); err != nil { + log.Error(ctx, + fmt.Sprintf( + "panic: %v - stack trace: %v", err, + strings.ReplaceAll(string(debug.Stack()), "\n", ""), + ), + ) + } +}