Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat(BUX-291): implement strategies for tx recording
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Nov 13, 2023
1 parent 9bbbedf commit 9f23468
Show file tree
Hide file tree
Showing 4 changed files with 469 additions and 0 deletions.
98 changes: 98 additions & 0 deletions record_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package bux

import (
"context"
"fmt"
"time"

"github.com/libsv/go-bt"
)

type recordTxStrategy interface {
TxID() string
Validate() error
Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error)
}

type recordIncomingTxStrategy interface {
ForceBroadcast(force bool)
}

func recordTransaction(ctx context.Context, c ClientInterface, strategy recordTxStrategy, opts ...ModelOps) (*Transaction, error) {
unlock := waitForRecordTxWriteLock(ctx, c, strategy.TxID())
defer unlock()

transaction, err := strategy.Execute(ctx, c, opts)
return transaction, err
}

func getRecordTxStrategy(ctx context.Context, c ClientInterface, xPubKey, txHex, draftID string) (recordTxStrategy, error) {
var rts recordTxStrategy

if draftID != "" {
rts = &outgoingTx{
Hex: txHex,
RelatedDraftID: draftID,
XPubKey: xPubKey,
}
} else {
tx, err := getTransactionByHex(ctx, c, txHex)
if err != nil {
return nil, err
}

if tx != nil {
rts = &internalIncomingTx{
Tx: tx,
BroadcastNow: false,
}
} else {
rts = &externalIncomingTx{
Hex: txHex,
BroadcastNow: false,
}
}
}

if err := rts.Validate(); err != nil {
return nil, err
}

return rts, nil
}

func getTransactionByHex(ctx context.Context, c ClientInterface, hex string) (*Transaction, error) {
// @arkadiusz: maybe we should actually search by hex?
btTx, err := bt.NewTxFromString(hex)
if err != nil {
return nil, err
}

// Get the transaction by ID
transaction, err := getTransactionByID(
ctx, "", btTx.GetTxID(), c.DefaultModelOptions()...,
)

return transaction, err
}

func waitForRecordTxWriteLock(ctx context.Context, c ClientInterface, key string) func() {
var (
unlock func()
err error
)
// Create the lock and set the release for after the function completes
// Waits for the moment when the transaction is unlocked and creates a new lock
// Relevant for bux to bux transactions, as we have 1 tx but need to record 2 txs - outgoing and incoming
for {
unlock, err = newWriteLock(
ctx, fmt.Sprintf(lockKeyRecordTx, key), c.Cachestore(),
)
if err == nil {
break
}
time.Sleep(time.Second * 1)
}

return unlock
}
124 changes: 124 additions & 0 deletions record_tx_strategy_external_incoming_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package bux

import (
"context"
"fmt"

"github.com/libsv/go-bt/v2"
)

type externalIncomingTx struct {
Hex string
BroadcastNow bool // e.g. BEEF must be broadcasted now
}

func (tx *externalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
// process
if !tx.BroadcastNow && c.IsITCEnabled() { // do not save transaction to database now, save IncomingTransaction instead and let task manager handle and process it
return _addTxToCheck(ctx, tx, c, opts)
}

transaction, err := _createExternalTxToRecord(ctx, tx, c, opts)

if err != nil {
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): creation of external incoming tx failed. Reason: %w", err)
}

if transaction.syncTransaction.BroadcastStatus == SyncStatusReady {
if err = broadcastSyncTransaction(ctx, transaction.syncTransaction); err != nil {
// ignore error, transaction will be broadcaset by cron task - @arkadiusz: to chyba nie do końca prawda
transaction.client.Logger().
Warn(ctx, fmt.Sprintf("ExternalIncomingTx.Execute(): broadcasting failed. Reason: %s", err)) // TODO: add transaction info to log context
}
}

// record
if err = transaction.Save(ctx); err != nil {
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): saving of Transaction failed. Reason: %w", err)
}

return transaction, nil
}

func (tx *externalIncomingTx) Validate() error {
if tx.Hex == "" {
return ErrMissingFieldHex
}

return nil // is valid
}

func (tx *externalIncomingTx) TxID() string {
btTx, _ := bt.NewTxFromString(tx.Hex)
return btTx.TxID()
}

func (tx *externalIncomingTx) ForceBroadcast(force bool) {
tx.BroadcastNow = force
}

