diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 35f32f5434..47e8952b07 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,7 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) + ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions QTrustLines @@ -971,27 +971,27 @@ type tableObjectFieldPair struct { objectField string } +type LookupTableReapResult struct { + Offset int64 + RowsDeleted int64 + Duration time.Duration +} + // ReapLookupTables removes rows from lookup tables like history_claimable_balances // which aren't used (orphaned), i.e. history entries for them were reaped. // This method must be executed inside ingestion transaction. Otherwise it may // create invalid state in lookup and history tables. func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( - map[string]int64, // deleted rows count - map[string]int64, // new offsets + map[string]LookupTableReapResult, error, ) { if q.GetTx() == nil { - return nil, nil, errors.New("cannot be called outside of an ingestion transaction") + return nil, errors.New("cannot be called outside of an ingestion transaction") } const batchSize = 1000 - deletedCount := make(map[string]int64) - - if offsets == nil { - offsets = make(map[string]int64) - } - + results := map[string]LookupTableReapResult{} for table, historyTables := range map[string][]tableObjectFieldPair{ "history_accounts": { { @@ -1054,9 +1054,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( }, }, } { + startTime := time.Now() query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) if err != nil { - return nil, nil, errors.Wrap(err, "error constructing a query") + return nil, errors.Wrap(err, "error constructing a query") } // Find new offset before removing the rows @@ -1066,7 +1067,7 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( if q.NoRows(err) { newOffset = 0 } else { - return nil, nil, err + return nil, err } } @@ -1075,18 +1076,21 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) ( query, ) if err != nil { - return nil, nil, errors.Wrapf(err, "error running query: %s", query) + return nil, errors.Wrapf(err, "error running query: %s", query) } rows, err := res.RowsAffected() if err != nil { - return nil, nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) + return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) } - deletedCount[table] = rows - offsets[table] = newOffset + results[table] = LookupTableReapResult{ + Offset: newOffset, + RowsDeleted: rows, + Duration: time.Since(startTime), + } } - return deletedCount, offsets, nil + return results, nil } // constructReapLookupTablesQuery creates a query like (using history_claimable_balances diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 508041eb22..0f033c3629 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -52,7 +52,7 @@ func TestReapLookupTables(t *testing.T) { err = q.Begin(tt.Ctx) tt.Require.NoError(err) - deletedCount, newOffsets, err := q.ReapLookupTables(tt.Ctx, nil) + results, err := q.ReapLookupTables(tt.Ctx, nil) tt.Require.NoError(err) err = q.Commit() @@ -77,23 +77,23 @@ func TestReapLookupTables(t *testing.T) { tt.Assert.Equal(25, prevAccounts, "prevAccounts") tt.Assert.Equal(1, curAccounts, "curAccounts") - tt.Assert.Equal(int64(24), deletedCount["history_accounts"], `deletedCount["history_accounts"]`) + tt.Assert.Equal(int64(24), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) tt.Assert.Equal(7, prevAssets, "prevAssets") tt.Assert.Equal(0, curAssets, "curAssets") - tt.Assert.Equal(int64(7), deletedCount["history_assets"], `deletedCount["history_assets"]`) + tt.Assert.Equal(int64(7), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") - tt.Assert.Equal(int64(1), deletedCount["history_claimable_balances"], `deletedCount["history_claimable_balances"]`) + tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") - tt.Assert.Equal(int64(1), deletedCount["history_liquidity_pools"], `deletedCount["history_liquidity_pools"]`) + tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - tt.Assert.Len(newOffsets, 4) - tt.Assert.Equal(int64(0), newOffsets["history_accounts"]) - tt.Assert.Equal(int64(0), newOffsets["history_assets"]) - tt.Assert.Equal(int64(0), newOffsets["history_claimable_balances"]) - tt.Assert.Equal(int64(0), newOffsets["history_liquidity_pools"]) + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(0), results["history_accounts"].Offset) + tt.Assert.Equal(int64(0), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 650a08b426..dec3123f34 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -145,9 +145,8 @@ type Metrics struct { // duration of rebuilding trade aggregation buckets. LedgerIngestionTradeAggregationDuration prometheus.Summary - // LedgerIngestionReapLookupTablesDuration exposes timing metrics about the rate and - // duration of reaping lookup tables. - LedgerIngestionReapLookupTablesDuration prometheus.Summary + ReapDurationByLookupTable *prometheus.SummaryVec + RowsReapedByLookupTable *prometheus.SummaryVec // StateVerifyDuration exposes timing metrics about the rate and // duration of state verification. @@ -228,7 +227,7 @@ type system struct { runStateVerificationOnLedger func(uint32) bool - reapOffsets map[string]int64 + reapOffsetByTable map[string]int64 maxLedgerPerFlush uint32 reaper *Reaper @@ -327,6 +326,7 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), + reapOffsetByTable: map[string]int64{}, } system.initMetrics() @@ -367,11 +367,17 @@ func (s *system) initMetrics() { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) - s.metrics.LedgerIngestionReapLookupTablesDuration = prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "ledger_ingestion_reap_lookup_tables_duration_seconds", - Help: "ledger ingestion reap lookup tables durations, sliding window = 10m", + s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", + Help: "reap lookup tables durations, sliding window = 10m", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) + }, []string{"table"}) + + s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", + Help: "rows deleted during lookup tables reap, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"table"}) s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds", @@ -490,7 +496,8 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LocalLatestLedger) registry.MustRegister(s.metrics.LedgerIngestionDuration) registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration) - registry.MustRegister(s.metrics.LedgerIngestionReapLookupTablesDuration) + registry.MustRegister(s.metrics.ReapDurationByLookupTable) + registry.MustRegister(s.metrics.RowsReapedByLookupTable) registry.MustRegister(s.metrics.StateVerifyDuration) registry.MustRegister(s.metrics.StateInvalidGauge) registry.MustRegister(s.metrics.LedgerStatsCounter) @@ -793,7 +800,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { defer cancel() reapStart := time.Now() - deletedCount, newOffsets, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsets) + results, err := s.historyQ.ReapLookupTables(ctx, s.reapOffsetByTable) if err != nil { log.WithError(err).Warn("Error reaping lookup tables") return @@ -807,18 +814,20 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { totalDeleted := int64(0) reapLog := log - for table, c := range deletedCount { - totalDeleted += c - reapLog = reapLog.WithField(table, c) + for table, result := range results { + totalDeleted += result.RowsDeleted + reapLog = reapLog.WithField(table, result) + s.reapOffsetByTable[table] = result.Offset + s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) } if totalDeleted > 0 { reapLog.Info("Reaper deleted rows from lookup tables") } - s.reapOffsets = newOffsets - reapDuration := time.Since(reapStart).Seconds() - s.Metrics().LedgerIngestionReapLookupTablesDuration.Observe(float64(reapDuration)) + s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(float64(totalDeleted)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(time.Since(reapStart).Seconds()) } func (s *system) incrementStateVerificationErrors() int { diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 4f0e220ebe..fde8e40a9c 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -562,16 +562,13 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]int64, map[string]int64, error) { +func (m *mockDBQ) ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]history.LookupTableReapResult, error) { args := m.Called(ctx, offsets) - var r1, r2 map[string]int64 + var r1 map[string]history.LookupTableReapResult if args.Get(0) != nil { - r1 = args.Get(0).(map[string]int64) + r1 = args.Get(0).(map[string]history.LookupTableReapResult) } - if args.Get(1) != nil { - r1 = args.Get(1).(map[string]int64) - } - return r1, r2, args.Error(2) + return r1, args.Error(2) } func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error {