From da5f7de391fce322ebfa64328ef5f154fa22d791 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 16 Oct 2023 09:11:46 -0700 Subject: [PATCH 01/14] #4909: initial wip on integrating loaders and builders into processor runners --- services/horizon/internal/ingest/fsm.go | 2 + .../internal/ingest/group_processors.go | 28 +++++-- .../internal/ingest/group_processors_test.go | 40 +++++---- .../internal/ingest/processor_runner.go | 68 +++++++++------- .../internal/ingest/processor_runner_test.go | 81 ++++++++++++++++--- .../ingest/processors/change_processors.go | 57 ------------- .../internal/ingest/processors/main.go | 65 +++++++++++++++ .../stats_ledger_transaction_processor.go | 7 +- ...stats_ledger_transaction_processor_test.go | 12 +-- 9 files changed, 231 insertions(+), 129 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 38e3fe9ed7..284ea7127b 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -459,6 +459,7 @@ func (r resumeState) run(s *system) (transition, error) { // Update cursor if there's more than one ingesting instance: either // Captive-Core or DB ingestion connected to another Stellar-Core. + // remove now? if err = s.updateCursor(lastIngestedLedger); err != nil { // Don't return updateCursor error. log.WithError(err).Warn("error updating stellar-core cursor") @@ -524,6 +525,7 @@ func (r resumeState) run(s *system) (transition, error) { return retryResume(r), err } + //TODO remove now? stellar-core-db-url is removed if err = s.updateCursor(ingestLedger); err != nil { // Don't return updateCursor error. log.WithError(err).Warn("error updating stellar-core cursor") diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 86622810b5..af486a35cf 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -7,7 +7,9 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/services/horizon/internal/ingest/processors" + "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" ) type processorsRunDurations map[string]time.Duration @@ -51,21 +53,23 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error { } type groupTransactionProcessors struct { - processors []horizonTransactionProcessor + processors []horizonTransactionProcessor + lazyLoaders []horizonLazyLoader processorsRunDurations } -func newGroupTransactionProcessors(processors []horizonTransactionProcessor) *groupTransactionProcessors { +func newGroupTransactionProcessors(processors []horizonTransactionProcessor, lazyLoaders []horizonLazyLoader) *groupTransactionProcessors { return &groupTransactionProcessors{ processors: processors, processorsRunDurations: make(map[string]time.Duration), + lazyLoaders: lazyLoaders, } } -func (g groupTransactionProcessors) ProcessTransaction(ctx context.Context, tx ingest.LedgerTransaction) error { +func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, tx ingest.LedgerTransaction) error { for _, p := range g.processors { startTime := time.Now() - if err := p.ProcessTransaction(ctx, tx); err != nil { + if err := p.ProcessTransaction(lcm, tx); err != nil { return errors.Wrapf(err, "error in %T.ProcessTransaction", p) } g.AddRunDuration(fmt.Sprintf("%T", p), startTime) @@ -73,11 +77,21 @@ func (g groupTransactionProcessors) ProcessTransaction(ctx context.Context, tx i return nil } -func (g groupTransactionProcessors) Commit(ctx context.Context) error { +func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error { + // need to trigger all lazy loaders to now resolve their future placeholders + // with real db values first + for _, loader := range g.lazyLoaders { + if err := loader.Exec(ctx, session); err != nil { + return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader) + } + } + + // now flush each processor which may call loader.GetNow(), which + // required the prior loader.Exec() to have been called. for _, p := range g.processors { startTime := time.Now() - if err := p.Commit(ctx); err != nil { - return errors.Wrapf(err, "error in %T.Commit", p) + if err := p.Flush(ctx, session); err != nil { + return errors.Wrapf(err, "error in %T.Flush", p) } g.AddRunDuration(fmt.Sprintf("%T", p), startTime) } diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 6848c24a66..80d43a0d01 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/suite" "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/xdr" ) var _ horizonChangeProcessor = (*mockHorizonChangeProcessor)(nil) @@ -35,12 +37,12 @@ type mockHorizonTransactionProcessor struct { mock.Mock } -func (m *mockHorizonTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { - args := m.Called(ctx, transaction) +func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { + args := m.Called(lcm, transaction) return args.Error(0) } -func (m *mockHorizonTransactionProcessor) Commit(ctx context.Context) error { +func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { args := m.Called(ctx) return args.Error(0) } @@ -124,6 +126,7 @@ type GroupTransactionProcessorsTestSuiteLedger struct { processors *groupTransactionProcessors processorA *mockHorizonTransactionProcessor processorB *mockHorizonTransactionProcessor + session db.SessionInterface } func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) { @@ -137,7 +140,8 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() { s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{ s.processorA, s.processorB, - }) + }, nil) + s.session = &db.MockSession{} } func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() { @@ -147,46 +151,48 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() { func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionFails() { transaction := ingest.LedgerTransaction{} + closeMeta := xdr.LedgerCloseMeta{} s.processorA. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(errors.New("transient error")).Once() - err := s.processors.ProcessTransaction(s.ctx, transaction) + err := s.processors.ProcessTransaction(closeMeta, transaction) s.Assert().Error(err) s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.ProcessTransaction: transient error") } func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionSucceeds() { transaction := ingest.LedgerTransaction{} + closeMeta := xdr.LedgerCloseMeta{} s.processorA. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(nil).Once() s.processorB. - On("ProcessTransaction", s.ctx, transaction). + On("ProcessTransaction", closeMeta, transaction). Return(nil).Once() - err := s.processors.ProcessTransaction(s.ctx, transaction) + err := s.processors.ProcessTransaction(closeMeta, transaction) s.Assert().NoError(err) } -func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitFails() { +func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushFails() { s.processorA. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(errors.New("transient error")).Once() - err := s.processors.Commit(s.ctx) + err := s.processors.Flush(s.ctx, s.session) s.Assert().Error(err) - s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Commit: transient error") + s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Flush: transient error") } -func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitSucceeds() { +func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushSucceeds() { s.processorA. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(nil).Once() s.processorB. - On("Commit", s.ctx). + On("Flush", s.ctx, s.session). Return(nil).Once() - err := s.processors.Commit(s.ctx) + err := s.processors.Flush(s.ctx, s.session) s.Assert().NoError(err) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 481a4e7d52..28efe53a6e 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -32,7 +32,10 @@ type horizonChangeProcessor interface { type horizonTransactionProcessor interface { processors.LedgerTransactionProcessor - Commit(context.Context) error +} + +type horizonLazyLoader interface { + Exec(ctx context.Context, session db.SessionInterface) error } type statsChangeProcessor struct { @@ -47,10 +50,6 @@ type statsLedgerTransactionProcessor struct { *processors.StatsLedgerTransactionProcessor } -func (statsLedgerTransactionProcessor) Commit(ctx context.Context) error { - return nil -} - type ledgerStats struct { changeStats ingest.StatsChangeProcessorResults changeDurations processorsRunDurations @@ -135,24 +134,36 @@ func buildChangeProcessor( func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, - ledger xdr.LedgerHeaderHistoryEntry, + ledger xdr.LedgerCloseMeta, + txBuilder history.TransactionBatchInsertBuilder, ) *groupTransactionProcessors { + accountLoader := history.NewAccountLoader() + assetLoader := history.NewAssetLoader() + lpLoader := history.NewLiquidityPoolLoader() + + lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader} + statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ StatsLedgerTransactionProcessor: ledgerTransactionStats, } - *tradeProcessor = *processors.NewTradeProcessor(s.session, s.historyQ, ledger) - sequence := uint32(ledger.Header.LedgerSeq) - return newGroupTransactionProcessors([]horizonTransactionProcessor{ + *tradeProcessor = *processors.NewTradeProcessor(accountLoader, + lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder()) + + processors := []horizonTransactionProcessor{ statsLedgerTransactionProcessor, - processors.NewEffectProcessor(s.session, s.historyQ, sequence), - processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion), - processors.NewOperationProcessor(s.session, s.historyQ, sequence), + processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder()), + processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)), + processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder()), tradeProcessor, - processors.NewParticipantsProcessor(s.session, s.historyQ, sequence), - processors.NewTransactionProcessor(s.session, s.historyQ, sequence), - processors.NewClaimableBalancesTransactionProcessor(s.session, s.historyQ, sequence), - processors.NewLiquidityPoolsTransactionProcessor(s.session, s.historyQ, sequence), - }) + processors.NewParticipantsProcessor(accountLoader, + s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), + processors.NewTransactionProcessor(txBuilder), + processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(), + s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), + processors.NewLiquidityPoolsTransactionProcessor(lpLoader, + s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} + + return newGroupTransactionProcessors(processors, lazyLoaders) } func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { @@ -164,15 +175,15 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers return newGroupTransactionFilterers(f) } -func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHistoryEntry) *groupTransactionProcessors { +func (s *ProcessorRunner) buildFilteredOutProcessor(txBuilder history.TransactionBatchInsertBuilder) *groupTransactionProcessors { // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq)) + txSubProc := processors.NewTransactionFilteredTmpProcessor(txBuilder) p = append(p, txSubProc) } - return newGroupTransactionProcessors(p) + return newGroupTransactionProcessors(p, nil) } // checkIfProtocolVersionSupported checks if this Horizon version supports the @@ -321,16 +332,18 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos err = errors.Wrap(err, "Error while checking for supported protocol version") return } - header := transactionReader.GetHeader() + + txBuilder := s.historyQ.NewTransactionBatchInsertBuilder() groupTransactionFilterers := s.buildTransactionFilterer() - groupFilteredOutProcessors := s.buildFilteredOutProcessor(header) + groupFilteredOutProcessors := s.buildFilteredOutProcessor(txBuilder) groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, header) + &ledgerTransactionStats, &tradeProcessor, ledger, txBuilder) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, groupTransactionProcessors, transactionReader, + ledger, ) if err != nil { err = errors.Wrap(err, "Error streaming changes from ledger") @@ -338,19 +351,14 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } if s.config.EnableIngestionFiltering { - err = groupFilteredOutProcessors.Commit(s.ctx) - if err != nil { - err = errors.Wrap(err, "Error committing filtered changes from processor") - return - } if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())) } } - err = groupTransactionProcessors.Commit(s.ctx) + err = groupTransactionProcessors.Flush(s.ctx, s.session) if err != nil { - err = errors.Wrap(err, "Error committing changes from processor") + err = errors.Wrap(err, "Error flushing changes from processor") return } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index c01ee53730..3470cb9fec 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -234,17 +234,29 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) { func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ctx := context.Background() - maxBatchSize := 100000 q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). + Return(&history.MockLedgersBatchInsertBuilder{}) + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(&history.MockEffectBatchInsertBuilder{}) q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(&history.MockOperationsBatchInsertBuilder{}).Twice() // Twice = with/without failed - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(&history.MockTransactionsBatchInsertBuilder{}).Twice() - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Twice() + Return(&history.MockOperationsBatchInsertBuilder{}) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(&history.MockTransactionParticipantsBatchInsertBuilder{}) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(&history.MockOperationParticipantBatchInsertBuilder{}) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(&history.MockTransactionClaimableBalanceBatchInsertBuilder{}) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(&history.MockOperationClaimableBalanceBatchInsertBuilder{}) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(&history.MockTransactionLiquidityPoolBatchInsertBuilder{}) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(&history.MockOperationLiquidityPoolBatchInsertBuilder{}) runner := ProcessorRunner{ ctx: ctx, @@ -252,10 +264,15 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { historyQ: q, } + txBuilder := &history.MockTransactionsBatchInsertBuilder{} stats := &processors.StatsLedgerTransactionProcessor{} trades := &processors.TradeProcessor{} - ledger := xdr.LedgerHeaderHistoryEntry{} - processor := runner.buildTransactionProcessor(stats, trades, ledger) + ledger := xdr.LedgerCloseMeta{ + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{}, + }, + } + processor := runner.buildTransactionProcessor(stats, trades, ledger, txBuilder) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) @@ -264,7 +281,8 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { assert.IsType(t, &processors.OperationProcessor{}, processor.processors[3]) assert.IsType(t, &processors.TradeProcessor{}, processor.processors[4]) assert.IsType(t, &processors.ParticipantsProcessor{}, processor.processors[5]) - assert.IsType(t, &processors.TransactionProcessor{}, processor.processors[6]) + assert.IsType(t, &processors.ClaimableBalancesTransactionProcessor{}, processor.processors[7]) + assert.IsType(t, &processors.LiquidityPoolsTransactionProcessor{}, processor.processors[8]) } func TestProcessorRunnerWithFilterEnabled(t *testing.T) { @@ -304,12 +322,12 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Twice() + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). Return(mockTransactionsBatchInsertBuilder) - q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). Return(mockTransactionsBatchInsertBuilder) q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). @@ -318,6 +336,45 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). + Return(&history.MockLedgersBatchInsertBuilder{}) + + mockEffectsBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} + mockEffectsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(mockEffectsBatchInsertBuilder) + + mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(mockTransactionsParticipantsBatchInsertBuilder) + + mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(mockOperationParticipantBatchInsertBuilder) + + mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(mockTransactionClaimableBalanceBatchInsertBuilder) + + mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(mockOperationClaimableBalanceBatchInsertBuilder) + + mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(mockTransactionLiquidityPoolBatchInsertBuilder) + + mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(mockOperationLiquidityPoolBatchInsertBuilder) + mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) mockBatchInsertBuilder.On( diff --git a/services/horizon/internal/ingest/processors/change_processors.go b/services/horizon/internal/ingest/processors/change_processors.go index 2e5b126d8f..ee9eb127f1 100644 --- a/services/horizon/internal/ingest/processors/change_processors.go +++ b/services/horizon/internal/ingest/processors/change_processors.go @@ -8,63 +8,6 @@ import ( "github.com/stellar/go/support/errors" ) -type ChangeProcessor interface { - ProcessChange(ctx context.Context, change ingest.Change) error -} - -type LedgerTransactionProcessor interface { - ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error -} - -type LedgerTransactionFilterer interface { - FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) -} - -func StreamLedgerTransactions( - ctx context.Context, - txFilterer LedgerTransactionFilterer, - filteredTxProcessor LedgerTransactionProcessor, - txProcessor LedgerTransactionProcessor, - reader *ingest.LedgerTransactionReader, -) error { - for { - tx, err := reader.Read() - if err == io.EOF { - return nil - } - if err != nil { - return errors.Wrap(err, "could not read transaction") - } - include, err := txFilterer.FilterTransaction(ctx, tx) - if err != nil { - return errors.Wrapf( - err, - "could not filter transaction %v", - tx.Index, - ) - } - if !include { - if err = filteredTxProcessor.ProcessTransaction(ctx, tx); err != nil { - return errors.Wrapf( - err, - "could not process transaction %v", - tx.Index, - ) - } - log.Debugf("Filters did not find match on transaction, dropping this tx with hash %v", tx.Result.TransactionHash.HexString()) - continue - } - - if err = txProcessor.ProcessTransaction(ctx, tx); err != nil { - return errors.Wrapf( - err, - "could not process transaction %v", - tx.Index, - ) - } - } -} - func StreamChanges( ctx context.Context, changeProcessor ChangeProcessor, diff --git a/services/horizon/internal/ingest/processors/main.go b/services/horizon/internal/ingest/processors/main.go index 5088dd97aa..94f83f3fa9 100644 --- a/services/horizon/internal/ingest/processors/main.go +++ b/services/horizon/internal/ingest/processors/main.go @@ -1,7 +1,13 @@ package processors import ( + "context" + "io" + "github.com/guregu/null" + "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -10,6 +16,65 @@ var log = logpkg.DefaultLogger.WithField("service", "ingest") const maxBatchSize = 100000 +type ChangeProcessor interface { + ProcessChange(ctx context.Context, change ingest.Change) error +} + +type LedgerTransactionProcessor interface { + ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error + Flush(ctx context.Context, session db.SessionInterface) error +} + +type LedgerTransactionFilterer interface { + FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) +} + +func StreamLedgerTransactions( + ctx context.Context, + txFilterer LedgerTransactionFilterer, + filteredTxProcessor LedgerTransactionProcessor, + txProcessor LedgerTransactionProcessor, + reader *ingest.LedgerTransactionReader, + ledger xdr.LedgerCloseMeta, +) error { + for { + tx, err := reader.Read() + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrap(err, "could not read transaction") + } + include, err := txFilterer.FilterTransaction(ctx, tx) + if err != nil { + return errors.Wrapf( + err, + "could not filter transaction %v", + tx.Index, + ) + } + if !include { + if err = filteredTxProcessor.ProcessTransaction(ledger, tx); err != nil { + return errors.Wrapf( + err, + "could not process transaction %v", + tx.Index, + ) + } + log.Debugf("Filters did not find match on transaction, dropping this tx with hash %v", tx.Result.TransactionHash.HexString()) + continue + } + + if err = txProcessor.ProcessTransaction(ledger, tx); err != nil { + return errors.Wrapf( + err, + "could not process transaction %v", + tx.Index, + ) + } + } +} + func ledgerEntrySponsorToNullString(entry xdr.LedgerEntry) null.String { sponsoringID := entry.SponsoringID() diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go index b9585d4802..26118b11c4 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -51,7 +52,11 @@ type StatsLedgerTransactionProcessorResults struct { OperationsLiquidityPoolWithdraw int64 } -func (p *StatsLedgerTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error { +func (p *StatsLedgerTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { + return nil +} + +func (p *StatsLedgerTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { p.results.Transactions++ ops := int64(len(transaction.Envelope.Operations())) p.results.Operations += ops diff --git a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go index f2bc2a5040..c7fc6d7967 100644 --- a/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/stats_ledger_transaction_processor_test.go @@ -1,7 +1,6 @@ package processors import ( - "context" "testing" "github.com/stretchr/testify/assert" @@ -23,12 +22,14 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { }, }, } + lcm := xdr.LedgerCloseMeta{} + for typ, s := range xdr.OperationTypeToStringMap { tx := txTemplate txTemplate.Envelope.V1.Tx.Operations[0].Body.Type = xdr.OperationType(typ) f := func() { var p StatsLedgerTransactionProcessor - p.ProcessTransaction(context.Background(), tx) + p.ProcessTransaction(lcm, tx) } assert.NotPanics(t, f, s) } @@ -38,16 +39,17 @@ func TestStatsLedgerTransactionProcessoAllOpTypesCovered(t *testing.T) { txTemplate.Envelope.V1.Tx.Operations[0].Body.Type = 20000 f := func() { var p StatsLedgerTransactionProcessor - p.ProcessTransaction(context.Background(), tx) + p.ProcessTransaction(lcm, tx) } assert.Panics(t, f) } func TestStatsLedgerTransactionProcessor(t *testing.T) { processor := &StatsLedgerTransactionProcessor{} + lcm := xdr.LedgerCloseMeta{} // Successful - assert.NoError(t, processor.ProcessTransaction(context.Background(), ingest.LedgerTransaction{ + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ Result: xdr.TransactionResultPair{ Result: xdr.TransactionResult{ Result: xdr.TransactionResultResult{ @@ -88,7 +90,7 @@ func TestStatsLedgerTransactionProcessor(t *testing.T) { })) // Failed - assert.NoError(t, processor.ProcessTransaction(context.Background(), ingest.LedgerTransaction{ + assert.NoError(t, processor.ProcessTransaction(lcm, ingest.LedgerTransaction{ Result: xdr.TransactionResultPair{ Result: xdr.TransactionResult{ Result: xdr.TransactionResultResult{ From c1d5ce7871cccc42eed0fb2f410d3ef9caa3c907 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 17 Oct 2023 09:29:08 -0700 Subject: [PATCH 02/14] #4909: working test results with byte arrays in fast batch builder copy statements --- .../internal/actions/transaction_test.go | 1 + .../internal/db2/history/fee_bump_scenario.go | 8 +- .../internal/ingest/group_processors_test.go | 2 +- .../internal/ingest/processor_runner.go | 17 +- .../internal/ingest/processor_runner_test.go | 164 ++++++++---------- support/db/batch_insert_builder_test.go | 1 + support/db/fast_batch_insert_builder_test.go | 9 +- support/db/internal_test.go | 1 + 8 files changed, 102 insertions(+), 101 deletions(-) diff --git a/services/horizon/internal/actions/transaction_test.go b/services/horizon/internal/actions/transaction_test.go index e029edef3a..b76cf1b0bf 100644 --- a/services/horizon/internal/actions/transaction_test.go +++ b/services/horizon/internal/actions/transaction_test.go @@ -149,6 +149,7 @@ func checkOuterHashResponse( } func TestFeeBumpTransactionPage(t *testing.T) { + tt := test.Start(t) defer tt.Finish() test.ResetHorizonDB(t, tt.HorizonDB) diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index 5d155ac5e8..be7a21ef14 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "fmt" "testing" "time" @@ -269,6 +270,8 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err := json.Marshal(map[string]string{ "bump_to": "98", }) + + fmt.Print(string(details)) tt.Assert.NoError(err) tt.Assert.NoError(opBuilder.Add( @@ -296,9 +299,10 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { EffectSequenceBumped, details, ) + tt.Assert.NoError(err) - tt.Assert.NoError(accountLoader.Exec(ctx, q)) - tt.Assert.NoError(effectBuilder.Exec(ctx, q)) + tt.Assert.NoError(accountLoader.Exec(ctx, q.SessionInterface)) + tt.Assert.NoError(effectBuilder.Exec(ctx, q.SessionInterface)) tt.Assert.NoError(q.Commit()) diff --git a/services/horizon/internal/ingest/group_processors_test.go b/services/horizon/internal/ingest/group_processors_test.go index 80d43a0d01..73d4f56f3f 100644 --- a/services/horizon/internal/ingest/group_processors_test.go +++ b/services/horizon/internal/ingest/group_processors_test.go @@ -43,7 +43,7 @@ func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerClose } func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error { - args := m.Called(ctx) + args := m.Called(ctx, session) return args.Error(0) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 28efe53a6e..a95f88cf6f 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -135,7 +135,6 @@ func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, ledger xdr.LedgerCloseMeta, - txBuilder history.TransactionBatchInsertBuilder, ) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() @@ -157,7 +156,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( tradeProcessor, processors.NewParticipantsProcessor(accountLoader, s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), - processors.NewTransactionProcessor(txBuilder), + processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()), processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(), s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), processors.NewLiquidityPoolsTransactionProcessor(lpLoader, @@ -175,11 +174,11 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers return newGroupTransactionFilterers(f) } -func (s *ProcessorRunner) buildFilteredOutProcessor(txBuilder history.TransactionBatchInsertBuilder) *groupTransactionProcessors { +func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors { // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(txBuilder) + txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder()) p = append(p, txSubProc) } @@ -333,11 +332,10 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos return } - txBuilder := s.historyQ.NewTransactionBatchInsertBuilder() groupTransactionFilterers := s.buildTransactionFilterer() - groupFilteredOutProcessors := s.buildFilteredOutProcessor(txBuilder) + groupFilteredOutProcessors := s.buildFilteredOutProcessor() groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledger, txBuilder) + &ledgerTransactionStats, &tradeProcessor, ledger) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, @@ -351,6 +349,11 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } if s.config.EnableIngestionFiltering { + err = groupFilteredOutProcessors.Flush(s.ctx, s.session) + if err != nil { + err = errors.Wrap(err, "Error flushing temp filtered tx from processor") + return + } if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())) } diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 3470cb9fec..326ad37583 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -238,6 +238,8 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { q := &mockDBQ{} defer mock.AssertExpectationsForObjects(t, q) + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(&history.MockTransactionsBatchInsertBuilder{}) q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). Return(&history.MockLedgersBatchInsertBuilder{}) @@ -264,7 +266,6 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { historyQ: q, } - txBuilder := &history.MockTransactionsBatchInsertBuilder{} stats := &processors.StatsLedgerTransactionProcessor{} trades := &processors.TradeProcessor{} ledger := xdr.LedgerCloseMeta{ @@ -272,7 +273,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { LedgerHeader: xdr.LedgerHeaderHistoryEntry{}, }, } - processor := runner.buildTransactionProcessor(stats, trades, ledger, txBuilder) + processor := runner.buildTransactionProcessor(stats, trades, ledger) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) @@ -309,71 +310,16 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - + mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) + mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder) - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + Return(mockTransactionsFilteredTmpBatchInsertBuilder) q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). Return(int64(0), nil) - q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) - q.MockQLedgers.On("NewLedgerBatchInsertBuilder"). - Return(&history.MockLedgersBatchInsertBuilder{}) - - mockEffectsBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} - mockEffectsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQEffects.On("NewEffectBatchInsertBuilder"). - Return(mockEffectsBatchInsertBuilder) - - mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} - mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.On("NewTransactionParticipantsBatchInsertBuilder"). - Return(mockTransactionsParticipantsBatchInsertBuilder) - - mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} - mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.On("NewOperationParticipantBatchInsertBuilder"). - Return(mockOperationParticipantBatchInsertBuilder) - - mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} - mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). - Return(mockTransactionClaimableBalanceBatchInsertBuilder) - - mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} - mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). - Return(mockOperationClaimableBalanceBatchInsertBuilder) - - mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} - mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). - Return(mockTransactionLiquidityPoolBatchInsertBuilder) - - mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} - mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) - q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). - Return(mockOperationLiquidityPoolBatchInsertBuilder) + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -421,25 +367,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { } // Batches - mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) - q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). - Return(mockAccountSignersBatchInsertBuilder).Once() - - mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQOperations.On("NewOperationBatchInsertBuilder"). - Return(mockOperationsBatchInsertBuilder).Twice() - - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). - Return(mockTransactionsBatchInsertBuilder).Twice() - - q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). - Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + defer mock.AssertExpectationsForObjects(t, mockBatchBuilders(q, mockSession, ctx, maxBatchSize)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -486,21 +414,21 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t } // Batches + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). + Return(mockTransactionsBatchInsertBuilder).Twice() mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockAccountSignersBatchInsertBuilder) q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). Return(mockAccountSignersBatchInsertBuilder).Once() mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockOperationsBatchInsertBuilder) - q.MockQOperations.On("NewOperationBatchInsertBuilder", maxBatchSize). + q.MockQOperations.On("NewOperationBatchInsertBuilder"). Return(mockOperationsBatchInsertBuilder).Twice() - mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder) - q.MockQTransactions.On("NewTransactionBatchInsertBuilder", maxBatchSize). - Return(mockTransactionsBatchInsertBuilder).Twice() + defer mock.AssertExpectationsForObjects(t, mockTransactionsBatchInsertBuilder, + mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder) runner := ProcessorRunner{ ctx: ctx, @@ -517,3 +445,63 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t ), ) } + +func mockBatchBuilders(q *mockDBQ, mockSession *db.MockSession, ctx context.Context, maxBatchSize int) []interface{} { + mockTransactionsBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + mockTransactionsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionBatchInsertBuilder"). + Return(mockTransactionsBatchInsertBuilder) + + mockAccountSignersBatchInsertBuilder := &history.MockAccountSignersBatchInsertBuilder{} + q.MockQSigners.On("NewAccountSignersBatchInsertBuilder", maxBatchSize). + Return(mockAccountSignersBatchInsertBuilder).Once() + + mockOperationsBatchInsertBuilder := &history.MockOperationsBatchInsertBuilder{} + mockOperationsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQOperations.On("NewOperationBatchInsertBuilder"). + Return(mockOperationsBatchInsertBuilder).Twice() + + mockEffectBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} + mockEffectBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQEffects.On("NewEffectBatchInsertBuilder"). + Return(mockEffectBatchInsertBuilder) + + mockTransactionsParticipantsBatchInsertBuilder := &history.MockTransactionParticipantsBatchInsertBuilder{} + mockTransactionsParticipantsBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewTransactionParticipantsBatchInsertBuilder"). + Return(mockTransactionsParticipantsBatchInsertBuilder) + + mockOperationParticipantBatchInsertBuilder := &history.MockOperationParticipantBatchInsertBuilder{} + mockOperationParticipantBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.On("NewOperationParticipantBatchInsertBuilder"). + Return(mockOperationParticipantBatchInsertBuilder) + + mockTransactionClaimableBalanceBatchInsertBuilder := &history.MockTransactionClaimableBalanceBatchInsertBuilder{} + mockTransactionClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewTransactionClaimableBalanceBatchInsertBuilder"). + Return(mockTransactionClaimableBalanceBatchInsertBuilder) + + mockOperationClaimableBalanceBatchInsertBuilder := &history.MockOperationClaimableBalanceBatchInsertBuilder{} + mockOperationClaimableBalanceBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryClaimableBalances.On("NewOperationClaimableBalanceBatchInsertBuilder"). + Return(mockOperationClaimableBalanceBatchInsertBuilder) + + mockTransactionLiquidityPoolBatchInsertBuilder := &history.MockTransactionLiquidityPoolBatchInsertBuilder{} + mockTransactionLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewTransactionLiquidityPoolBatchInsertBuilder"). + Return(mockTransactionLiquidityPoolBatchInsertBuilder) + + mockOperationLiquidityPoolBatchInsertBuilder := &history.MockOperationLiquidityPoolBatchInsertBuilder{} + mockOperationLiquidityPoolBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil) + q.MockQHistoryLiquidityPools.On("NewOperationLiquidityPoolBatchInsertBuilder"). + Return(mockOperationLiquidityPoolBatchInsertBuilder) + + q.MockQClaimableBalances.On("NewClaimableBalanceClaimantBatchInsertBuilder", maxBatchSize). + Return(&history.MockClaimableBalanceClaimantBatchInsertBuilder{}).Once() + + q.On("NewTradeBatchInsertBuilder").Return(&history.MockTradeBatchInsertBuilder{}) + + return []interface{}{mockAccountSignersBatchInsertBuilder, + mockOperationsBatchInsertBuilder, + mockTransactionsBatchInsertBuilder} +} diff --git a/support/db/batch_insert_builder_test.go b/support/db/batch_insert_builder_test.go index e283e8bf57..e0d28e145d 100644 --- a/support/db/batch_insert_builder_test.go +++ b/support/db/batch_insert_builder_test.go @@ -13,6 +13,7 @@ import ( type hungerRow struct { Name string `db:"name"` HungerLevel string `db:"hunger_level"` + JsonValue []byte `db:"json_value"` } type invalidHungerRow struct { diff --git a/support/db/fast_batch_insert_builder_test.go b/support/db/fast_batch_insert_builder_test.go index c31f502735..2d55c8446a 100644 --- a/support/db/fast_batch_insert_builder_test.go +++ b/support/db/fast_batch_insert_builder_test.go @@ -21,6 +21,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", "hunger_level": "1", + "json_value": []byte(`{\"bump_to\":\"98\"}`), }), ) @@ -28,13 +29,14 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", }), - "invalid number of columns (expected=2, actual=1)", + "invalid number of columns (expected=3, actual=1)", ) assert.EqualError(t, insertBuilder.Row(map[string]interface{}{ - "name": "bubba", - "city": "London", + "name": "bubba", + "city": "London", + "json_value": []byte(`{\"bump_to\":\"98\"}`), }), "column \"hunger_level\" does not exist", ) @@ -43,6 +45,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.RowStruct(hungerRow{ Name: "bubba2", HungerLevel: "9", + JsonValue: []byte(`{\"bump_to\":\"98\"}`), }), ) diff --git a/support/db/internal_test.go b/support/db/internal_test.go index 8ce0370a92..8884bb62c6 100644 --- a/support/db/internal_test.go +++ b/support/db/internal_test.go @@ -7,6 +7,7 @@ const testSchema = ` CREATE TABLE IF NOT EXISTS people ( name character varying NOT NULL, hunger_level integer NOT NULL, + json_value jsonb, PRIMARY KEY (name) ); DELETE FROM people; From 30ed2733d591124dc5a7ada7dfff38c72223296c Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 17 Oct 2023 14:33:53 -0700 Subject: [PATCH 03/14] #4909: fixed json column tests on fast batch builder --- support/db/fast_batch_insert_builder_test.go | 15 ++++++++------- support/db/main_test.go | 9 +++++---- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/support/db/fast_batch_insert_builder_test.go b/support/db/fast_batch_insert_builder_test.go index 2d55c8446a..bfd2f8407b 100644 --- a/support/db/fast_batch_insert_builder_test.go +++ b/support/db/fast_batch_insert_builder_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/guregu/null" "github.com/stretchr/testify/assert" "github.com/stellar/go/support/db/dbtest" @@ -21,7 +22,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", "hunger_level": "1", - "json_value": []byte(`{\"bump_to\":\"98\"}`), + "json_value": []byte(`{"bump_to": "97"}`), }), ) @@ -36,7 +37,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.Row(map[string]interface{}{ "name": "bubba", "city": "London", - "json_value": []byte(`{\"bump_to\":\"98\"}`), + "json_value": []byte(`{"bump_to": "98"}`), }), "column \"hunger_level\" does not exist", ) @@ -45,7 +46,7 @@ func TestFastBatchInsertBuilder(t *testing.T) { insertBuilder.RowStruct(hungerRow{ Name: "bubba2", HungerLevel: "9", - JsonValue: []byte(`{\"bump_to\":\"98\"}`), + JsonValue: []byte(`{"bump_to": "98"}`), }), ) @@ -77,8 +78,8 @@ func TestFastBatchInsertBuilder(t *testing.T) { t, found, []person{ - {Name: "bubba", HungerLevel: "1"}, - {Name: "bubba2", HungerLevel: "9"}, + {Name: "bubba", HungerLevel: "1", JsonValue: null.NewString(`{"bump_to": "97"}`, true)}, + {Name: "bubba2", HungerLevel: "9", JsonValue: null.NewString(`{"bump_to": "98"}`, true)}, }, ) @@ -119,8 +120,8 @@ func TestFastBatchInsertBuilder(t *testing.T) { t, found, []person{ - {Name: "bubba", HungerLevel: "1"}, - {Name: "bubba2", HungerLevel: "9"}, + {Name: "bubba", HungerLevel: "1", JsonValue: null.NewString(`{"bump_to": "97"}`, true)}, + {Name: "bubba2", HungerLevel: "9", JsonValue: null.NewString(`{"bump_to": "98"}`, true)}, }, ) assert.NoError(t, sess.Rollback()) diff --git a/support/db/main_test.go b/support/db/main_test.go index 68724d197d..301b533aa4 100644 --- a/support/db/main_test.go +++ b/support/db/main_test.go @@ -4,15 +4,16 @@ import ( "testing" "time" + "github.com/guregu/null" "github.com/stellar/go/support/db/dbtest" "github.com/stretchr/testify/assert" ) type person struct { - Name string `db:"name"` - HungerLevel string `db:"hunger_level"` - - SomethingIgnored int `db:"-"` + Name string `db:"name"` + HungerLevel string `db:"hunger_level"` + JsonValue null.String `db:"json_value"` + SomethingIgnored int `db:"-"` } func TestGetTable(t *testing.T) { From 1efb7a3c376e0ba79341a475f3d36c99e7c2e1df Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 18 Oct 2023 21:59:17 -0700 Subject: [PATCH 04/14] #4909: run ledger processor regardless of whether a ledger has 0 or more transactions --- services/horizon/docker/docker-compose.yml | 2 +- .../internal/ingest/processor_runner.go | 19 ++++++++++++------- .../internal/ingest/processor_runner_test.go | 5 +++-- .../ingest/processors/ledgers_processor.go | 14 +++++++++----- 4 files changed, 25 insertions(+), 15 deletions(-) diff --git a/services/horizon/docker/docker-compose.yml b/services/horizon/docker/docker-compose.yml index 40bced6677..377e26b0b4 100644 --- a/services/horizon/docker/docker-compose.yml +++ b/services/horizon/docker/docker-compose.yml @@ -1,7 +1,7 @@ version: '3' services: horizon-postgres: - image: postgres:9.6.17-alpine + image: postgres:postgres:12-bullseye restart: on-failure environment: - POSTGRES_HOST_AUTH_METHOD=trust diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index a95f88cf6f..4e360b7769 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -134,6 +134,7 @@ func buildChangeProcessor( func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, + ledgersProcessor *processors.LedgersProcessor, ledger xdr.LedgerCloseMeta, ) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() @@ -151,7 +152,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors := []horizonTransactionProcessor{ statsLedgerTransactionProcessor, processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder()), - processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)), + ledgersProcessor, processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder()), tradeProcessor, processors.NewParticipantsProcessor(accountLoader, @@ -321,21 +322,25 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos transactionReader *ingest.LedgerTransactionReader ) - transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) - if err != nil { - err = errors.Wrap(err, "Error creating ledger reader") + if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { + err = errors.Wrap(err, "Error while checking for supported protocol version") return } - if err = s.checkIfProtocolVersionSupported(ledger.ProtocolVersion()); err != nil { - err = errors.Wrap(err, "Error while checking for supported protocol version") + // ensure capture of the ledger to history regardless of whether it has transactions. + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)) + ledgersProcessor.ProcessLedger(ledger) + + transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) + if err != nil { + err = errors.Wrap(err, "Error creating ledger reader") return } groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledger) + &ledgerTransactionStats, &tradeProcessor, ledgersProcessor, ledger) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 326ad37583..689f08a285 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -273,9 +273,10 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { LedgerHeader: xdr.LedgerHeaderHistoryEntry{}, }, } - processor := runner.buildTransactionProcessor(stats, trades, ledger) - assert.IsType(t, &groupTransactionProcessors{}, processor) + ledgersProcessor := &processors.LedgersProcessor{} + processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor, ledger) + assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) assert.IsType(t, &processors.LedgersProcessor{}, processor.processors[2]) diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 1f14cc5518..085d97b9a5 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -10,7 +10,7 @@ import ( "github.com/stellar/go/xdr" ) -type ledgerInfo struct { +type LedgerInfo struct { header xdr.LedgerHeaderHistoryEntry successTxCount int failedTxCount int @@ -20,26 +20,30 @@ type ledgerInfo struct { type LedgersProcessor struct { batch history.LedgerBatchInsertBuilder - ledgers map[uint32]*ledgerInfo + ledgers map[uint32]*LedgerInfo ingestVersion int } func NewLedgerProcessor(batch history.LedgerBatchInsertBuilder, ingestVersion int) *LedgersProcessor { return &LedgersProcessor{ batch: batch, - ledgers: map[uint32]*ledgerInfo{}, + ledgers: map[uint32]*LedgerInfo{}, ingestVersion: ingestVersion, } } -func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { +func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *LedgerInfo { sequence := lcm.LedgerSequence() entry, ok := p.ledgers[sequence] if !ok { - entry = &ledgerInfo{header: lcm.LedgerHeaderHistoryEntry()} + entry = &LedgerInfo{header: lcm.LedgerHeaderHistoryEntry()} p.ledgers[sequence] = entry } + return entry +} +func (p *LedgersProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error { + entry := p.ProcessLedger(lcm) opCount := len(transaction.Envelope.Operations()) entry.txSetOpCount += opCount if transaction.Result.Successful() { From 7c3842b21e6d18f64ba4bc1411cd74e6383cf11f Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 19 Oct 2023 22:00:53 -0700 Subject: [PATCH 05/14] #4909: removed panic from Value() on loaders --- .../internal/db2/history/account_loader.go | 18 ++++++++--- .../db2/history/account_loader_test.go | 24 ++++++-------- .../internal/db2/history/asset_loader.go | 18 ++++++++--- .../internal/db2/history/asset_loader_test.go | 22 ++++++------- .../db2/history/claimable_balance_loader.go | 12 ++++--- .../history/claimable_balance_loader_test.go | 18 +++++------ .../db2/history/liquidity_pool_loader.go | 18 ++++++++--- .../db2/history/liquidity_pool_loader_test.go | 22 ++++++------- ...n_participant_batch_insert_builder_test.go | 4 ++- .../internal/db2/history/participants_test.go | 4 ++- .../internal/ingest/processor_runner.go | 10 +++--- .../ingest/processors/trades_processor.go | 31 ++++++++++++++++--- .../processors/trades_processor_test.go | 3 ++ 13 files changed, 123 insertions(+), 81 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e9fd9bedea..afba3c01e0 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000 // Value implements the database/sql/driver Valuer interface. func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.GetNow(a.address), nil + return a.loader.GetNow(a.address) } // AccountLoader will map account addresses to their history @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) GetNow(address string) int64 { - if id, ok := a.ids[address]; !ok { - panic(fmt.Errorf("address %v not present", address)) +func (a *AccountLoader) GetNow(address string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid account loader state, + Exec was not called yet to properly seal and resolve %v id`, address) + } + if internalID, ok := a.ids[address]; !ok { + return 0, fmt.Errorf(`account loader address %q was not found`, address) } else { - return id + return internalID, nil } } @@ -207,3 +211,7 @@ func NewAccountLoaderStub() AccountLoaderStub { func (a AccountLoaderStub) Insert(address string, id int64) { a.Loader.ids[address] = id } + +func (a AccountLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 11047f3be2..54d2c7a143 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -22,16 +22,11 @@ func TestAccountLoader(t *testing.T) { } loader := NewAccountLoader() - var futures []FutureAccountID for _, address := range addresses { future := loader.GetFuture(address) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(address) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid account loader state,`) duplicateFuture := loader.GetFuture(address) assert.Equal(t, future, duplicateFuture) } @@ -42,15 +37,16 @@ func TestAccountLoader(t *testing.T) { }) q := &Q{session} - for i, address := range addresses { - future := futures[i] - id := loader.GetNow(address) - val, err := future.Value() + for _, address := range addresses { + internalId, err := loader.GetNow(address) assert.NoError(t, err) - assert.Equal(t, id, val) var account Account assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) - assert.Equal(t, account.ID, id) + assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index 6ef3d7a350..ca30c671e9 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -41,7 +41,7 @@ type FutureAssetID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.GetNow(a.asset), nil + return a.loader.GetNow(a.asset) } // AssetLoader will map assets to their history @@ -81,11 +81,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AssetLoader) GetNow(asset AssetKey) int64 { - if id, ok := a.ids[asset]; !ok { - panic(fmt.Errorf("asset %v not present", asset)) +func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid asset loader state, + Exec was not called yet to properly seal and resolve %v id`, asset) + } + if internalID, ok := a.ids[asset]; !ok { + return 0, fmt.Errorf(`asset loader id %v was not found`, asset) } else { - return id + return internalID, nil } } @@ -213,3 +217,7 @@ func NewAssetLoaderStub() AssetLoaderStub { func (a AssetLoaderStub) Insert(asset AssetKey, id int64) { a.Loader.ids[asset] = id } + +func (a AssetLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index 99f510266c..65932abbd3 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -36,16 +36,11 @@ func TestAssetLoader(t *testing.T) { } loader := NewAssetLoader() - var futures []FutureAssetID for _, key := range keys { future := loader.GetFuture(key) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(key) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid asset loader state,`) duplicateFuture := loader.GetFuture(key) assert.Equal(t, future, duplicateFuture) } @@ -56,12 +51,9 @@ func TestAssetLoader(t *testing.T) { }) q := &Q{session} - for i, key := range keys { - future := futures[i] - internalID := loader.GetNow(key) - val, err := future.Value() + for _, key := range keys { + internalID, err := loader.GetNow(key) assert.NoError(t, err) - assert.Equal(t, internalID, val) var assetXDR xdr.Asset if key.Type == "native" { assetXDR = xdr.MustNewNativeAsset() @@ -72,4 +64,8 @@ func TestAssetLoader(t *testing.T) { assert.NoError(t, err) assert.Equal(t, assetID, internalID) } + + _, err := loader.GetNow(AssetKey{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index a077eb683e..dd7dee4ea5 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureClaimableBalanceID) Value() (driver.Value, error) { - return a.loader.getNow(a.id), nil + return a.loader.getNow(a.id) } // ClaimableBalanceLoader will map claimable balance ids to their internal @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID { // getNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any getNow // call can succeed. -func (a *ClaimableBalanceLoader) getNow(id string) int64 { +func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid claimable balance loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } if internalID, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) + return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id) } else { - return internalID + return internalID, nil } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index b119daa674..4dd7324521 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) { for _, id := range ids { future := loader.GetFuture(id) futures = append(futures, future) - assert.Panics(t, func() { - loader.getNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) { q := &Q{session} for i, id := range ids { future := futures[i] - internalID := loader.getNow(id) - val, err := future.Value() + internalID, err := future.Value() assert.NoError(t, err) - assert.Equal(t, internalID, val) cb, err := q.ClaimableBalanceByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, cb.BalanceID, id) assert.Equal(t, cb.InternalID, internalID) } + + futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader} + _, err := futureCb.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index 7c2fe6fd4d..7355d10ffa 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.GetNow(a.id), nil + return a.loader.GetNow(a.id) } // LiquidityPoolLoader will map liquidity pools to their internal @@ -64,11 +64,15 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *LiquidityPoolLoader) GetNow(id string) int64 { - if id, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) +func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid liquidity pool loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } + if internalID, ok := a.ids[id]; !ok { + return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id) } else { - return id + return internalID, nil } } @@ -158,3 +162,7 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) { a.Loader.ids[lp] = id } + +func (a LiquidityPoolLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index e2b1e05beb..6e5b4addf7 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -25,16 +25,11 @@ func TestLiquidityPoolLoader(t *testing.T) { } loader := NewLiquidityPoolLoader() - var futures []FutureLiquidityPoolID for _, id := range ids { future := loader.GetFuture(id) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -45,15 +40,16 @@ func TestLiquidityPoolLoader(t *testing.T) { }) q := &Q{session} - for i, id := range ids { - future := futures[i] - internalID := loader.GetNow(id) - val, err := future.Value() + for _, id := range ids { + internalID, err := loader.GetNow(id) assert.NoError(t, err) - assert.Equal(t, internalID, val) lp, err := q.LiquidityPoolByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, lp.PoolID, id) assert.Equal(t, lp.InternalID, internalID) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index fc2ca9c831..508bfa22cf 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -43,6 +43,8 @@ func TestAddOperationParticipants(t *testing.T) { op := ops[0] tt.Assert.Equal(int64(240518172673), op.OperationID) - tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID) + val, err := accountLoader.GetNow(address) + tt.Assert.NoError(err) + tt.Assert.Equal(val, op.AccountID) } } diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 16671098bf..07f6d59c3e 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -63,7 +63,9 @@ func TestTransactionParticipantsBatch(t *testing.T) { {TransactionID: 2}, } for i := range expected { - expected[i].AccountID = accountLoader.GetNow(addresses[i]) + val, err := accountLoader.GetNow(addresses[i]) + tt.Assert.NoError(err) + expected[i].AccountID = val } tt.Assert.ElementsMatch(expected, participants) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 4e360b7769..f5be5ae6c3 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -140,8 +140,9 @@ func (s *ProcessorRunner) buildTransactionProcessor( accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() lpLoader := history.NewLiquidityPoolLoader() + cbLoader := history.NewClaimableBalanceLoader() - lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader} + lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ StatsLedgerTransactionProcessor: ledgerTransactionStats, @@ -158,7 +159,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors.NewParticipantsProcessor(accountLoader, s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()), - processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(), + processors.NewClaimableBalancesTransactionProcessor(cbLoader, s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), processors.NewLiquidityPoolsTransactionProcessor(lpLoader, s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} @@ -328,7 +329,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } // ensure capture of the ledger to history regardless of whether it has transactions. - ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)) + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) ledgersProcessor.ProcessLedger(ledger) transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) @@ -406,9 +407,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats.transactionStats, stats.transactionDurations, stats.tradeStats, err = s.RunTransactionProcessorsOnLedger(ledger) - if err != nil { - return - } return } diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index b1084c6e08..d5ee89f51e 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -92,17 +92,38 @@ func (p *TradeProcessor) Flush(ctx context.Context, session db.SessionInterface) for _, trade := range p.trades { row := trade.row if trade.sellerAccount != "" { - row.BaseAccountID = null.IntFrom(p.accountLoader.GetNow(trade.sellerAccount)) + val, err := p.accountLoader.GetNow(trade.sellerAccount) + if err != nil { + return err + } + row.BaseAccountID = null.IntFrom(val) } if trade.buyerAccount != "" { - row.CounterAccountID = null.IntFrom(p.accountLoader.GetNow(trade.buyerAccount)) + val, err := p.accountLoader.GetNow(trade.buyerAccount) + if err != nil { + return err + } + row.CounterAccountID = null.IntFrom(val) } if trade.liquidityPoolID != "" { - row.BaseLiquidityPoolID = null.IntFrom(p.lpLoader.GetNow(trade.liquidityPoolID)) + val, err := p.lpLoader.GetNow(trade.liquidityPoolID) + if err != nil { + return err + } + row.BaseLiquidityPoolID = null.IntFrom(val) + } + + val, err := p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) + if err != nil { + return err } + row.BaseAssetID = val - row.BaseAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) - row.CounterAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + val, err = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + if err != nil { + return err + } + row.CounterAssetID = val if row.BaseAssetID > row.CounterAssetID { row.BaseIsSeller = false diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index 5b2a2f20e3..864985f367 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -727,12 +727,15 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions() []history.In } func (s *TradeProcessorTestSuiteLedger) stubLoaders() { + s.accountLoader.Sealed() for key, id := range s.unmuxedAccountToID { s.accountLoader.Insert(key, id) } + s.assetLoader.Sealed() for key, id := range s.assetToID { s.assetLoader.Insert(key, id.ID) } + s.lpLoader.Sealed() for key, id := range s.lpToID { s.lpLoader.Insert(PoolIDToString(key), id) } From bf0f64cfce60c8f63ed449af460499d9618d70b9 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 20 Oct 2023 13:32:33 -0700 Subject: [PATCH 06/14] #4909: fix asset loader on null terminated codes --- .../internal/db2/history/asset_loader.go | 3 ++- .../internal/db2/history/asset_loader_test.go | 23 +++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index ca30c671e9..ec29e5560b 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -5,6 +5,7 @@ import ( "database/sql/driver" "fmt" "sort" + "strings" sq "github.com/Masterminds/squirrel" @@ -25,7 +26,7 @@ type AssetKey struct { func AssetKeyFromXDR(asset xdr.Asset) AssetKey { return AssetKey{ Type: xdr.AssetTypeToString[asset.Type], - Code: asset.GetCode(), + Code: strings.TrimRight(asset.GetCode(), "\x00"), Issuer: asset.GetIssuer(), } } diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index 65932abbd3..f28ce6c022 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -22,15 +22,24 @@ func TestAssetLoader(t *testing.T) { for i := 0; i < 100; i++ { var key AssetKey if i == 0 { - key.Type = "native" + key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative}) } else if i%2 == 0 { - key.Type = "credit_alphanum4" - key.Code = fmt.Sprintf("ab%d", i) - key.Issuer = keypair.MustRandom().Address() + code := [4]byte{0,0,0,0} + copy(code[:], fmt.Sprintf("ab%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum4, + AlphaNum4: &xdr.AlphaNum4{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) } else { - key.Type = "credit_alphanum12" - key.Code = fmt.Sprintf("abcdef%d", i) - key.Issuer = keypair.MustRandom().Address() + code := [12]byte{0,0,0,0,0,0,0,0,0,0,0,0} + copy(code[:], fmt.Sprintf("abcdef%d", i)) + key = AssetKeyFromXDR(xdr.Asset{ + Type: xdr.AssetTypeAssetTypeCreditAlphanum12, + AlphaNum12: &xdr.AlphaNum12{ + AssetCode: code, + Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) + } keys = append(keys, key) } From df07d8b9fddc835ec28acf1dd008249bc8750f9f Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Fri, 20 Oct 2023 14:26:30 -0700 Subject: [PATCH 07/14] #4909: removed out of sequence order integration test, no longer supported --- .../internal/db2/history/asset_loader_test.go | 4 +- .../transaction_preconditions_test.go | 71 ------------------- 2 files changed, 2 insertions(+), 73 deletions(-) diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index f28ce6c022..c7458c7789 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -24,7 +24,7 @@ func TestAssetLoader(t *testing.T) { if i == 0 { key = AssetKeyFromXDR(xdr.Asset{Type: xdr.AssetTypeAssetTypeNative}) } else if i%2 == 0 { - code := [4]byte{0,0,0,0} + code := [4]byte{0, 0, 0, 0} copy(code[:], fmt.Sprintf("ab%d", i)) key = AssetKeyFromXDR(xdr.Asset{ Type: xdr.AssetTypeAssetTypeCreditAlphanum4, @@ -32,7 +32,7 @@ func TestAssetLoader(t *testing.T) { AssetCode: code, Issuer: xdr.MustAddress(keypair.MustRandom().Address())}}) } else { - code := [12]byte{0,0,0,0,0,0,0,0,0,0,0,0} + code := [12]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} copy(code[:], fmt.Sprintf("abcdef%d", i)) key = AssetKeyFromXDR(xdr.Asset{ Type: xdr.AssetTypeAssetTypeCreditAlphanum12, diff --git a/services/horizon/internal/integration/transaction_preconditions_test.go b/services/horizon/internal/integration/transaction_preconditions_test.go index 44a6baac82..94c0a09c35 100644 --- a/services/horizon/internal/integration/transaction_preconditions_test.go +++ b/services/horizon/internal/integration/transaction_preconditions_test.go @@ -5,89 +5,18 @@ import ( "encoding/base64" "math" "strconv" - "sync" "testing" "time" sdk "github.com/stellar/go/clients/horizonclient" "github.com/stellar/go/keypair" "github.com/stellar/go/network" - "github.com/stellar/go/protocols/horizon" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" ) -func TestTransactionPreconditionsMinSeq(t *testing.T) { - tt := assert.New(t) - itest := integration.NewTest(t, integration.Config{}) - if itest.GetEffectiveProtocolVersion() < 19 { - t.Skip("Can't run with protocol < 19") - } - master := itest.Master() - masterAccount := itest.MasterAccount() - currentAccountSeq, err := masterAccount.GetSequenceNumber() - tt.NoError(err) - - // Ensure that the minSequence of the transaction is enough - // but the sequence isn't - txParams := buildTXParams(master, masterAccount, currentAccountSeq+100) - - // this errors because the tx.seqNum is more than +1 from sourceAccoubnt.seqNum - _, err = itest.SubmitTransaction(master, txParams) - tt.Error(err) - - // Now the transaction should be submitted without problems - txParams.Preconditions.MinSequenceNumber = ¤tAccountSeq - tx := itest.MustSubmitTransaction(master, txParams) - - txHistory, err := itest.Client().TransactionDetail(tx.Hash) - assert.NoError(t, err) - assert.Equal(t, txHistory.Preconditions.MinAccountSequence, strconv.FormatInt(*txParams.Preconditions.MinSequenceNumber, 10)) - - // Test the transaction submission queue by sending transactions out of order - // and making sure they are all executed properly - masterAccount = itest.MasterAccount() - currentAccountSeq, err = masterAccount.GetSequenceNumber() - tt.NoError(err) - - seqs := []struct { - minSeq int64 - seq int64 - }{ - {0, currentAccountSeq + 9}, // sent first, executed second - {0, currentAccountSeq + 10}, // sent second, executed third - {currentAccountSeq, currentAccountSeq + 8}, // sent third, executed first - } - - // Send the transactions in parallel since otherwise they are admitted sequentially - var results []horizon.Transaction - var resultsMx sync.Mutex - var wg sync.WaitGroup - wg.Add(len(seqs)) - for _, s := range seqs { - sLocal := s - go func() { - params := buildTXParams(master, masterAccount, sLocal.seq) - if sLocal.minSeq > 0 { - params.Preconditions.MinSequenceNumber = &sLocal.minSeq - } - result := itest.MustSubmitTransaction(master, params) - resultsMx.Lock() - results = append(results, result) - resultsMx.Unlock() - wg.Done() - }() - // Space out requests to ensure the queue receives the transactions - // in the planned order - time.Sleep(time.Millisecond * 50) - } - wg.Wait() - - tt.Len(results, len(seqs)) -} - func TestTransactionPreconditionsTimeBounds(t *testing.T) { tt := assert.New(t) itest := integration.NewTest(t, integration.Config{}) From 9a1010b3ca815ef9fae36e63e5fb6c3530313903 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Sun, 22 Oct 2023 22:56:15 -0700 Subject: [PATCH 08/14] #4909: fixed integration test to not submit multiple tx per account per ledger --- services/horizon/internal/integration/txsub_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/integration/txsub_test.go b/services/horizon/internal/integration/txsub_test.go index 60b8717b18..069aa8be1b 100644 --- a/services/horizon/internal/integration/txsub_test.go +++ b/services/horizon/internal/integration/txsub_test.go @@ -14,11 +14,11 @@ func TestTxsub(t *testing.T) { itest := integration.NewTest(t, integration.Config{}) master := itest.Master() - // Sanity check: create 20 accounts and submit 2 txs from each of them as - // a source at the same time. Then check if the results are correct. t.Run("Sanity", func(t *testing.T) { + // simplify this to one tx per account, to align with core capabilities of one + // tx per account per ledger. testAccounts := 20 - subsPerAccont := 2 + subsPerAccont := 1 keys, accounts := itest.CreateAccounts(testAccounts, "1000") var wg sync.WaitGroup From 80cfc01e752bb06d8a2990248d490737811028c4 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 23 Oct 2023 21:11:05 -0700 Subject: [PATCH 09/14] #4909: fixed bulk asset insertion order id's after sorting by asset texts, trade processor depends on ordered id's for buyer/seller delineation --- .../internal/db2/history/asset_loader.go | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index ec29e5560b..e54319af12 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -22,6 +22,10 @@ type AssetKey struct { Issuer string } +func (key AssetKey) String() string { + return key.Type + "/" + key.Code + "/" + key.Issuer +} + // AssetKeyFromXDR constructs an AssetKey from an xdr asset func AssetKeyFromXDR(asset xdr.Asset) AssetKey { return AssetKey{ @@ -142,6 +146,11 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err assetTypes := make([]string, 0, len(a.set)-len(a.ids)) assetCodes := make([]string, 0, len(a.set)-len(a.ids)) assetIssuers := make([]string, 0, len(a.set)-len(a.ids)) + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return keys[i].String() < keys[j].String() + }) insert := 0 for _, key := range keys { if _, ok := a.ids[key]; ok { @@ -157,20 +166,6 @@ func (a *AssetLoader) Exec(ctx context.Context, session db.SessionInterface) err return nil } keys = keys[:insert] - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - if keys[i].Type < keys[j].Type { - return true - } - if keys[i].Code < keys[j].Code { - return true - } - if keys[i].Issuer < keys[j].Issuer { - return true - } - return false - }) err := bulkInsert( ctx, From 208f48ce370e23ab6ac77b6f31335efe6122732a Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Oct 2023 11:40:42 -0700 Subject: [PATCH 10/14] #4909: follow xdr Asset for String serialization of loader AssetKey --- .../internal/db2/history/asset_loader.go | 3 +++ .../internal/db2/history/asset_loader_test.go | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index e54319af12..39703d530f 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -23,6 +23,9 @@ type AssetKey struct { } func (key AssetKey) String() string { + if key.Type == xdr.AssetTypeToString[xdr.AssetTypeAssetTypeNative] { + return key.Type + } return key.Type + "/" + key.Code + "/" + key.Issuer } diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index c7458c7789..d67163d764 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -12,6 +12,28 @@ import ( "github.com/stellar/go/xdr" ) +func TestAssetKeyToString(t *testing.T) { + num4key := AssetKey{ + Type: "credit_alphanum4", + Code: "USD", + Issuer: "A1B2C3", + } + + num12key := AssetKey{ + Type: "credit_alphanum12", + Code: "USDABC", + Issuer: "A1B2C3", + } + + nativekey := AssetKey{ + Type: "native", + } + + assert.Equal(t, num4key.String(), "credit_alphanum4/USD/A1B2C3") + assert.Equal(t, num12key.String(), "credit_alphanum12/USDABC/A1B2C3") + assert.Equal(t, nativekey.String(), "native") +} + func TestAssetLoader(t *testing.T) { tt := test.Start(t) defer tt.Finish() From 9a4d0eb69840cadc623df0bf81e04735ee3db0e7 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 24 Oct 2023 15:06:31 -0700 Subject: [PATCH 11/14] #4909: resolved verify-range job not running --- services/horizon/docker/verify-range/Dockerfile | 4 +--- services/horizon/docker/verify-range/dependencies | 9 ++++++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/services/horizon/docker/verify-range/Dockerfile b/services/horizon/docker/verify-range/Dockerfile index 499f86881e..60fc6df745 100644 --- a/services/horizon/docker/verify-range/Dockerfile +++ b/services/horizon/docker/verify-range/Dockerfile @@ -1,14 +1,12 @@ FROM ubuntu:20.04 -MAINTAINER Bartek Nowotarski - ARG STELLAR_CORE_VERSION ENV STELLAR_CORE_VERSION=${STELLAR_CORE_VERSION:-*} # to remove tzdata interactive flow ENV DEBIAN_FRONTEND=noninteractive ADD dependencies / -RUN ["chmod", "+x", "dependencies"] +RUN ["chmod", "+x", "/dependencies"] RUN /dependencies ADD stellar-core.cfg / diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies index fa622f9d2e..1937342990 100644 --- a/services/horizon/docker/verify-range/dependencies +++ b/services/horizon/docker/verify-range/dependencies @@ -11,8 +11,8 @@ echo "deb https://apt.stellar.org $(lsb_release -cs) stable" | sudo tee -a /etc/ apt-get update apt-get install -y stellar-core=${STELLAR_CORE_VERSION} -wget -q https://dl.google.com/go/go1.18.linux-amd64.tar.gz -tar -C /usr/local -xzf go1.18.linux-amd64.tar.gz +wget -q https://dl.google.com/go/go1.20.linux-amd64.tar.gz +tar -C /usr/local -xzf go1.20.linux-amd64.tar.gz git clone https://github.com/stellar/go.git stellar-go cd stellar-go @@ -20,4 +20,7 @@ cd stellar-go # Below ensures we also fetch PR refs git config --add remote.origin.fetch "+refs/pull/*/head:refs/remotes/origin/pull/*" git fetch --force --quiet origin -/usr/local/go/bin/go build -v ./services/horizon +/usr/local/go/bin/go build -v ./services/horizon/. + + + From dd30b6a13b63ddc92bcb4d98358b7e088868f782 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 25 Oct 2023 10:56:16 -0700 Subject: [PATCH 12/14] #4909: review feedback, cleanup --- services/horizon/docker/verify-range/dependencies | 5 +---- .../horizon/internal/db2/history/fee_bump_scenario.go | 2 -- .../internal/ingest/processors/ledgers_processor.go | 10 +++++----- support/db/internal_test.go | 2 +- 4 files changed, 7 insertions(+), 12 deletions(-) diff --git a/services/horizon/docker/verify-range/dependencies b/services/horizon/docker/verify-range/dependencies index 1937342990..e17c6f4b5f 100644 --- a/services/horizon/docker/verify-range/dependencies +++ b/services/horizon/docker/verify-range/dependencies @@ -20,7 +20,4 @@ cd stellar-go # Below ensures we also fetch PR refs git config --add remote.origin.fetch "+refs/pull/*/head:refs/remotes/origin/pull/*" git fetch --force --quiet origin -/usr/local/go/bin/go build -v ./services/horizon/. - - - +/usr/local/go/bin/go build -v ./services/horizon/. \ No newline at end of file diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index be7a21ef14..75dcc20d61 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -4,7 +4,6 @@ import ( "context" "encoding/hex" "encoding/json" - "fmt" "testing" "time" @@ -271,7 +270,6 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { "bump_to": "98", }) - fmt.Print(string(details)) tt.Assert.NoError(err) tt.Assert.NoError(opBuilder.Add( diff --git a/services/horizon/internal/ingest/processors/ledgers_processor.go b/services/horizon/internal/ingest/processors/ledgers_processor.go index 085d97b9a5..942a5f8522 100644 --- a/services/horizon/internal/ingest/processors/ledgers_processor.go +++ b/services/horizon/internal/ingest/processors/ledgers_processor.go @@ -10,7 +10,7 @@ import ( "github.com/stellar/go/xdr" ) -type LedgerInfo struct { +type ledgerInfo struct { header xdr.LedgerHeaderHistoryEntry successTxCount int failedTxCount int @@ -20,23 +20,23 @@ type LedgerInfo struct { type LedgersProcessor struct { batch history.LedgerBatchInsertBuilder - ledgers map[uint32]*LedgerInfo + ledgers map[uint32]*ledgerInfo ingestVersion int } func NewLedgerProcessor(batch history.LedgerBatchInsertBuilder, ingestVersion int) *LedgersProcessor { return &LedgersProcessor{ batch: batch, - ledgers: map[uint32]*LedgerInfo{}, + ledgers: map[uint32]*ledgerInfo{}, ingestVersion: ingestVersion, } } -func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *LedgerInfo { +func (p *LedgersProcessor) ProcessLedger(lcm xdr.LedgerCloseMeta) *ledgerInfo { sequence := lcm.LedgerSequence() entry, ok := p.ledgers[sequence] if !ok { - entry = &LedgerInfo{header: lcm.LedgerHeaderHistoryEntry()} + entry = &ledgerInfo{header: lcm.LedgerHeaderHistoryEntry()} p.ledgers[sequence] = entry } return entry diff --git a/support/db/internal_test.go b/support/db/internal_test.go index 8884bb62c6..3e1a06dabc 100644 --- a/support/db/internal_test.go +++ b/support/db/internal_test.go @@ -7,7 +7,7 @@ const testSchema = ` CREATE TABLE IF NOT EXISTS people ( name character varying NOT NULL, hunger_level integer NOT NULL, - json_value jsonb, + json_value jsonb, PRIMARY KEY (name) ); DELETE FROM people; From 625ce1f5f983a9cebb22e7f3bd9d9810c6a3a4fe Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 25 Oct 2023 21:32:40 -0700 Subject: [PATCH 13/14] #4909: removed stub loader Sealed() method for test purposes --- services/horizon/internal/db2/history/account_loader.go | 5 +---- services/horizon/internal/db2/history/asset_loader.go | 5 +---- .../horizon/internal/db2/history/liquidity_pool_loader.go | 5 +---- .../internal/ingest/processors/trades_processor_test.go | 3 --- 4 files changed, 3 insertions(+), 15 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index afba3c01e0..f3946b0448 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -209,9 +209,6 @@ func NewAccountLoaderStub() AccountLoaderStub { // Insert updates the wrapped AccountLoader so that the given account // address is mapped to the provided history account id func (a AccountLoaderStub) Insert(address string, id int64) { - a.Loader.ids[address] = id -} - -func (a AccountLoaderStub) Sealed() { a.Loader.sealed = true + a.Loader.ids[address] = id } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index 39703d530f..b5ee9a8326 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -214,9 +214,6 @@ func NewAssetLoaderStub() AssetLoaderStub { // Insert updates the wrapped AssetLoaderStub so that the given asset // address is mapped to the provided history asset id func (a AssetLoaderStub) Insert(asset AssetKey, id int64) { - a.Loader.ids[asset] = id -} - -func (a AssetLoaderStub) Sealed() { a.Loader.sealed = true + a.Loader.ids[asset] = id } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index 7355d10ffa..cf89ae67b4 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -160,9 +160,6 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { // Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool // is mapped to the provided history liquidity pool id func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) { - a.Loader.ids[lp] = id -} - -func (a LiquidityPoolLoaderStub) Sealed() { a.Loader.sealed = true + a.Loader.ids[lp] = id } diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index 864985f367..5b2a2f20e3 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -727,15 +727,12 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions() []history.In } func (s *TradeProcessorTestSuiteLedger) stubLoaders() { - s.accountLoader.Sealed() for key, id := range s.unmuxedAccountToID { s.accountLoader.Insert(key, id) } - s.assetLoader.Sealed() for key, id := range s.assetToID { s.assetLoader.Insert(key, id.ID) } - s.lpLoader.Sealed() for key, id := range s.lpToID { s.lpLoader.Insert(PoolIDToString(key), id) } From d7cfc1cf06078179d9a158c1ea078fa4a3f28231 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 26 Oct 2023 09:48:29 -0700 Subject: [PATCH 14/14] #4909: fixed removed unused method parameter --- services/horizon/internal/ingest/processor_runner.go | 3 +-- services/horizon/internal/ingest/processor_runner_test.go | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index f5be5ae6c3..f66b10c1e3 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -135,7 +135,6 @@ func (s *ProcessorRunner) buildTransactionProcessor( ledgerTransactionStats *processors.StatsLedgerTransactionProcessor, tradeProcessor *processors.TradeProcessor, ledgersProcessor *processors.LedgersProcessor, - ledger xdr.LedgerCloseMeta, ) *groupTransactionProcessors { accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() @@ -341,7 +340,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos groupTransactionFilterers := s.buildTransactionFilterer() groupFilteredOutProcessors := s.buildFilteredOutProcessor() groupTransactionProcessors := s.buildTransactionProcessor( - &ledgerTransactionStats, &tradeProcessor, ledgersProcessor, ledger) + &ledgerTransactionStats, &tradeProcessor, ledgersProcessor) err = processors.StreamLedgerTransactions(s.ctx, groupTransactionFilterers, groupFilteredOutProcessors, diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 689f08a285..46796807fc 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -268,14 +268,10 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { stats := &processors.StatsLedgerTransactionProcessor{} trades := &processors.TradeProcessor{} - ledger := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{}, - }, - } + ledgersProcessor := &processors.LedgersProcessor{} - processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor, ledger) + processor := runner.buildTransactionProcessor(stats, trades, ledgersProcessor) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &statsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1])