Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat(BUX-291): remove business logic from DB hooks / adjust BL to new…
Browse files Browse the repository at this point in the history
… requirements
  • Loading branch information
arkadiuszos4chain committed Nov 13, 2023
1 parent fbf0286 commit 69fdc32
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 282 deletions.
74 changes: 0 additions & 74 deletions db_model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,80 +59,6 @@ func (m *Transaction) BeforeCreating(ctx context.Context) error {
return err
}

// m.xPubID is the xpub of the user registering the transaction
if len(m.XPubID) > 0 && len(m.DraftID) > 0 {
// Only get the draft if we haven't already
if m.draftTransaction == nil {
if m.draftTransaction, err = getDraftTransactionID(
ctx, m.XPubID, m.DraftID, m.GetOptions(false)...,
); err != nil {
return err
} else if m.draftTransaction == nil {
return ErrDraftNotFound
}
}
}

// Validations and broadcast config check
if m.draftTransaction != nil {

// No config set? Use the default from the client
if m.draftTransaction.Configuration.Sync == nil {
m.draftTransaction.Configuration.Sync = m.Client().DefaultSyncConfig()
}

// Create the sync transaction model
sync := newSyncTransaction(
m.GetID(),
m.draftTransaction.Configuration.Sync,
m.GetOptions(true)...,
)

// Found any p2p outputs?
p2pStatus := SyncStatusSkipped
if m.draftTransaction.Configuration.Outputs != nil {
for _, output := range m.draftTransaction.Configuration.Outputs {
if output.PaymailP4 != nil && output.PaymailP4.ResolutionType == ResolutionTypeP2P {
p2pStatus = SyncStatusPending
}
}
}
sync.P2PStatus = p2pStatus

// Use the same metadata
sync.Metadata = m.Metadata

// set this transaction on the sync transaction object. This is needed for the first broadcast
sync.transaction = m

// If all the options are skipped, do not make a new model (ignore the record)
if !sync.isSkipped() {
m.syncTransaction = sync
}
}

// If we are external and the user disabled incoming transaction checking, check outputs
if m.isExternal() && !m.Client().IsITCEnabled() {
// Check that the transaction has >= 1 known destination
if !m.TransactionBase.hasOneKnownDestination(ctx, m.Client(), m.GetOptions(false)...) {
return ErrNoMatchingOutputs
}
}

// Process the UTXOs
if err = m.processUtxos(ctx); err != nil {
return err
}

// Set the values from the inputs/outputs and draft tx
m.TotalValue, m.Fee = m.getValues()

// Add values if found
if m.TransactionBase.parsedTx != nil {
m.NumberOfInputs = uint32(len(m.TransactionBase.parsedTx.Inputs))
m.NumberOfOutputs = uint32(len(m.TransactionBase.parsedTx.Outputs))
}

m.DebugLog("end: " + m.Name() + " BeforeCreating hook")
m.beforeCreateCalled = true
return nil
Expand Down
119 changes: 69 additions & 50 deletions model_incoming_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package bux

