Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
feat: add transaction check task
Browse files Browse the repository at this point in the history
  • Loading branch information
pawellewandowski98 committed Jul 31, 2023
1 parent 9e32289 commit c9599fb
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 1 deletion.
1 change: 1 addition & 0 deletions client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func defaultClientOptions() *clientOptions {
ModelSyncTransaction.String() + "_" + syncActionBroadcast: taskIntervalSyncActionBroadcast,
ModelSyncTransaction.String() + "_" + syncActionP2P: taskIntervalSyncActionP2P,
ModelSyncTransaction.String() + "_" + syncActionSync: taskIntervalSyncActionSync,
ModelTransaction.String() + "_" + TransactionActionCheck: taskIntervalTransactionCheck,
},
},

Expand Down
2 changes: 2 additions & 0 deletions definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
taskIntervalSyncActionBroadcast = 30 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionP2P = 35 * time.Second // Default task time for cron jobs (seconds)
taskIntervalSyncActionSync = 40 * time.Second // Default task time for cron jobs (seconds)
taskIntervalTransactionCheck = 60 * time.Second // Default task time for cron jobs (seconds)
)

// All the base models
Expand Down Expand Up @@ -108,6 +109,7 @@ const (
typeField = "type"
xPubIDField = "xpub_id"
xPubMetadataField = "xpub_metadata"
blockHeightField = "block_height"

// Universal statuses
statusCanceled = "canceled"
Expand Down
6 changes: 6 additions & 0 deletions model_transaction_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ const (
ChangeStrategyNominations ChangeStrategy = "nominations"
)

// Types of Transaction actions
const (
// TransactionActionCheck Get on-chain data about the transaction which have height == 0(IE: block hash, height, etc)
TransactionActionCheck = "check"
)

// ScriptOutput is the actual script record (could be several for one output record)
type ScriptOutput struct {
Address string `json:"address,omitempty"` // Hex encoded locking script
Expand Down
143 changes: 142 additions & 1 deletion model_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"context"
"encoding/hex"
"errors"

"github.com/BuxOrg/bux/chainstate"
"github.com/BuxOrg/bux/notifications"
"github.com/BuxOrg/bux/taskmanager"
"github.com/BuxOrg/bux/utils"
"github.com/libsv/go-bt/v2"
"github.com/mrz1836/go-datastore"
Expand Down Expand Up @@ -907,3 +908,143 @@ func (m *TransactionBase) hasOneKnownDestination(ctx context.Context, client Cli
}
return false
}

// RegisterTasks will register the model specific tasks on client initialization
func (m *Transaction) RegisterTasks() error {
// No task manager loaded?
tm := m.Client().Taskmanager()
if tm == nil {
return nil
}

ctx := context.Background()
checkTask := m.Name() + "_" + TransactionActionCheck

if err := tm.RegisterTask(&taskmanager.Task{
Name: checkTask,
RetryLimit: 1,
Handler: func(client ClientInterface) error {
if taskErr := taskCheckTransactions(ctx, client.Logger(), WithClient(client)); taskErr != nil {
client.Logger().Error(ctx, "error running "+checkTask+" task: "+taskErr.Error())
}
return nil
},
}); err != nil {
return err
}

return tm.RunTask(ctx, &taskmanager.TaskOptions{
Arguments: []interface{}{m.Client()},
RunEveryPeriod: m.Client().GetTaskPeriod(checkTask),
TaskName: checkTask,
})
}

// processTransactions will process transaction records
func processTransactions(ctx context.Context, maxTransactions int, opts ...ModelOps) error {
queryParams := &datastore.QueryParams{
Page: 1,
PageSize: maxTransactions,
OrderByField: "created_at",
SortDirection: "asc",
}

records, err := getTransactionsToCheck(
ctx, queryParams, opts...,
)
if err != nil {
return err
} else if len(records) == 0 {
return nil
}

for index := range records {
if err = processTransaction(
ctx, records[index],
); err != nil {
return err
}
}

return nil
}

// getTransactionsToCheck will get the transactions which do not have filled columns
func getTransactionsToCheck(ctx context.Context, queryParams *datastore.QueryParams,
opts ...ModelOps,
) ([]*Transaction, error) {

txs, err := getTransactionsByConditions(
ctx,
map[string]interface{}{
blockHeightField: 0,
},
queryParams, opts...,
)
if err != nil {
return nil, err
}
return txs, nil
}

// getTransactionsByConditions will get the transactions to check
func getTransactionsByConditions(ctx context.Context, conditions map[string]interface{},
queryParams *datastore.QueryParams, opts ...ModelOps,
) ([]*Transaction, error) {
if queryParams == nil {
queryParams = &datastore.QueryParams{
OrderByField: createdAtField,
SortDirection: datastore.SortAsc,
}
} else if queryParams.OrderByField == "" || queryParams.SortDirection == "" {
queryParams.OrderByField = createdAtField
queryParams.SortDirection = datastore.SortAsc
}

var models []Transaction
if err := getModels(
ctx, NewBaseModel(ModelNameEmpty, opts...).Client().Datastore(),
&models, conditions, queryParams, defaultDatabaseReadTimeout,
); err != nil {
if errors.Is(err, datastore.ErrNoResults) {
return nil, nil
}
return nil, err
}

txs := make([]*Transaction, 0)
for index := range models {
models[index].enrich(ModelSyncTransaction, opts...)
txs = append(txs, &models[index])
}

return txs, nil
}

// processTransaction will process the sync transaction record, or save the failure
func processTransaction(ctx context.Context, transaction *Transaction) error {
// Find on-chain
var txInfo *chainstate.TransactionInfo
txInfo, err := transaction.Client().Chainstate().QueryTransactionFastest(
ctx, transaction.ID, chainstate.RequiredOnChain, defaultQueryTxTimeout,
)
if err != nil {
if errors.Is(err, chainstate.ErrTransactionNotFound) {
return nil
}
return err
}

if transaction == nil {
return ErrMissingTransaction
}

transaction.BlockHash = txInfo.BlockHash
transaction.BlockHeight = uint64(txInfo.BlockHeight)

if err := transaction.Save(ctx); err != nil {
return err
}

return nil
}
12 changes: 12 additions & 0 deletions tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,15 @@ func taskSyncTransactions(ctx context.Context, logClient zLogger.GormLoggerInter
}
return err
}

// taskCheckTransactions will check any transactions
func taskCheckTransactions(ctx context.Context, logClient zLogger.GormLoggerInterface, opts ...ModelOps) error {

logClient.Info(ctx, "running check transaction(s) task...")

err := processTransactions(ctx, 10, opts...)
if err == nil || errors.Is(err, datastore.ErrNoResults) {
return nil
}
return err
}

0 comments on commit c9599fb

Please sign in to comment.