Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Print error in case XRPL tx scanning is failed and retry after timeout. #202

Merged
merged 6 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions relayer/xrpl/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"go.uber.org/zap"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreum-tools/pkg/retry"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
)

Expand Down Expand Up @@ -121,32 +120,47 @@ func (s *AccountScanner) scanRecentHistory(
currentLedger int64,
ch chan<- rippledata.TransactionWithMetaData,
) {
// in case we don't have enough ledges for the window we start from the initial
// in case we don't have enough ledgers for the window we start from the initial
minLedger := int64(0)
if currentLedger > s.cfg.RecentScanWindow {
minLedger = currentLedger - s.cfg.RecentScanWindow
}

s.doWithRepeat(ctx, s.cfg.RepeatRecentScan, func() {
s.doWithRepeat(ctx, s.cfg.RepeatRecentScan, func() error {
s.log.Debug(
ctx,
"Scanning recent XRPL account history",
zap.Int64("minLedger", minLedger),
zap.String("account", s.cfg.Account.String()),
)
lastLedger := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountRecentHistoryScanLedgerIndex, ch)
if lastLedger != 0 {
lastLedger, err := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountRecentHistoryScanLedgerIndex, ch)
// set minLedger to start with it in next iteration
// even if the error was returned we still re-scan from the lastLedger
if lastLedger > 0 {
minLedger = lastLedger + 1
}
if err != nil {
return err
}
s.log.Debug(ctx, "Scanning of the recent history is done", zap.Int64("lastLedger", lastLedger))
return nil
})
}

func (s *AccountScanner) scanFullHistory(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) {
s.doWithRepeat(ctx, s.cfg.RepeatFullScan, func() {
minLedger := int64(-1)
s.doWithRepeat(ctx, s.cfg.RepeatFullScan, func() error {
s.log.Debug(ctx, "Scanning XRPL account full history", zap.String("account", s.cfg.Account.String()))
lastLedger := s.scanTransactions(ctx, -1, s.metricRegistry.SetXRPLAccountFullHistoryScanLedgerIndex, ch)
lastLedger, err := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountFullHistoryScanLedgerIndex, ch)
if err != nil {
// set minLedger to start with it in next iteration to complete the scanning
minLedger = lastLedger + 1
return err
}
// if scanning was done successfully update minLedger to start form the beginning in the next iteration
minLedger = int64(-1)
s.log.Debug(ctx, "Scanning of full history is done", zap.Int64("lastLedger", lastLedger))
return nil
})
}

Expand All @@ -155,7 +169,7 @@ func (s *AccountScanner) scanTransactions(
minLedger int64,
indexRegistryFunc func(float64),
ch chan<- rippledata.TransactionWithMetaData,
) int64 {
) (int64, error) {
if minLedger <= 0 {
minLedger = -1
}
Expand All @@ -165,28 +179,13 @@ func (s *AccountScanner) scanTransactions(
prevProcessedLedger int64
)
for {
var accountTxResult AccountTxResult
err := retry.Do(ctx, s.cfg.RetryDelay, func() error {
var err error
accountTxResult, err = s.rpcTxProvider.AccountTx(ctx, s.cfg.Account, minLedger, -1, marker)
if err != nil {
return retry.Retryable(
errors.Wrapf(err, "failed to get account transactions, account:%s, minLedger:%d, marker:%+v",
s.cfg.Account.String(), minLedger, marker),
)
}
return nil
})
accountTxResult, err := s.rpcTxProvider.AccountTx(ctx, s.cfg.Account, minLedger, -1, marker)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return lastLedger
}
// this panic is unexpected
panic(errors.Wrapf(
return lastLedger, errors.Wrapf(
err,
"unexpected error received for the get account transactions with retry, err:%s",
err.Error(),
))
"failed to get account transactions, account:%s, minLedger:%d, marker:%+v",
s.cfg.Account.String(), minLedger, marker,
)
}
// we accept the transaction from the validated ledger only
if accountTxResult.Validated {
Expand All @@ -205,7 +204,7 @@ func (s *AccountScanner) scanTransactions(
}
select {
case <-ctx.Done():
return lastLedger
return lastLedger, ctx.Err()
case ch <- *tx:
}
}
Expand All @@ -217,16 +216,18 @@ func (s *AccountScanner) scanTransactions(
marker = accountTxResult.Marker
}

return lastLedger
return lastLedger, nil
}

func (s *AccountScanner) doWithRepeat(ctx context.Context, shouldRepeat bool, f func()) {
func (s *AccountScanner) doWithRepeat(ctx context.Context, shouldRepeat bool, f func() error) {
for {
select {
case <-ctx.Done():
return
default:
f()
if err := f(); err != nil {
s.log.Error(ctx, "XRPL scanner is failed in do with repeat", zap.Error(err))
}
if !shouldRepeat {
s.log.Info(ctx, "Execution is fully stopped.")
return
Expand Down
101 changes: 75 additions & 26 deletions relayer/xrpl/scanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"reflect"
"sort"
"strings"
"testing"
"time"
Expand All @@ -13,13 +14,17 @@ import (
rippledata "github.com/rubblelabs/ripple/data"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/CoreumFoundation/coreum-tools/pkg/parallel"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger"
"github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl"
)

type txTemplate struct {
Hash string
LedgerSequence uint32
}

func TestAccountScanner_ScanTxs(t *testing.T) {
t.Parallel()

Expand All @@ -38,6 +43,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
cfg: xrpl.AccountScannerConfig{
Account: account,
FullScanEnabled: true,
RepeatFullScan: true,
RetryDelay: time.Millisecond,
},
rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider {
Expand All @@ -53,24 +59,42 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
callNumber++
switch callNumber {
case 1:
require.Equal(t, int64(-1), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"1": 1,
"2": 2,
"3": 3,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "1",
LedgerSequence: 1,
},
{
Hash: "2",
LedgerSequence: 2,
},
{
Hash: "3",
LedgerSequence: 3,
},
}),
Marker: notEmptyMarker,
}, nil
// emulate error
case 2:
require.Equal(t, int64(-1), minLedger)
return xrpl.AccountTxResult{}, errors.New("error")
case 3:
require.Equal(t, int64(3), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"4": 3,
"5": 4,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "4",
LedgerSequence: 3,
},
{
Hash: "5",
LedgerSequence: 4,
},
}),
}, nil
default:
Expand Down Expand Up @@ -114,20 +138,35 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
require.Equal(t, int64(100-10), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"1": 90,
"2": 91,
"3": 91,
"4": 92,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "1",
LedgerSequence: 90,
},
{
Hash: "2",
LedgerSequence: 91,
},
{
Hash: "3",
LedgerSequence: 91,
},
{
Hash: "4",
LedgerSequence: 92,
},
}),
Marker: notEmptyMarker,
}, nil
case 2:
require.Equal(t, int64(100-10), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"5": 92,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "5",
LedgerSequence: 92,
},
}),
// finish
Marker: nil,
Expand All @@ -139,8 +178,11 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
require.Equal(t, int64(93), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"6": 93,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "6",
LedgerSequence: 93,
},
}),
// finish
Marker: nil,
Expand All @@ -149,8 +191,11 @@ func TestAccountScanner_ScanTxs(t *testing.T) {
require.Equal(t, int64(94), minLedger)
return xrpl.AccountTxResult{
Validated: true,
Transactions: buildEmptyTransactions(map[string]uint32{
"7": 94,
Transactions: buildEmptyTransactions([]txTemplate{
{
Hash: "7",
LedgerSequence: 94,
},
}),
}, nil
default:
Expand All @@ -172,8 +217,8 @@ func TestAccountScanner_ScanTxs(t *testing.T) {

ctrl := gomock.NewController(t)
defer ctrl.Finish()
zapDevLogger, err := zap.NewDevelopment()
require.NoError(t, err)
logMock := logger.NewAnyLogMock(ctrl)
logMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any())
rpcTxProvider := tt.rpcTxProvider(ctrl)

metricRegistryMock := NewMockMetricRegistry(ctrl)
Expand All @@ -182,7 +227,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) {

s := xrpl.NewAccountScanner(
tt.cfg,
logger.NewZapLoggerFromLogger(zapDevLogger),
logMock,
rpcTxProvider,
metricRegistryMock,
)
Expand Down Expand Up @@ -232,19 +277,23 @@ func readTxHashesFromChannels(
}
}

func buildEmptyTransactions(txsData map[string]uint32) []*rippledata.TransactionWithMetaData {
func buildEmptyTransactions(txsData []txTemplate) []*rippledata.TransactionWithMetaData {
txs := make([]*rippledata.TransactionWithMetaData, 0, len(txsData))
for hash, ledgerSequence := range txsData {
for _, tx := range txsData {
var txHash rippledata.Hash256
copy(txHash[:], hash)
copy(txHash[:], tx.Hash)
txs = append(txs, &rippledata.TransactionWithMetaData{
LedgerSequence: ledgerSequence,
LedgerSequence: tx.LedgerSequence,
Transaction: &rippledata.Payment{
TxBase: rippledata.TxBase{
Hash: txHash,
},
},
})
}
// order by ledger sequence
sort.Slice(txs, func(i, j int) bool {
return txs[i].LedgerSequence < txs[j].LedgerSequence
})
return txs
}
Loading