import (
"context"
"encoding/json"
"errors"
"fmt"
"runtime/debug"
"strings"
"time"

"github.com/BuxOrg/bux/chainstate"
Expand All @@ -31,21 +30,21 @@ type IncomingTransaction struct {
}

// newIncomingTransaction will start a new model
func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTransaction) {
func newIncomingTransaction(hex string, opts ...ModelOps) (tx *IncomingTransaction) {

// Create the model
tx = &IncomingTransaction{
Model: *NewBaseModel(ModelIncomingTransaction, opts...),
TransactionBase: TransactionBase{
ID: txID,
Hex: hex,
},
Status: SyncStatusReady,
}

// Attempt to parse
if len(hex) > 0 {
tx.TransactionBase.parsedTx, _ = bt.NewTxFromString(hex)
tx.parsedTx, _ = bt.NewTxFromString(hex)
tx.ID = tx.parsedTx.TxID()
}

return
Expand All @@ -54,7 +53,7 @@ func newIncomingTransaction(txID, hex string, opts ...ModelOps) (tx *IncomingTra
// getIncomingTransactionByID will get the incoming transactions to process
func getIncomingTransactionByID(ctx context.Context, id string, opts ...ModelOps) (*IncomingTransaction, error) {
// Construct an empty tx
tx := newIncomingTransaction("", "", opts...)
tx := newIncomingTransaction("", opts...)
tx.ID = id

// Get the record
Expand Down Expand Up @@ -128,6 +127,23 @@ func (m *IncomingTransaction) GetID() string {
return m.ID
}

func (m *IncomingTransaction) toTransactionDto() *Transaction {
t := Transaction{}
t.Hex = m.Hex

// @arkadiusz: check if we need to set these fields here
t.parsedTx = m.parsedTx
t.rawXpubKey = m.rawXpubKey
t.setXPubID()
t.setID()

t.Metadata = m.Metadata
t.NumberOfOutputs = uint32(len(m.parsedTx.Outputs))
t.NumberOfInputs = uint32(len(m.parsedTx.Inputs))

return &t
}

// BeforeCreating will fire before the model is being inserted into the Datastore
func (m *IncomingTransaction) BeforeCreating(ctx context.Context) error {
m.DebugLog("starting: [" + m.name.String() + "] BeforeCreating hook...")
Expand Down Expand Up @@ -189,7 +205,7 @@ func (m *IncomingTransaction) AfterCreated(ctx context.Context) error {

// todo: this should be refactored into a task
// go func(incomingTx *IncomingTransaction) {
if err := processIncomingTransaction(context.Background(), nil, m); err != nil {
if err := processIncomingTransaction(context.Background(), m.Client().Logger(), m); err != nil {
m.Client().Logger().Error(ctx, "error processing incoming transaction: "+err.Error())
}
// }(m)
Expand Down Expand Up @@ -274,21 +290,14 @@ func processIncomingTransactions(ctx context.Context, logClient zLogger.GormLogg
func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLoggerInterface,
incomingTx *IncomingTransaction) error {

if logClient != nil {
logClient.Info(ctx, fmt.Sprintf("processing incoming transaction: %v", incomingTx))
if logClient == nil {
logClient = incomingTx.client.Logger()
}

logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): transaction: %v", incomingTx))

// Successfully capture any panics, convert to readable string and log the error
defer func() {
if err := recover(); err != nil {
incomingTx.Client().Logger().Error(ctx,
fmt.Sprintf(
"panic: %v - stack trace: %v", err,
strings.ReplaceAll(string(debug.Stack()), "\n", ""),
),
)
}
}()
defer recoverAndLog(ctx, incomingTx.client.Logger())

// Create the lock and set the release for after the function completes
unlock, err := newWriteLock(
Expand All @@ -301,13 +310,11 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge

// Find in mempool or on-chain
var txInfo *chainstate.TransactionInfo
if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest(
if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest( // @arkadiusz: why QyeryTransactionFastest() here? In syncTx we use QueryTransaction()
ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout,
); err != nil {

if logClient != nil {
logClient.Error(ctx, fmt.Sprintf("error finding transaction %s on chain: %s", incomingTx.ID, err.Error()))
}
logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): error finding transaction %s on chain. Reason: %s", incomingTx.ID, err))

// TX might not have been broadcast yet? (race condition, or it was never broadcast...)
if errors.Is(err, chainstate.ErrTransactionNotFound) {
Expand All @@ -322,42 +329,54 @@ func processIncomingTransaction(ctx context.Context, logClient zLogger.GormLogge
}

// Broadcast was successful, so the transaction was accepted by the network, continue processing like before
if logClient != nil {
logClient.Info(ctx, fmt.Sprintf("broadcast of transaction was successful using %s", provider))
}
logClient.Info(ctx, fmt.Sprintf("processIncomingTransaction(): broadcast of transaction %s was successful using %s. Incoming tx will be processed again.", incomingTx.ID, provider))

// allow propagation
time.Sleep(3 * time.Second)
if txInfo, err = incomingTx.Client().Chainstate().QueryTransactionFastest(
ctx, incomingTx.ID, chainstate.RequiredInMempool, defaultQueryTxTimeout,
); err != nil {
incomingTx.Status = statusReady
incomingTx.StatusMessage = "tx was not found on-chain, attempting to broadcast using provider: " + provider
_ = incomingTx.Save(ctx)
return err
}
} else {
// Actual error occurred
bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error())
return err
return nil // reprocess it when triggering the task again
}

// Actual error occurred
bailAndSaveIncomingTransaction(ctx, incomingTx, err.Error())
return err
}

if logClient != nil {
logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider))
// validate txInfo
if txInfo.BlockHash == "" || txInfo.MerkleProof == nil || txInfo.MerkleProof.TxOrID == "" || len(txInfo.MerkleProof.Nodes) == 0 {
logClient.Warn(ctx, fmt.Sprintf("processIncomingTransaction(): txInfo for %s is invalid, will try again later", incomingTx.ID))

if incomingTx.client.IsDebug() {
txInfoJSON, _ := json.Marshal(txInfo) //nolint:nolintlint,nilerr // error is not needed
incomingTx.DebugLog(string(txInfoJSON))
}
return nil
}

// Create the new transaction model
transaction := newTransactionFromIncomingTransaction(incomingTx)
logClient.Info(ctx, fmt.Sprintf("found incoming transaction %s in %s", incomingTx.ID, txInfo.Provider))

// Get the transaction by ID
if tx, _ := getTransactionByID(
ctx, transaction.rawXpubKey, transaction.TransactionBase.ID, transaction.client.DefaultModelOptions()...,
); tx != nil {
transaction = tx
// Check if we have transaction in DB already (@arkadiusz: it always should be false - to confirm in next refactorization iteration)
transaction, _ := getTransactionByID(
ctx, incomingTx.rawXpubKey, incomingTx.ID, incomingTx.client.DefaultModelOptions()...,
)

if transaction == nil {
// Create the new transaction model
transaction = newTransactionFromIncomingTransaction(incomingTx)

if err = transaction.processUtxos(ctx); err != nil {
logClient.Error(ctx, fmt.Sprintf("processIncomingTransaction(): processUtxos() for %s failed. Reason: %s", incomingTx.ID, err))
return err
}

// Set the values from the inputs/outputs and draft tx // @arkadiusz: why it's not inside ctor? investigate it
transaction.TotalValue, transaction.Fee = transaction.getValues()

// Set the fields // @arkadiusz: why it's not inside ctor? investigate it
transaction.NumberOfOutputs = uint32(len(transaction.TransactionBase.parsedTx.Outputs))
transaction.NumberOfInputs = uint32(len(transaction.TransactionBase.parsedTx.Inputs))
}
// Add additional information (if found on-chain)
transaction.BlockHeight = uint64(txInfo.BlockHeight)
transaction.BlockHash = txInfo.BlockHash

transaction.updateChainInfo(txInfo)

// Create status message
onChain := len(transaction.BlockHash) > 0 || transaction.BlockHeight > 0
Expand Down
13 changes: 1 addition & 12 deletions model_sync_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newSyncTransaction(txID string, config *SyncConfig, opts ...ModelOps) *Sync
}

// Sync
ss := SyncStatusReady
ss := SyncStatusReady // @arkadiusz: as I understand without this we won't be able to determine if transaction is mined or not
if !config.SyncOnChain {
ss = SyncStatusSkipped
}
Expand Down Expand Up @@ -108,17 +108,6 @@ func (m *SyncTransaction) BeforeCreating(_ context.Context) error {
func (m *SyncTransaction) AfterCreated(ctx context.Context) error {
m.DebugLog("starting: " + m.Name() + " AfterCreated hook...")

// Should we broadcast immediately?
if m.Configuration.Broadcast &&
m.Configuration.BroadcastInstant {
if err := processBroadcastTransaction( // TODO: remove business logic
ctx, m,
); err != nil {
// return err (do not return and fail the record creation)
m.Client().Logger().Error(ctx, "error running broadcast tx: "+err.Error())
}
}

m.DebugLog("end: " + m.Name() + " AfterCreated hook")
return nil
}
Expand Down
20 changes: 15 additions & 5 deletions model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package bux
import (
"context"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
"github.com/libsv/go-bt/v2"
Expand Down Expand Up @@ -122,11 +123,6 @@ func newTransactionFromIncomingTransaction(incomingTx *IncomingTransaction) *Tra
_ = tx.setID()
}

// Set the fields
tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs))
tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs))
tx.Status = statusProcessing

return tx
}

Expand Down Expand Up @@ -227,6 +223,20 @@ func (m *Transaction) isExternal() bool {
return m.draftTransaction == nil
}

func (m *Transaction) updateChainInfo(txInfo *chainstate.TransactionInfo) {
m.BlockHash = txInfo.BlockHash
m.BlockHeight = uint64(txInfo.BlockHeight)

if txInfo.MerkleProof != nil {
mp := MerkleProof(*txInfo.MerkleProof)
m.MerkleProof = mp

bump := mp.ToBUMP()
bump.BlockHeight = m.BlockHeight
m.BUMP = bump
}
}

// IsXpubAssociated will check if this key is associated to this transaction
func (m *Transaction) IsXpubAssociated(rawXpubKey string) bool {
// Hash the raw key
Expand Down
14 changes: 6 additions & 8 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterfac
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): addind new IncomingTx to check queue failed. Reason: %w", err)
}

// TODO: ensure I don't need syncTransaction here

return newTransactionFromIncomingTransaction(incomingTx), nil // TODO: change incoming processing
result := incomingTx.toTransactionDto()
result.Status = statusProcessing
return result, nil
}

func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) {
Expand All @@ -79,15 +79,13 @@ func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c C
return nil, ErrNoMatchingOutputs
}

// Process the UTXOs
if err := tx.processUtxos(ctx); err != nil {
return nil, err
}

// Set the values from the inputs/outputs and draft tx
tx.TotalValue, tx.Fee = tx.getValues()
tx.TotalValue, tx.Fee = tx.getValues() // @arkadiusz: why it's not inside ctor? investigate it

// Add values if found
// Add values if found // @arkadiusz: why it's not inside ctor? investigate it
if tx.TransactionBase.parsedTx != nil {
tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs))
tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs))
Expand All @@ -113,7 +111,7 @@ func _hydrateExternalWithSync(tx *Transaction) {
// Use the same metadata
sync.Metadata = tx.Metadata

// @arkadiusz: my assumptium is we cannot skip sync here
// @arkadiusz: my assumption is we cannot skip sync here
// // If all the options are skipped, do not make a new model (ignore the record)
// if !sync.isSkipped() {
// m.syncTransaction = sync
Expand Down
Loading

0 comments on commit 69fdc32

Please sign in to comment.