From b1a0d74852a95796552578a2ac2fa1583ec8ae7a Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Thu, 4 Apr 2024 12:18:53 +0300 Subject: [PATCH 1/4] Print error in case XRPL tx scanning is failed and retry after timeout. --- relayer/xrpl/scanner.go | 63 ++++++++++++++++++------------------ relayer/xrpl/scanner_test.go | 11 ++++--- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index 95ba5d66..4cb6387a 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -9,7 +9,6 @@ import ( "go.uber.org/zap" "github.com/CoreumFoundation/coreum-tools/pkg/parallel" - "github.com/CoreumFoundation/coreum-tools/pkg/retry" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" ) @@ -121,32 +120,47 @@ func (s *AccountScanner) scanRecentHistory( currentLedger int64, ch chan<- rippledata.TransactionWithMetaData, ) { - // in case we don't have enough ledges for the window we start from the initial + // in case we don't have enough ledgers for the window we start from the initial minLedger := int64(0) if currentLedger > s.cfg.RecentScanWindow { minLedger = currentLedger - s.cfg.RecentScanWindow } - s.doWithRepeat(ctx, s.cfg.RepeatRecentScan, func() { + s.doWithRepeat(ctx, s.cfg.RepeatRecentScan, func() error { s.log.Debug( ctx, "Scanning recent XRPL account history", zap.Int64("minLedger", minLedger), zap.String("account", s.cfg.Account.String()), ) - lastLedger := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountRecentHistoryScanLedgerIndex, ch) + lastLedger, err := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountRecentHistoryScanLedgerIndex, ch) + // set minLedger to start with it in next iteration + // even if the error was returned we still re-scan from the lastLedger if lastLedger != 0 { minLedger = lastLedger + 1 } + if err != nil { + return err + } s.log.Debug(ctx, "Scanning of the recent history is done", zap.Int64("lastLedger", lastLedger)) + return nil }) } func (s *AccountScanner) scanFullHistory(ctx context.Context, ch chan<- rippledata.TransactionWithMetaData) { - s.doWithRepeat(ctx, s.cfg.RepeatFullScan, func() { + minLedger := int64(-1) + s.doWithRepeat(ctx, s.cfg.RepeatFullScan, func() error { s.log.Debug(ctx, "Scanning XRPL account full history", zap.String("account", s.cfg.Account.String())) - lastLedger := s.scanTransactions(ctx, -1, s.metricRegistry.SetXRPLAccountFullHistoryScanLedgerIndex, ch) + lastLedger, err := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountFullHistoryScanLedgerIndex, ch) + if err != nil { + // set minLedger to start with it in next iteration to complete the scanning + minLedger = lastLedger + 1 + return err + } + // if scanning was done successfully update minLedger to start form the beginning in the next iteration + minLedger = int64(-1) s.log.Debug(ctx, "Scanning of full history is done", zap.Int64("lastLedger", lastLedger)) + return nil }) } @@ -155,7 +169,7 @@ func (s *AccountScanner) scanTransactions( minLedger int64, indexRegistryFunc func(float64), ch chan<- rippledata.TransactionWithMetaData, -) int64 { +) (int64, error) { if minLedger <= 0 { minLedger = -1 } @@ -165,28 +179,13 @@ func (s *AccountScanner) scanTransactions( prevProcessedLedger int64 ) for { - var accountTxResult AccountTxResult - err := retry.Do(ctx, s.cfg.RetryDelay, func() error { - var err error - accountTxResult, err = s.rpcTxProvider.AccountTx(ctx, s.cfg.Account, minLedger, -1, marker) - if err != nil { - return retry.Retryable( - errors.Wrapf(err, "failed to get account transactions, account:%s, minLedger:%d, marker:%+v", - s.cfg.Account.String(), minLedger, marker), - ) - } - return nil - }) + accountTxResult, err := s.rpcTxProvider.AccountTx(ctx, s.cfg.Account, minLedger, -1, marker) if err != nil { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return lastLedger - } - // this panic is unexpected - panic(errors.Wrapf( + return lastLedger, errors.Wrapf( err, - "unexpected error received for the get account transactions with retry, err:%s", - err.Error(), - )) + "failed to get account transactions, account:%s, minLedger:%d, marker:%+v", + s.cfg.Account.String(), minLedger, marker, + ) } // we accept the transaction from the validated ledger only if accountTxResult.Validated { @@ -205,7 +204,7 @@ func (s *AccountScanner) scanTransactions( } select { case <-ctx.Done(): - return lastLedger + return lastLedger, ctx.Err() case ch <- *tx: } } @@ -217,16 +216,18 @@ func (s *AccountScanner) scanTransactions( marker = accountTxResult.Marker } - return lastLedger + return lastLedger, nil } -func (s *AccountScanner) doWithRepeat(ctx context.Context, shouldRepeat bool, f func()) { +func (s *AccountScanner) doWithRepeat(ctx context.Context, shouldRepeat bool, f func() error) { for { select { case <-ctx.Done(): return default: - f() + if err := f(); err != nil { + s.log.Error(ctx, "XRPL scanner is failed in do with repeat", zap.Error(err)) + } if !shouldRepeat { s.log.Info(ctx, "Execution is fully stopped.") return diff --git a/relayer/xrpl/scanner_test.go b/relayer/xrpl/scanner_test.go index ab8cdfd4..2c9f6df0 100644 --- a/relayer/xrpl/scanner_test.go +++ b/relayer/xrpl/scanner_test.go @@ -13,7 +13,6 @@ import ( rippledata "github.com/rubblelabs/ripple/data" "github.com/samber/lo" "github.com/stretchr/testify/require" - "go.uber.org/zap" "github.com/CoreumFoundation/coreum-tools/pkg/parallel" "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/logger" @@ -38,6 +37,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) { cfg: xrpl.AccountScannerConfig{ Account: account, FullScanEnabled: true, + RepeatFullScan: true, RetryDelay: time.Millisecond, }, rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider { @@ -53,6 +53,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) { callNumber++ switch callNumber { case 1: + require.Equal(t, int64(-1), minLedger) return xrpl.AccountTxResult{ Validated: true, Transactions: buildEmptyTransactions(map[string]uint32{ @@ -64,8 +65,10 @@ func TestAccountScanner_ScanTxs(t *testing.T) { }, nil // emulate error case 2: + require.Equal(t, int64(-1), minLedger) return xrpl.AccountTxResult{}, errors.New("error") case 3: + require.Equal(t, int64(3), minLedger) return xrpl.AccountTxResult{ Validated: true, Transactions: buildEmptyTransactions(map[string]uint32{ @@ -172,8 +175,8 @@ func TestAccountScanner_ScanTxs(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - zapDevLogger, err := zap.NewDevelopment() - require.NoError(t, err) + logMock := logger.NewAnyLogMock(ctrl) + logMock.EXPECT().Error(gomock.Any(), gomock.Any(), gomock.Any()) rpcTxProvider := tt.rpcTxProvider(ctrl) metricRegistryMock := NewMockMetricRegistry(ctrl) @@ -182,7 +185,7 @@ func TestAccountScanner_ScanTxs(t *testing.T) { s := xrpl.NewAccountScanner( tt.cfg, - logger.NewZapLoggerFromLogger(zapDevLogger), + logMock, rpcTxProvider, metricRegistryMock, ) From f332a4d4efdf5ad8e546a6b510f11ab2e3e26fc2 Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Thu, 4 Apr 2024 14:23:00 +0300 Subject: [PATCH 2/4] Use is positive check for lastLedger --- relayer/xrpl/scanner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/xrpl/scanner.go b/relayer/xrpl/scanner.go index 4cb6387a..f216cf6e 100644 --- a/relayer/xrpl/scanner.go +++ b/relayer/xrpl/scanner.go @@ -136,7 +136,7 @@ func (s *AccountScanner) scanRecentHistory( lastLedger, err := s.scanTransactions(ctx, minLedger, s.metricRegistry.SetXRPLAccountRecentHistoryScanLedgerIndex, ch) // set minLedger to start with it in next iteration // even if the error was returned we still re-scan from the lastLedger - if lastLedger != 0 { + if lastLedger > 0 { minLedger = lastLedger + 1 } if err != nil { From 727767ff1e5c15295ca8aad19b3495d442755310 Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Fri, 5 Apr 2024 14:50:14 +0300 Subject: [PATCH 3/4] Order txs by ledger seq --- relayer/xrpl/scanner_test.go | 165 ++++++++++++++++++----------------- 1 file changed, 85 insertions(+), 80 deletions(-) diff --git a/relayer/xrpl/scanner_test.go b/relayer/xrpl/scanner_test.go index 2c9f6df0..6a0409fe 100644 --- a/relayer/xrpl/scanner_test.go +++ b/relayer/xrpl/scanner_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "reflect" + "sort" "strings" "testing" "time" @@ -87,86 +88,86 @@ func TestAccountScanner_ScanTxs(t *testing.T) { }, wantErr: false, }, - { - name: "recent_scan_positive_with_retry_four_pages", - cfg: xrpl.AccountScannerConfig{ - Account: account, - RecentScanEnabled: true, - RecentScanWindow: 10, - RepeatRecentScan: true, - RetryDelay: time.Millisecond, - }, - rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider { - mockedProvider := NewMockRPCTxProvider(ctrl) - - mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{ - LedgerCurrentIndex: 100, - }, nil) - - callNumber := 0 - mockedProvider.EXPECT().AccountTx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - func( - ctx context.Context, - account rippledata.Account, - minLedger, maxLedger int64, - marker map[string]any, - ) (xrpl.AccountTxResult, error) { - callNumber++ - switch callNumber { - case 1: - require.Equal(t, int64(100-10), minLedger) - return xrpl.AccountTxResult{ - Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "1": 90, - "2": 91, - "3": 91, - "4": 92, - }), - Marker: notEmptyMarker, - }, nil - case 2: - require.Equal(t, int64(100-10), minLedger) - return xrpl.AccountTxResult{ - Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "5": 92, - }), - // finish - Marker: nil, - }, nil - case 3: - require.Equal(t, int64(93), minLedger) - return xrpl.AccountTxResult{}, errors.New("error") - case 4: - require.Equal(t, int64(93), minLedger) - return xrpl.AccountTxResult{ - Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "6": 93, - }), - // finish - Marker: nil, - }, nil - case 5: - require.Equal(t, int64(94), minLedger) - return xrpl.AccountTxResult{ - Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "7": 94, - }), - }, nil - default: - panic("unexpected call") - } - }).AnyTimes() - return mockedProvider - }, - wantTxHashes: []string{ - "1", "2", "3", "4", "5", "6", "7", - }, - wantErr: false, - }, + //{ + // name: "recent_scan_positive_with_retry_four_pages", + // cfg: xrpl.AccountScannerConfig{ + // Account: account, + // RecentScanEnabled: true, + // RecentScanWindow: 10, + // RepeatRecentScan: true, + // RetryDelay: time.Millisecond, + // }, + // rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider { + // mockedProvider := NewMockRPCTxProvider(ctrl) + // + // mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{ + // LedgerCurrentIndex: 100, + // }, nil) + // + // callNumber := 0 + // mockedProvider.EXPECT().AccountTx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + // func( + // ctx context.Context, + // account rippledata.Account, + // minLedger, maxLedger int64, + // marker map[string]any, + // ) (xrpl.AccountTxResult, error) { + // callNumber++ + // switch callNumber { + // case 1: + // require.Equal(t, int64(100-10), minLedger) + // return xrpl.AccountTxResult{ + // Validated: true, + // Transactions: buildEmptyTransactions(map[string]uint32{ + // "1": 90, + // "2": 91, + // "3": 91, + // "4": 92, + // }), + // Marker: notEmptyMarker, + // }, nil + // case 2: + // require.Equal(t, int64(100-10), minLedger) + // return xrpl.AccountTxResult{ + // Validated: true, + // Transactions: buildEmptyTransactions(map[string]uint32{ + // "5": 92, + // }), + // // finish + // Marker: nil, + // }, nil + // case 3: + // require.Equal(t, int64(93), minLedger) + // return xrpl.AccountTxResult{}, errors.New("error") + // case 4: + // require.Equal(t, int64(93), minLedger) + // return xrpl.AccountTxResult{ + // Validated: true, + // Transactions: buildEmptyTransactions(map[string]uint32{ + // "6": 93, + // }), + // // finish + // Marker: nil, + // }, nil + // case 5: + // require.Equal(t, int64(94), minLedger) + // return xrpl.AccountTxResult{ + // Validated: true, + // Transactions: buildEmptyTransactions(map[string]uint32{ + // "7": 94, + // }), + // }, nil + // default: + // panic("unexpected call") + // } + // }).AnyTimes() + // return mockedProvider + // }, + // wantTxHashes: []string{ + // "1", "2", "3", "4", "5", "6", "7", + // }, + // wantErr: false, + //}, } for _, tt := range tests { tt := tt @@ -249,5 +250,9 @@ func buildEmptyTransactions(txsData map[string]uint32) []*rippledata.Transaction }, }) } + // order by ledger sequence + sort.Slice(txs, func(i, j int) bool { + return txs[i].LedgerSequence < txs[j].LedgerSequence + }) return txs } From 17ba565f9fb65176846e1057fdf5075f944e3798 Mon Sep 17 00:00:00 2001 From: Dzmitry Hil Date: Fri, 5 Apr 2024 15:22:06 +0300 Subject: [PATCH 4/4] Fix TestAccountScanner_ScanTxs map usage --- relayer/xrpl/scanner_test.go | 223 +++++++++++++++++++++-------------- 1 file changed, 132 insertions(+), 91 deletions(-) diff --git a/relayer/xrpl/scanner_test.go b/relayer/xrpl/scanner_test.go index 6a0409fe..5bda421e 100644 --- a/relayer/xrpl/scanner_test.go +++ b/relayer/xrpl/scanner_test.go @@ -20,6 +20,11 @@ import ( "github.com/CoreumFoundation/coreumbridge-xrpl/relayer/xrpl" ) +type txTemplate struct { + Hash string + LedgerSequence uint32 +} + func TestAccountScanner_ScanTxs(t *testing.T) { t.Parallel() @@ -57,10 +62,19 @@ func TestAccountScanner_ScanTxs(t *testing.T) { require.Equal(t, int64(-1), minLedger) return xrpl.AccountTxResult{ Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "1": 1, - "2": 2, - "3": 3, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "1", + LedgerSequence: 1, + }, + { + Hash: "2", + LedgerSequence: 2, + }, + { + Hash: "3", + LedgerSequence: 3, + }, }), Marker: notEmptyMarker, }, nil @@ -72,9 +86,15 @@ func TestAccountScanner_ScanTxs(t *testing.T) { require.Equal(t, int64(3), minLedger) return xrpl.AccountTxResult{ Validated: true, - Transactions: buildEmptyTransactions(map[string]uint32{ - "4": 3, - "5": 4, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "4", + LedgerSequence: 3, + }, + { + Hash: "5", + LedgerSequence: 4, + }, }), }, nil default: @@ -88,86 +108,107 @@ func TestAccountScanner_ScanTxs(t *testing.T) { }, wantErr: false, }, - //{ - // name: "recent_scan_positive_with_retry_four_pages", - // cfg: xrpl.AccountScannerConfig{ - // Account: account, - // RecentScanEnabled: true, - // RecentScanWindow: 10, - // RepeatRecentScan: true, - // RetryDelay: time.Millisecond, - // }, - // rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider { - // mockedProvider := NewMockRPCTxProvider(ctrl) - // - // mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{ - // LedgerCurrentIndex: 100, - // }, nil) - // - // callNumber := 0 - // mockedProvider.EXPECT().AccountTx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( - // func( - // ctx context.Context, - // account rippledata.Account, - // minLedger, maxLedger int64, - // marker map[string]any, - // ) (xrpl.AccountTxResult, error) { - // callNumber++ - // switch callNumber { - // case 1: - // require.Equal(t, int64(100-10), minLedger) - // return xrpl.AccountTxResult{ - // Validated: true, - // Transactions: buildEmptyTransactions(map[string]uint32{ - // "1": 90, - // "2": 91, - // "3": 91, - // "4": 92, - // }), - // Marker: notEmptyMarker, - // }, nil - // case 2: - // require.Equal(t, int64(100-10), minLedger) - // return xrpl.AccountTxResult{ - // Validated: true, - // Transactions: buildEmptyTransactions(map[string]uint32{ - // "5": 92, - // }), - // // finish - // Marker: nil, - // }, nil - // case 3: - // require.Equal(t, int64(93), minLedger) - // return xrpl.AccountTxResult{}, errors.New("error") - // case 4: - // require.Equal(t, int64(93), minLedger) - // return xrpl.AccountTxResult{ - // Validated: true, - // Transactions: buildEmptyTransactions(map[string]uint32{ - // "6": 93, - // }), - // // finish - // Marker: nil, - // }, nil - // case 5: - // require.Equal(t, int64(94), minLedger) - // return xrpl.AccountTxResult{ - // Validated: true, - // Transactions: buildEmptyTransactions(map[string]uint32{ - // "7": 94, - // }), - // }, nil - // default: - // panic("unexpected call") - // } - // }).AnyTimes() - // return mockedProvider - // }, - // wantTxHashes: []string{ - // "1", "2", "3", "4", "5", "6", "7", - // }, - // wantErr: false, - //}, + { + name: "recent_scan_positive_with_retry_four_pages", + cfg: xrpl.AccountScannerConfig{ + Account: account, + RecentScanEnabled: true, + RecentScanWindow: 10, + RepeatRecentScan: true, + RetryDelay: time.Millisecond, + }, + rpcTxProvider: func(ctrl *gomock.Controller) xrpl.RPCTxProvider { + mockedProvider := NewMockRPCTxProvider(ctrl) + + mockedProvider.EXPECT().LedgerCurrent(gomock.Any()).Return(xrpl.LedgerCurrentResult{ + LedgerCurrentIndex: 100, + }, nil) + + callNumber := 0 + mockedProvider.EXPECT().AccountTx(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func( + ctx context.Context, + account rippledata.Account, + minLedger, maxLedger int64, + marker map[string]any, + ) (xrpl.AccountTxResult, error) { + callNumber++ + switch callNumber { + case 1: + require.Equal(t, int64(100-10), minLedger) + return xrpl.AccountTxResult{ + Validated: true, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "1", + LedgerSequence: 90, + }, + { + Hash: "2", + LedgerSequence: 91, + }, + { + Hash: "3", + LedgerSequence: 91, + }, + { + Hash: "4", + LedgerSequence: 92, + }, + }), + Marker: notEmptyMarker, + }, nil + case 2: + require.Equal(t, int64(100-10), minLedger) + return xrpl.AccountTxResult{ + Validated: true, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "5", + LedgerSequence: 92, + }, + }), + // finish + Marker: nil, + }, nil + case 3: + require.Equal(t, int64(93), minLedger) + return xrpl.AccountTxResult{}, errors.New("error") + case 4: + require.Equal(t, int64(93), minLedger) + return xrpl.AccountTxResult{ + Validated: true, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "6", + LedgerSequence: 93, + }, + }), + // finish + Marker: nil, + }, nil + case 5: + require.Equal(t, int64(94), minLedger) + return xrpl.AccountTxResult{ + Validated: true, + Transactions: buildEmptyTransactions([]txTemplate{ + { + Hash: "7", + LedgerSequence: 94, + }, + }), + }, nil + default: + panic("unexpected call") + } + }).AnyTimes() + return mockedProvider + }, + wantTxHashes: []string{ + "1", "2", "3", "4", "5", "6", "7", + }, + wantErr: false, + }, } for _, tt := range tests { tt := tt @@ -236,13 +277,13 @@ func readTxHashesFromChannels( } } -func buildEmptyTransactions(txsData map[string]uint32) []*rippledata.TransactionWithMetaData { +func buildEmptyTransactions(txsData []txTemplate) []*rippledata.TransactionWithMetaData { txs := make([]*rippledata.TransactionWithMetaData, 0, len(txsData)) - for hash, ledgerSequence := range txsData { + for _, tx := range txsData { var txHash rippledata.Hash256 - copy(txHash[:], hash) + copy(txHash[:], tx.Hash) txs = append(txs, &rippledata.TransactionWithMetaData{ - LedgerSequence: ledgerSequence, + LedgerSequence: tx.LedgerSequence, Transaction: &rippledata.Payment{ TxBase: rippledata.TxBase{ Hash: txHash,