diff --git a/services/horizon/internal/db2/history/ingestion.go b/services/horizon/internal/db2/history/ingestion.go index d82188d83e..b35b05ece2 100644 --- a/services/horizon/internal/db2/history/ingestion.go +++ b/services/horizon/internal/db2/history/ingestion.go @@ -17,6 +17,8 @@ func (q *Q) TruncateIngestStateTables(ctx context.Context) error { "claimable_balances", "claimable_balance_claimants", "exp_asset_stats", + "contract_asset_balances", + "contract_asset_stats", "liquidity_pools", "offers", "trust_lines", diff --git a/services/horizon/internal/ingest/verify_test.go b/services/horizon/internal/ingest/verify_test.go index 901f21a0ca..f32bccdc8f 100644 --- a/services/horizon/internal/ingest/verify_test.go +++ b/services/horizon/internal/ingest/verify_test.go @@ -283,6 +283,39 @@ func ttlForContractData(tt *test.T, gen randxdr.Generator, contractData xdr.Ledg return ttl } +func TestTruncateIngestStateTables(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + q := &history.Q{&db.Session{DB: tt.HorizonDB}} + + ledgerEntries := generateRandomLedgerEntries(tt) + // insert ledger entries of all types into the DB + tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{})) + checkpointLedger := uint32(63) + changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, historyArchiveSource, checkpointLedger, "") + for _, change := range ingest.GetChangesFromLedgerEntryChanges(ledgerEntries) { + tt.Assert.NoError(changeProcessor.ProcessChange(tt.Ctx, change)) + } + tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) + tt.Assert.NoError(q.Commit()) + + // clear out the state tables + q.TruncateIngestStateTables(tt.Ctx) + + // reinsert the same ledger entries from before + tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{})) + changeProcessor = buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, historyArchiveSource, checkpointLedger, "") + for _, change := range ingest.GetChangesFromLedgerEntryChanges(ledgerEntries) { + tt.Assert.NoError(changeProcessor.ProcessChange(tt.Ctx, change)) + } + // this should succeed if we cleared out the state tables properly + // otherwise, there will be a duplicate key error when we attempt to + // insert a row that is already present + tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) + tt.Assert.NoError(q.Commit()) +} + func TestStateVerifierLockBusy(t *testing.T) { tt := test.Start(t) defer tt.Finish() @@ -292,21 +325,9 @@ func TestStateVerifierLockBusy(t *testing.T) { tt.Assert.NoError(q.BeginTx(tt.Ctx, &sql.TxOptions{})) checkpointLedger := uint32(63) - changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") + changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, historyArchiveSource, checkpointLedger, "") - gen := randxdr.NewGenerator() - var changes []xdr.LedgerEntryChange - for i := 0; i < 10; i++ { - changes = append(changes, - genLiquidityPool(tt, gen), - genClaimableBalance(tt, gen), - genOffer(tt, gen), - genTrustLine(tt, gen), - genAccount(tt, gen), - genAccountData(tt, gen), - ) - } - for _, change := range ingest.GetChangesFromLedgerEntryChanges(changes) { + for _, change := range ingest.GetChangesFromLedgerEntryChanges(generateRandomLedgerEntries(tt)) { tt.Assert.NoError(changeProcessor.ProcessChange(tt.Ctx, change)) } tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) @@ -350,34 +371,14 @@ func TestStateVerifier(t *testing.T) { ledger := rand.Int31() checkpointLedger := uint32(ledger - (ledger % 64) - 1) - changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, ledgerSource, checkpointLedger, "") + changeProcessor := buildChangeProcessor(q, &ingest.StatsChangeProcessor{}, historyArchiveSource, checkpointLedger, "") mockChangeReader := &ingest.MockChangeReader{} - gen := randxdr.NewGenerator() - var changes []xdr.LedgerEntryChange - for i := 0; i < 100; i++ { - changes = append(changes, - genLiquidityPool(tt, gen), - genClaimableBalance(tt, gen), - genOffer(tt, gen), - genTrustLine(tt, gen), - genAccount(tt, gen), - genAccountData(tt, gen), - genContractCode(tt, gen), - genConfigSetting(tt, gen), - genTTL(tt, gen), - ) - changes = append(changes, genAssetContractMetadata(tt, gen)...) - } - - coverage := map[xdr.LedgerEntryType]int{} - for _, change := range ingest.GetChangesFromLedgerEntryChanges(changes) { + for _, change := range ingest.GetChangesFromLedgerEntryChanges(generateRandomLedgerEntries(tt)) { mockChangeReader.On("Read").Return(change, nil).Once() tt.Assert.NoError(changeProcessor.ProcessChange(tt.Ctx, change)) - coverage[change.Type]++ } tt.Assert.NoError(changeProcessor.Commit(tt.Ctx)) - tt.Assert.Equal(len(xdr.LedgerEntryTypeMap), len(coverage)) tt.Assert.NoError(q.Commit()) @@ -402,3 +403,31 @@ func TestStateVerifier(t *testing.T) { mockChangeReader.AssertExpectations(t) mockHistoryAdapter.AssertExpectations(t) } + +func generateRandomLedgerEntries(tt *test.T) []xdr.LedgerEntryChange { + gen := randxdr.NewGenerator() + + var changes []xdr.LedgerEntryChange + for i := 0; i < 100; i++ { + changes = append(changes, + genLiquidityPool(tt, gen), + genClaimableBalance(tt, gen), + genOffer(tt, gen), + genTrustLine(tt, gen), + genAccount(tt, gen), + genAccountData(tt, gen), + genContractCode(tt, gen), + genConfigSetting(tt, gen), + genTTL(tt, gen), + ) + changes = append(changes, genAssetContractMetadata(tt, gen)...) + } + + coverage := map[xdr.LedgerEntryType]int{} + for _, change := range changes { + coverage[change.Created.Data.Type]++ + } + tt.Assert.Equal(len(xdr.LedgerEntryTypeMap), len(coverage)) + + return changes +}