func _addTxToCheck(ctx context.Context, tx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) {
incomingTx := newIncomingTransaction(tx.Hex, c.DefaultModelOptions(append(opts, New())...)...)

if err := incomingTx.Save(ctx); err != nil {
return nil, fmt.Errorf("ExternalIncomingTx.Execute(): addind new IncomingTx to check queue failed. Reason: %w", err)
}

// TODO: ensure I don't need syncTransaction here

return newTransactionFromIncomingTransaction(incomingTx), nil // TODO: change incoming processing
}

func _createExternalTxToRecord(ctx context.Context, eTx *externalIncomingTx, c ClientInterface, opts []ModelOps) (*Transaction, error) {
// Create NEW tx model
tx := newTransaction(eTx.Hex, c.DefaultModelOptions(append(opts, New())...)...)
_hydrateExternalWithSync(tx)

// Check that the transaction has >= 1 known destination
if !tx.TransactionBase.hasOneKnownDestination(ctx, c, tx.GetOptions(false)...) {
return nil, ErrNoMatchingOutputs
}

// Process the UTXOs
if err := tx.processUtxos(ctx); err != nil {
return nil, err
}

// Set the values from the inputs/outputs and draft tx
tx.TotalValue, tx.Fee = tx.getValues()

// Add values if found
if tx.TransactionBase.parsedTx != nil {
tx.NumberOfInputs = uint32(len(tx.TransactionBase.parsedTx.Inputs))
tx.NumberOfOutputs = uint32(len(tx.TransactionBase.parsedTx.Outputs))
}

return tx, nil
}

func _hydrateExternalWithSync(tx *Transaction) {
// Create the sync transaction model
sync := newSyncTransaction(
tx.ID,
tx.Client().DefaultSyncConfig(),
tx.GetOptions(true)...,
)

// to simplfy: broadcast every external incoming txs
sync.BroadcastStatus = SyncStatusReady

sync.P2PStatus = SyncStatusSkipped // The owner of the Tx should have already notified paymail providers
//sync.SyncStatus = SyncStatusReady

// Use the same metadata
sync.Metadata = tx.Metadata

// @arkadiusz: my assumptium is we cannot skip sync here
// // If all the options are skipped, do not make a new model (ignore the record)
// if !sync.isSkipped() {
// m.syncTransaction = sync
// }

sync.transaction = tx
tx.syncTransaction = sync
}
63 changes: 63 additions & 0 deletions record_tx_strategy_internal_incoming_tx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package bux

import (
"context"
"errors"
"fmt"
)

type internalIncomingTx struct {
Tx *Transaction
BroadcastNow bool // e.g. BEEF must be broadcasted now
}

func (tx *internalIncomingTx) Execute(ctx context.Context, c ClientInterface, opts []ModelOps) (*Transaction, error) {
// if I'm a classic - can I even be recorded this way?
// if I'm a paymail - I can be broadcasted but not have to, P2P providers are notified for sure

transaction := tx.Tx

// process
syncTx, err := GetSyncTransactionByID(ctx, transaction.ID, transaction.GetOptions(false)...)
if err != nil {
return nil, fmt.Errorf("InternalIncomingTx.Execute(): getting syncTx failed. Reason: %w", err)
}

if tx.BroadcastNow || syncTx.BroadcastStatus == SyncStatusReady {
syncTx.transaction = transaction
err := broadcastSyncTransaction(ctx, syncTx)
if err != nil {
transaction.client.Logger().
Warn(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): broadcasting failed. Reason: %s", err)) // TODO: add transaction info to log context

if syncTx.BroadcastStatus == SyncStatusPending { // revert status to ready after fail to re-run broadcasting
syncTx.BroadcastStatus = SyncStatusReady

if err = syncTx.Save(ctx); err != nil {
transaction.client.Logger().
Error(ctx, fmt.Sprintf("InternalIncomingTx.Execute(): changing synctx.BroadcastStatus from Pending to Ready failed. Reason: %s", err)) // TODO: add transaction info to log context
}
}

// ignore broadcast error - will be repeted by task manager
}
}

return transaction, nil
}

func (tx *internalIncomingTx) Validate() error {
if tx.Tx == nil {
return errors.New("Tx cannot be nil")
}

return nil // is valid
}

func (tx *internalIncomingTx) TxID() string {
return tx.Tx.ID
}

func (tx *internalIncomingTx) ForceBroadcast(force bool) {
tx.BroadcastNow = force
}
Loading

0 comments on commit 9f23468

Please sign in to comment.