From 6f9397ef9f7c241e6369ea08bff9b3c46a8a0fbe Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 30 Jul 2024 14:53:47 +0100 Subject: [PATCH 01/17] reap lookup tables without blocking ingestion --- .../horizon/internal/db2/history/key_value.go | 44 +-- services/horizon/internal/db2/history/main.go | 141 +++++---- .../horizon/internal/db2/history/main_test.go | 24 +- .../horizon/internal/db2/history/reap_test.go | 278 +++++++++--------- services/horizon/internal/ingest/main.go | 98 +++--- 5 files changed, 309 insertions(+), 276 deletions(-) diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 3d23451937..0d64033ebd 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -5,7 +5,6 @@ import ( "database/sql" "fmt" "strconv" - "strings" sq "github.com/Masterminds/squirrel" @@ -207,41 +206,26 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) ( return value, nil } -type KeyValuePair struct { - Key string `db:"key"` - Value string `db:"value"` -} - -func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) { - keys := make([]string, 0, len(historyLookupTables)) - for table := range historyLookupTables { - keys = append(keys, table+lookupTableReapOffsetSuffix) - } - offsets := map[string]int64{} - var pairs []KeyValuePair - query := sq.Select("key", "value"). +func (q *Q) getLookupTableReapOffset(ctx context.Context, table string) (int64, error) { + query := sq.Select("value"). From("key_value_store"). Where(map[string]interface{}{ - "key": keys, + "key": table + lookupTableReapOffsetSuffix, }) - err := q.Select(ctx, &pairs, query) + var text string + err := q.Get(ctx, &text, query) if err != nil { - return nil, err - } - for _, pair := range pairs { - table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix) - if _, ok := historyLookupTables[table]; !ok { - return nil, fmt.Errorf("invalid key: %s", pair.Key) - } - - var offset int64 - offset, err = strconv.ParseInt(pair.Value, 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid offset: %s", pair.Value) + if errors.Cause(err) == sql.ErrNoRows { + return 0, nil } - offsets[table] = offset + return 0, err + } + var offset int64 + offset, err = strconv.ParseInt(text, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid offset: %s", text) } - return offsets, err + return offset, nil } func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error { diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index e9d8ffb185..b95bb37f61 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -282,7 +282,8 @@ type IngestionQ interface { NewTradeBatchInsertBuilder() TradeBatchInsertBuilder RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error - ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error) + ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) + FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error) QTransactions 
QTrustLines @@ -977,63 +978,55 @@ type LookupTableReapResult struct { Duration time.Duration } -// ReapLookupTables removes rows from lookup tables like history_claimable_balances -// which aren't used (orphaned), i.e. history entries for them were reaped. -// This method must be executed inside ingestion transaction. Otherwise it may -// create invalid state in lookup and history tables. -func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) ( - map[string]LookupTableReapResult, - error, -) { - if q.GetTx() == nil { - return nil, errors.New("cannot be called outside of an ingestion transaction") +func (q *Q) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) { + offset, err := q.getLookupTableReapOffset(ctx, table) + if err != nil { + return nil, 0, fmt.Errorf("could not obtain offsets: %w", err) } - offsets, err := q.getLookupTableReapOffsets(ctx) + // Find new offset before removing the rows + var newOffset int64 + err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offset, batchSize)) if err != nil { - return nil, fmt.Errorf("could not obtain offsets: %w", err) + if q.NoRows(err) { + newOffset = 0 + } else { + return nil, 0, err + } } - results := map[string]LookupTableReapResult{} - for table, historyTables := range historyLookupTables { - startTime := time.Now() - query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table]) + var ids []int64 + err = q.SelectRaw(ctx, &ids, constructFindReapLookupTablesQuery(table, batchSize, offset)) + if err != nil { + return nil, 0, fmt.Errorf("could not query orphaned rows: %w", err) + } - // Find new offset before removing the rows - var newOffset int64 - err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize)) - if err != nil { - if q.NoRows(err) { - newOffset = 0 - } else { - return nil, err - } - } + return ids, newOffset, nil +} - res, err := q.ExecRaw( - context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), - query, - ) - if err != nil { - return nil, errors.Wrapf(err, "error running query: %s", query) - } +func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) { + if err := q.Begin(ctx); err != nil { + return 0, fmt.Errorf("could not start transaction: %w", err) + } + defer q.Rollback() - if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { - return nil, fmt.Errorf("error updating offset: %w", err) - } + if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { + return 0, fmt.Errorf("error updating offset: %w", err) + } - rows, err := res.RowsAffected() + var rowsDeleted int64 + if len(ids) > 0 { + var err error + rowsDeleted, err = q.deleteLookupTableRows(ctx, table, ids) if err != nil { - return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query) + return 0, fmt.Errorf("could not delete orphaned rows: %w", err) } + } - results[table] = LookupTableReapResult{ - Offset: newOffset, - RowsDeleted: rows, - Duration: time.Since(startTime), - } + if err := q.Commit(); err != nil { + return 0, fmt.Errorf("could not commit transaction: %w", err) } - return results, nil + return rowsDeleted, nil } var historyLookupTables = map[string][]tableObjectFieldPair{ @@ -1125,29 +1118,71 @@ var historyLookupTables = map[string][]tableObjectFieldPair{ // possible that rows will be skipped from deletion. 
But offset is reset // when it reaches the table size so eventually all orphaned rows are // deleted. -func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string { +func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) { + deleteQuery, args := constructDeleteLookupTableRowsQuery(table, ids) + result, err := q.ExecRaw( + context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), + deleteQuery, + args, + ) + if err != nil { + return 0, fmt.Errorf("error running query: %w", err) + } + var deletedCount int64 + deletedCount, err = result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("error getting deleted count: %w", err) + } + return deletedCount, nil +} + +func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []interface{}) { + var conditions []string + for _, referencedTable := range historyLookupTables[table] { + conditions = append( + conditions, + fmt.Sprintf( + "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", + referencedTable.name, + referencedTable.name, referencedTable.objectField, + ), + ) + } + + innerQuery, args := sq.Select("id").From(table).Where(map[string]interface{}{ + "id": ids, + }).OrderBy("id asc").Suffix("FOR UPDATE").MustSql() + + deleteQuery := fmt.Sprintf( + "DELETE FROM %s WHERE id IN ("+ + "WITH ha_batch AS (%s) "+ + "SELECT e1.id as id FROM ha_batch e1 WHERE ", + table, innerQuery, + ) + strings.Join(conditions, " AND ") + ")" + return deleteQuery, args +} + +func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string { var conditions []string - for _, historyTable := range historyTables { + for _, referencedTable := range historyLookupTables[table] { conditions = append( conditions, fmt.Sprintf( "NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)", - historyTable.name, - historyTable.name, historyTable.objectField, + referencedTable.name, + referencedTable.name, referencedTable.objectField, ), ) } return fmt.Sprintf( - "DELETE FROM %s WHERE id IN ("+ - "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+ + "WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+ "SELECT e1.id as id FROM ha_batch e1 WHERE ", table, - table, offset, batchSize, - ) + strings.Join(conditions, " AND ") + ")" + ) + strings.Join(conditions, " AND ") } // DeleteRangeAll deletes a range of rows from all history tables between diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 1a28b9e584..b15ca25250 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -69,20 +69,34 @@ func TestElderLedger(t *testing.T) { } } +func TestConstructDeleteLookupTableRowsQuery(t *testing.T) { + query, args := constructDeleteLookupTableRowsQuery( + "history_accounts", + []int64{100, 20, 30}, + ) + + assert.Equal(t, + "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (?,?,?) 
ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ + "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) + assert.Equal(t, []interface{}([]interface{}{int64(100), int64(20), int64(30)}), args) +} + func TestConstructReapLookupTablesQuery(t *testing.T) { - query := constructReapLookupTablesQuery( + query := constructFindReapLookupTablesQuery( "history_accounts", - historyLookupTables["history_accounts"], 10, 0, ) assert.Equal(t, - "DELETE FROM history_accounts WHERE id IN ("+ - "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+ + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+ "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ - "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) + "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1)", query) } diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index 5601cd19b6..d400dd5e38 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -3,8 +3,6 @@ package history_test import ( "testing" - "github.com/stellar/go/services/horizon/internal/db2/history" - "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test" ) @@ -12,142 +10,142 @@ func TestReapLookupTables(t *testing.T) { tt := test.Start(t) defer tt.Finish() tt.Scenario("kahuna") - - db := tt.HorizonSession() - reaper := ingest.NewReaper( - ingest.ReapConfig{ - RetentionCount: 1, - BatchSize: 50, - }, - db, - ) - - var ( - prevLedgers, curLedgers int - prevAccounts, curAccounts int - prevAssets, curAssets int - prevClaimableBalances, curClaimableBalances int - prevLiquidityPools, curLiquidityPools int - ) - - err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT 
COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - - err = reaper.DeleteUnretainedHistory(tt.Ctx) - tt.Require.NoError(err) - - q := &history.Q{tt.HorizonSession()} - - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err := q.ReapLookupTables(tt.Ctx, 5) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) - - err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - tt.Require.NoError(err) - - tt.Assert.Equal(61, prevLedgers, "prevLedgers") - tt.Assert.Equal(1, curLedgers, "curLedgers") - - tt.Assert.Equal(25, prevAccounts, "prevAccounts") - tt.Assert.Equal(21, curAccounts, "curAccounts") - tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - - tt.Assert.Equal(7, prevAssets, "prevAssets") - tt.Assert.Equal(2, curAssets, "curAssets") - tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - - tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") - tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") - tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - - tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") - tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") - tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - - tt.Assert.Len(results, 4) - tt.Assert.Equal(int64(6), results["history_accounts"].Offset) - tt.Assert.Equal(int64(6), results["history_assets"].Offset) - tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err = q.ReapLookupTables(tt.Ctx, 5) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) - - err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - - tt.Assert.Equal(16, curAccounts, "curAccounts") - tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - - tt.Assert.Equal(0, curAssets, "curAssets") - tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - - tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - - tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - - tt.Assert.Len(results, 4) - tt.Assert.Equal(int64(11), results["history_accounts"].Offset) - tt.Assert.Equal(int64(0), results["history_assets"].Offset) - tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - - err = q.Begin(tt.Ctx) - tt.Require.NoError(err) - - results, err = 
q.ReapLookupTables(tt.Ctx, 1000) - tt.Require.NoError(err) - - err = q.Commit() - tt.Require.NoError(err) - - err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - tt.Require.NoError(err) - err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - tt.Require.NoError(err) - - tt.Assert.Equal(1, curAccounts, "curAccounts") - tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - - tt.Assert.Equal(0, curAssets, "curAssets") - tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - - tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - - tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - - tt.Assert.Len(results, 4) - tt.Assert.Equal(int64(0), results["history_accounts"].Offset) - tt.Assert.Equal(int64(0), results["history_assets"].Offset) - tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + // + //db := tt.HorizonSession() + //reaper := ingest.NewReaper( + // ingest.ReapConfig{ + // RetentionCount: 1, + // BatchSize: 50, + // }, + // db, + //) + // + //var ( + // prevLedgers, curLedgers int + // prevAccounts, curAccounts int + // prevAssets, curAssets int + // prevClaimableBalances, curClaimableBalances int + // prevLiquidityPools, curLiquidityPools int + //) + // + //err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + //tt.Require.NoError(err) + // + //err = reaper.DeleteUnretainedHistory(tt.Ctx) + //tt.Require.NoError(err) + // + //q := &history.Q{tt.HorizonSession()} + // + //err = q.Begin(tt.Ctx) + //tt.Require.NoError(err) + // + //results, err := q.ReapLookupTables(tt.Ctx, 5) + //tt.Require.NoError(err) + // + //err = q.Commit() + //tt.Require.NoError(err) + // + //err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + //tt.Require.NoError(err) + // + //tt.Assert.Equal(61, prevLedgers, "prevLedgers") + //tt.Assert.Equal(1, curLedgers, "curLedgers") + // + //tt.Assert.Equal(25, prevAccounts, "prevAccounts") + //tt.Assert.Equal(21, curAccounts, "curAccounts") + //tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + // + //tt.Assert.Equal(7, prevAssets, "prevAssets") + //tt.Assert.Equal(2, curAssets, "curAssets") + //tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, 
`deletedCount["history_assets"]`) + // + //tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") + //tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") + //tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + // + //tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") + //tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") + //tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + // + //tt.Assert.Len(results, 4) + //tt.Assert.Equal(int64(6), results["history_accounts"].Offset) + //tt.Assert.Equal(int64(6), results["history_assets"].Offset) + //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + // + //err = q.Begin(tt.Ctx) + //tt.Require.NoError(err) + // + //results, err = q.ReapLookupTables(tt.Ctx, 5) + //tt.Require.NoError(err) + // + //err = q.Commit() + //tt.Require.NoError(err) + // + //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + //tt.Require.NoError(err) + // + //tt.Assert.Equal(16, curAccounts, "curAccounts") + //tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + // + //tt.Assert.Equal(0, curAssets, "curAssets") + //tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + // + //tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + // + //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + // + //tt.Assert.Len(results, 4) + //tt.Assert.Equal(int64(11), results["history_accounts"].Offset) + //tt.Assert.Equal(int64(0), results["history_assets"].Offset) + //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + // + //err = q.Begin(tt.Ctx) + //tt.Require.NoError(err) + // + //results, err = q.ReapLookupTables(tt.Ctx, 1000) + //tt.Require.NoError(err) + // + //err = q.Commit() + //tt.Require.NoError(err) + // + //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + //tt.Require.NoError(err) + //err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + //tt.Require.NoError(err) + // + //tt.Assert.Equal(1, curAccounts, "curAccounts") + //tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + // + //tt.Assert.Equal(0, curAssets, "curAssets") + //tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + // + //tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + // + //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + // + //tt.Assert.Len(results, 4) + //tt.Assert.Equal(int64(0), results["history_accounts"].Offset) + //tt.Assert.Equal(int64(0), results["history_assets"].Offset) + //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) } diff --git a/services/horizon/internal/ingest/main.go 
b/services/horizon/internal/ingest/main.go index 64e4558723..b90f487b4a 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -413,7 +413,7 @@ func (s *system) initMetrics() { Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", Help: "reap lookup tables durations, sliding window = 10m", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, []string{"table"}) + }, []string{"table", "type"}) s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", @@ -822,55 +822,57 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { return } - err = s.historyQ.Begin(s.ctx) - if err != nil { - log.WithError(err).Error("Error starting a transaction") - return - } - defer s.historyQ.Rollback() - - // If so block ingestion in the cluster to reap tables - _, err = s.historyQ.GetLastLedgerIngest(s.ctx) - if err != nil { - log.WithError(err).Error(getLastIngestedErrMsg) - return - } - - // Make sure reaping will not take more than 5s, which is average ledger - // closing time. - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() - - reapStart := time.Now() - results, err := s.historyQ.ReapLookupTables(ctx, reapLookupTablesBatchSize) - if err != nil { - log.WithError(err).Warn("Error reaping lookup tables") - return - } - - err = s.historyQ.Commit() - if err != nil { - log.WithError(err).Error("Error committing a transaction") - return - } - - totalDeleted := int64(0) - reapLog := log - for table, result := range results { - totalDeleted += result.RowsDeleted - reapLog = reapLog.WithField(table+"_offset", result.Offset) - reapLog = reapLog.WithField(table+"_duration", result.Duration) - reapLog = reapLog.WithField(table+"_rows_deleted", result.RowsDeleted) - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}).Observe(float64(result.RowsDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table}).Observe(result.Duration.Seconds()) - } + s.wg.Add(1) + go func() { + defer s.wg.Done() - if totalDeleted > 0 { - reapLog.Info("Reaper deleted rows from lookup tables") - } + reapStart := time.Now() + var totalQueryDuration, totalDeleteDuration time.Duration + var totalDeleted int64 + for _, table := range []string{ + "history_accounts", "history_claimable_balances", + "history_assets", "history_liquidity_pools", + } { + startTime := time.Now() + ids, offset, err := s.historyQ.FindLookupTableRowsToReap(s.ctx, table, reapLookupTablesBatchSize) + if err != nil { + log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") + return + } + queryDuration := time.Since(startTime) + totalQueryDuration += queryDuration + + deleteStartTime := time.Now() + var rowsDeleted int64 + rowsDeleted, err = s.historyQ.ReapLookupTable(s.ctx, table, ids, offset) + deleteDuration := time.Since(deleteStartTime) + totalDeleteDuration += deleteDuration + + s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}). + Observe(float64(rowsDeleted)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}). + Observe(float64(queryDuration)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}). + Observe(float64(deleteDuration)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). 
+ Observe(float64(queryDuration + deleteDuration)) + + log.WithField("table", table). + WithField("offset", offset). + WithField(table+"rows_deleted", rowsDeleted). + WithField("query_duration", queryDuration.Seconds()). + WithField("delete_duration", deleteDuration.Seconds()) + } - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(float64(totalDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total"}).Observe(time.Since(reapStart).Seconds()) + s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). + Observe(float64(totalDeleted)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}). + Observe(float64(totalQueryDuration)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}). + Observe(float64(totalDeleteDuration)) + s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}). + Observe(time.Since(reapStart).Seconds()) + }() } func (s *system) incrementStateVerificationErrors() int { From 90e56fc39aca6bd2ca95aceab2e49b0e2eb05f05 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 30 Jul 2024 20:13:29 +0100 Subject: [PATCH 02/17] refactor --- services/horizon/internal/db2/history/main.go | 1 + .../internal/db2/history/verify_lock.go | 7 +- services/horizon/internal/ingest/main.go | 77 ++---------- services/horizon/internal/ingest/main_test.go | 20 ++-- services/horizon/internal/ingest/reap.go | 111 ++++++++++++++++++ 5 files changed, 139 insertions(+), 77 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index b95bb37f61..dcb21fb20d 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -308,6 +308,7 @@ type IngestionQ interface { GetNextLedgerSequence(context.Context, uint32) (uint32, bool, error) TryStateVerificationLock(context.Context) (bool, error) TryReaperLock(context.Context) (bool, error) + TryLookupTableReaperLock(ctx context.Context) (bool, error) ElderLedger(context.Context, interface{}) error } diff --git a/services/horizon/internal/db2/history/verify_lock.go b/services/horizon/internal/db2/history/verify_lock.go index 29bc11a473..b5d5d37e7f 100644 --- a/services/horizon/internal/db2/history/verify_lock.go +++ b/services/horizon/internal/db2/history/verify_lock.go @@ -15,7 +15,8 @@ const ( // reaperLockId is the objid for the advisory lock acquired during // reaping. The value is arbitrary. The only requirement is that // all ingesting nodes use the same value which is why it's hard coded here. 
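// The tryAdvisoryLock helper backing these ids is not shown in this series.
// A minimal sketch of the underlying mechanism (assuming the
// transaction-scoped Postgres variant, so the lock is released automatically
// on commit or rollback) would be:
//
//	var acquired bool
//	err := tx.QueryRowContext(ctx,
//		"SELECT pg_try_advisory_xact_lock($1)", lockId).Scan(&acquired)
//
// pg_try_advisory_xact_lock never blocks: it returns false when another
// session already holds the same lock id, which would explain why
// tryAdvisoryLock refuses to run outside of a transaction.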
- reaperLockId = 944670730 + reaperLockId = 944670730 + lookupTableReaperLockId = 329518896 ) // TryStateVerificationLock attempts to acquire the state verification lock @@ -34,6 +35,10 @@ func (q *Q) TryReaperLock(ctx context.Context) (bool, error) { return q.tryAdvisoryLock(ctx, reaperLockId) } +func (q *Q) TryLookupTableReaperLock(ctx context.Context) (bool, error) { + return q.tryAdvisoryLock(ctx, lookupTableReaperLockId) +} + func (q *Q) tryAdvisoryLock(ctx context.Context, lockId int) (bool, error) { if tx := q.GetTx(); tx == nil { return false, errors.New("cannot be called outside of a transaction") diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index b90f487b4a..63ee7ba457 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -70,16 +70,15 @@ const ( // * Ledger ingestion, // * State verifications, // * Metrics updates. - // * Reaping (requires 2 connections, the extra connection is used for holding the advisory lock) - MaxDBConnections = 5 + // * Reaping of history (requires 2 connections, the extra connection is used for holding the advisory lock) + // * Reaping of lookup tables (requires 2 connections, the extra connection is used for holding the advisory lock) + MaxDBConnections = 7 stateVerificationErrorThreshold = 3 // 100 ledgers per flush has shown in stress tests // to be best point on performance curve, default to that. MaxLedgersPerFlush uint32 = 100 - - reapLookupTablesBatchSize = 1000 ) var log = logpkg.DefaultLogger.WithField("service", "ingest") @@ -172,9 +171,6 @@ type Metrics struct { // duration of rebuilding trade aggregation buckets. LedgerIngestionTradeAggregationDuration prometheus.Summary - ReapDurationByLookupTable *prometheus.SummaryVec - RowsReapedByLookupTable *prometheus.SummaryVec - // StateVerifyDuration exposes timing metrics about the rate and // duration of state verification. 
StateVerifyDuration prometheus.Summary @@ -256,7 +252,8 @@ type system struct { maxLedgerPerFlush uint32 - reaper *Reaper + reaper *Reaper + lookupTableReaper *lookupTableReaper currentStateMutex sync.Mutex currentState State @@ -369,6 +366,7 @@ func NewSystem(config Config) (System, error) { config.ReapConfig, config.HistorySession, ), + lookupTableReaper: newLookupTableReaper(config.HistorySession), } system.initMetrics() @@ -409,18 +407,6 @@ func (s *system) initMetrics() { Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) - s.metrics.ReapDurationByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", - Help: "reap lookup tables durations, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, []string{"table", "type"}) - - s.metrics.RowsReapedByLookupTable = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", - Help: "rows deleted during lookup tables reap, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, []string{"table"}) - s.metrics.StateVerifyDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "horizon", Subsystem: "ingest", Name: "state_verify_duration_seconds", Help: "state verification durations, sliding window = 10m", @@ -538,8 +524,6 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.LocalLatestLedger) registry.MustRegister(s.metrics.LedgerIngestionDuration) registry.MustRegister(s.metrics.LedgerIngestionTradeAggregationDuration) - registry.MustRegister(s.metrics.ReapDurationByLookupTable) - registry.MustRegister(s.metrics.RowsReapedByLookupTable) registry.MustRegister(s.metrics.StateVerifyDuration) registry.MustRegister(s.metrics.StateInvalidGauge) registry.MustRegister(s.metrics.LedgerStatsCounter) @@ -552,6 +536,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { registry.MustRegister(s.metrics.IngestionErrorCounter) s.ledgerBackend = ledgerbackend.WithMetrics(s.ledgerBackend, registry, "horizon") s.reaper.RegisterMetrics(registry) + s.lookupTableReaper.RegisterMetrics(registry) } // Run starts ingestion system. Ingestion system supports distributed ingestion @@ -825,53 +810,7 @@ func (s *system) maybeReapLookupTables(lastIngestedLedger uint32) { s.wg.Add(1) go func() { defer s.wg.Done() - - reapStart := time.Now() - var totalQueryDuration, totalDeleteDuration time.Duration - var totalDeleted int64 - for _, table := range []string{ - "history_accounts", "history_claimable_balances", - "history_assets", "history_liquidity_pools", - } { - startTime := time.Now() - ids, offset, err := s.historyQ.FindLookupTableRowsToReap(s.ctx, table, reapLookupTablesBatchSize) - if err != nil { - log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") - return - } - queryDuration := time.Since(startTime) - totalQueryDuration += queryDuration - - deleteStartTime := time.Now() - var rowsDeleted int64 - rowsDeleted, err = s.historyQ.ReapLookupTable(s.ctx, table, ids, offset) - deleteDuration := time.Since(deleteStartTime) - totalDeleteDuration += deleteDuration - - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": table}). - Observe(float64(rowsDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}). 
- Observe(float64(queryDuration)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}). - Observe(float64(deleteDuration)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). - Observe(float64(queryDuration + deleteDuration)) - - log.WithField("table", table). - WithField("offset", offset). - WithField(table+"rows_deleted", rowsDeleted). - WithField("query_duration", queryDuration.Seconds()). - WithField("delete_duration", deleteDuration.Seconds()) - } - - s.Metrics().RowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). - Observe(float64(totalDeleted)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}). - Observe(float64(totalQueryDuration)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}). - Observe(float64(totalDeleteDuration)) - s.Metrics().ReapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}). - Observe(time.Since(reapStart).Seconds()) + s.lookupTableReaper.deleteOrphanedRows(s.ctx) }() } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index d5733ee5e4..470039fd92 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -462,6 +462,11 @@ func (m *mockDBQ) TryReaperLock(ctx context.Context) (bool, error) { return args.Get(0).(bool), args.Error(1) } +func (m *mockDBQ) TryLookupTableReaperLock(ctx context.Context) (bool, error) { + args := m.Called(ctx) + return args.Get(0).(bool), args.Error(1) +} + func (m *mockDBQ) GetNextLedgerSequence(ctx context.Context, start uint32) (uint32, bool, error) { args := m.Called(ctx, start) return args.Get(0).(uint32), args.Get(1).(bool), args.Error(2) @@ -562,13 +567,14 @@ func (m *mockDBQ) NewTradeBatchInsertBuilder() history.TradeBatchInsertBuilder { return args.Get(0).(history.TradeBatchInsertBuilder) } -func (m *mockDBQ) ReapLookupTables(ctx context.Context, batchSize int) (map[string]history.LookupTableReapResult, error) { - args := m.Called(ctx, batchSize) - var r1 map[string]history.LookupTableReapResult - if args.Get(0) != nil { - r1 = args.Get(0).(map[string]history.LookupTableReapResult) - } - return r1, args.Error(2) +func (m *mockDBQ) FindLookupTableRowsToReap(ctx context.Context, table string, batchSize int) ([]int64, int64, error) { + args := m.Called(ctx, table, batchSize) + return args.Get(0).([]int64), args.Get(1).(int64), args.Error(2) +} + +func (m *mockDBQ) ReapLookupTable(ctx context.Context, table string, ids []int64, offset int64) (int64, error) { + args := m.Called(ctx, table, ids, offset) + return args.Get(0).(int64), args.Error(1) } func (m *mockDBQ) RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error { diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 07a61a4cde..16a473c54c 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -16,6 +16,8 @@ import ( "github.com/stellar/go/toid" ) +const reapLookupTablesBatchSize = 1000 + // Reaper represents the history reaping subsystem of horizon. 
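// The lookupTableReaper added below follows the same coordination pattern as
// this Reaper: an atomic pending flag skips a run when one is already in
// flight on the local node, while the TryLookupTableReaperLock advisory lock
// introduced in this patch skips the run when another Horizon node is
// already reaping.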
type Reaper struct { historyQ history.IngestionQ @@ -243,3 +245,112 @@ func (r *Reaper) deleteBatch(ctx context.Context, batchStartSeq, batchEndSeq uin r.deleteBatchDuration.Observe(elapsedSeconds) return count, nil } + +type lookupTableReaper struct { + historyQ history.IngestionQ + reapLockQ history.IngestionQ + pending atomic.Bool + logger *logpkg.Entry + + reapDurationByLookupTable *prometheus.SummaryVec + rowsReapedByLookupTable *prometheus.SummaryVec +} + +func newLookupTableReaper(dbSession db.SessionInterface) *lookupTableReaper { + return &lookupTableReaper{ + historyQ: &history.Q{dbSession.Clone()}, + reapLockQ: &history.Q{dbSession.Clone()}, + pending: atomic.Bool{}, + logger: log.WithField("subservice", "lookuptable-reaper"), + reapDurationByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_duration_seconds", + Help: "reap lookup tables durations, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"table", "type"}), + rowsReapedByLookupTable: prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "horizon", Subsystem: "ingest", Name: "reap_lookup_tables_rows_reaped", + Help: "rows deleted during lookup tables reap, sliding window = 10m", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }, []string{"table"}), + } +} + +func (r *lookupTableReaper) RegisterMetrics(registry *prometheus.Registry) { + registry.MustRegister( + r.reapDurationByLookupTable, + r.rowsReapedByLookupTable, + ) +} + +func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { + // check if reap is already in progress on this horizon node + if !r.pending.CompareAndSwap(false, true) { + r.logger.Infof("existing reap already in progress, skipping request to start a new one") + return nil + } + defer r.pending.Store(false) + + if err := r.reapLockQ.Begin(ctx); err != nil { + return errors.Wrap(err, "error while starting reaper lock transaction") + } + defer func() { + if err := r.reapLockQ.Rollback(); err != nil { + r.logger.WithField("error", err).Error("failed to release reaper lock") + } + }() + // check if reap is already in progress on another horizon node + if acquired, err := r.reapLockQ.TryLookupTableReaperLock(ctx); err != nil { + return errors.Wrap(err, "error while acquiring reaper database lock") + } else if !acquired { + r.logger.Info("reap already in progress on another node") + return nil + } + + reapStart := time.Now() + var totalQueryDuration, totalDeleteDuration time.Duration + var totalDeleted int64 + for _, table := range []string{ + "history_accounts", "history_claimable_balances", + "history_assets", "history_liquidity_pools", + } { + startTime := time.Now() + ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize) + if err != nil { + log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") + return err + } + queryDuration := time.Since(startTime) + totalQueryDuration += queryDuration + + deleteStartTime := time.Now() + var rowsDeleted int64 + rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset) + deleteDuration := time.Since(deleteStartTime) + totalDeleteDuration += deleteDuration + + r.rowsReapedByLookupTable.With(prometheus.Labels{"table": table}). + Observe(float64(rowsDeleted)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}). 
+ Observe(float64(queryDuration)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}). + Observe(float64(deleteDuration)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). + Observe(float64(queryDuration + deleteDuration)) + + log.WithField("table", table). + WithField("offset", offset). + WithField(table+"rows_deleted", rowsDeleted). + WithField("query_duration", queryDuration.Seconds()). + WithField("delete_duration", deleteDuration.Seconds()) + } + + r.rowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). + Observe(float64(totalDeleted)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}). + Observe(float64(totalQueryDuration)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}). + Observe(float64(totalDeleteDuration)) + r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}). + Observe(time.Since(reapStart).Seconds()) + return nil +} From acbe6e091a5525539e1cf566c397f925ce4540cf Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 30 Jul 2024 20:41:18 +0100 Subject: [PATCH 03/17] fix missing error check --- services/horizon/internal/ingest/reap.go | 4 ++++ services/horizon/internal/ingest/resume_state_test.go | 10 ---------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 16a473c54c..7ae19cb200 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -325,6 +325,10 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { deleteStartTime := time.Now() var rowsDeleted int64 rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset) + if err != nil { + log.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") + return err + } deleteDuration := time.Since(deleteStartTime) totalDeleteDuration += deleteDuration diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index feb5e13bb0..534ec555f6 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -402,10 +402,6 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { } func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { - s.system.config.ReapLookupTables = true - defer func() { - s.system.config.ReapLookupTables = false - }() s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() s.historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil).Once() @@ -425,12 +421,6 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { s.historyQ.On("GetExpStateInvalid", s.ctx).Return(false, nil).Once() s.historyQ.On("RebuildTradeAggregationBuckets", s.ctx, uint32(101), uint32(101), 0).Return(nil).Once() - // Reap lookup tables: - s.ledgerBackend.On("GetLatestLedgerSequence", s.ctx).Return(uint32(101), nil) - s.historyQ.On("Begin", s.ctx).Return(nil).Once() - s.historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(100), nil).Once() - s.historyQ.On("ReapLookupTables", mock.AnythingOfType("*context.timerCtx"), mock.Anything).Return(nil, nil, errors.New("error reaping objects")).Once() - s.historyQ.On("Rollback").Return(nil).Once() mockStats := &historyarchive.MockArchiveStats{} 
mockStats.On("GetBackendName").Return("name") mockStats.On("GetDownloads").Return(uint32(0)) From 0d2c17ddef8ad42dcda63353a04f59a93c82b64e Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 31 Jul 2024 08:50:38 +0100 Subject: [PATCH 04/17] fix duration metrics --- services/horizon/internal/ingest/reap.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 7ae19cb200..b4cb241fb1 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -335,11 +335,11 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { r.rowsReapedByLookupTable.With(prometheus.Labels{"table": table}). Observe(float64(rowsDeleted)) r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "query"}). - Observe(float64(queryDuration)) + Observe(float64(queryDuration.Seconds())) r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "delete"}). - Observe(float64(deleteDuration)) + Observe(float64(deleteDuration.Seconds())) r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). - Observe(float64(queryDuration + deleteDuration)) + Observe(float64((queryDuration + deleteDuration).Seconds())) log.WithField("table", table). WithField("offset", offset). @@ -351,9 +351,9 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { r.rowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). Observe(float64(totalDeleted)) r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "query"}). - Observe(float64(totalQueryDuration)) + Observe(float64(totalQueryDuration.Seconds())) r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "delete"}). - Observe(float64(totalDeleteDuration)) + Observe(float64(totalDeleteDuration.Seconds())) r.reapDurationByLookupTable.With(prometheus.Labels{"table": "total", "type": "total"}). Observe(time.Since(reapStart).Seconds()) return nil From 6922a39ea799c68e440cb46e44491b896b32e980 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 31 Jul 2024 09:28:51 +0100 Subject: [PATCH 05/17] add log --- services/horizon/internal/ingest/reap.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index b4cb241fb1..b767007a7c 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -345,7 +345,8 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { WithField("offset", offset). WithField(table+"rows_deleted", rowsDeleted). WithField("query_duration", queryDuration.Seconds()). - WithField("delete_duration", deleteDuration.Seconds()) + WithField("delete_duration", deleteDuration.Seconds()). + Info("Reaper deleted rows from lookup tables") } r.rowsReapedByLookupTable.With(prometheus.Labels{"table": "total"}). 
From a169e728106190d3a3c9f34e82179963affbb8c1 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 31 Jul 2024 13:36:39 +0100 Subject: [PATCH 06/17] fix delete query --- services/horizon/internal/db2/history/main.go | 20 ++++++++++++------- .../horizon/internal/db2/history/main_test.go | 5 ++--- services/horizon/internal/ingest/reap.go | 6 +++--- 3 files changed, 18 insertions(+), 13 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index dcb21fb20d..b45632cce2 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -9,6 +9,7 @@ import ( "database/sql/driver" "encoding/json" "fmt" + "strconv" "strings" "sync" "time" @@ -1120,11 +1121,10 @@ var historyLookupTables = map[string][]tableObjectFieldPair{ // when it reaches the table size so eventually all orphaned rows are // deleted. func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) { - deleteQuery, args := constructDeleteLookupTableRowsQuery(table, ids) + deleteQuery := constructDeleteLookupTableRowsQuery(table, ids) result, err := q.ExecRaw( context.WithValue(ctx, &db.QueryTypeContextKey, db.DeleteQueryType), deleteQuery, - args, ) if err != nil { return 0, fmt.Errorf("error running query: %w", err) @@ -1137,7 +1137,7 @@ func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64 return deletedCount, nil } -func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []interface{}) { +func constructDeleteLookupTableRowsQuery(table string, ids []int64) string { var conditions []string for _, referencedTable := range historyLookupTables[table] { conditions = append( @@ -1150,9 +1150,15 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []i ) } - innerQuery, args := sq.Select("id").From(table).Where(map[string]interface{}{ - "id": ids, - }).OrderBy("id asc").Suffix("FOR UPDATE").MustSql() + stringIds := make([]string, len(ids)) + for i, id := range ids { + stringIds[i] = strconv.FormatInt(id, 10) + } + innerQuery := fmt.Sprintf( + "SELECT id FROM %s WHERE id IN (%s) ORDER BY id asc FOR UPDATE", + table, + strings.Join(stringIds, ", "), + ) deleteQuery := fmt.Sprintf( "DELETE FROM %s WHERE id IN ("+ @@ -1160,7 +1166,7 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) (string, []i "SELECT e1.id as id FROM ha_batch e1 WHERE ", table, innerQuery, ) + strings.Join(conditions, " AND ") + ")" - return deleteQuery, args + return deleteQuery } func constructFindReapLookupTablesQuery(table string, batchSize int, offset int64) string { diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index b15ca25250..3778ca9346 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go @@ -70,19 +70,18 @@ func TestElderLedger(t *testing.T) { } func TestConstructDeleteLookupTableRowsQuery(t *testing.T) { - query, args := constructDeleteLookupTableRowsQuery( + query := constructDeleteLookupTableRowsQuery( "history_accounts", []int64{100, 20, 30}, ) assert.Equal(t, - "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (?,?,?) 
ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ + "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.base_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_trades WHERE history_trades.counter_account_id = id LIMIT 1))", query) - assert.Equal(t, []interface{}([]interface{}{int64(100), int64(20), int64(30)}), args) } func TestConstructReapLookupTablesQuery(t *testing.T) { diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index b767007a7c..681e312ef5 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -316,7 +316,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { startTime := time.Now() ids, offset, err := r.historyQ.FindLookupTableRowsToReap(ctx, table, reapLookupTablesBatchSize) if err != nil { - log.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") + r.logger.WithField("table", table).WithError(err).Warn("Error finding orphaned rows") return err } queryDuration := time.Since(startTime) @@ -326,7 +326,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { var rowsDeleted int64 rowsDeleted, err = r.historyQ.ReapLookupTable(ctx, table, ids, offset) if err != nil { - log.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") + r.logger.WithField("table", table).WithError(err).Warn("Error deleting orphaned rows") return err } deleteDuration := time.Since(deleteStartTime) @@ -341,7 +341,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { r.reapDurationByLookupTable.With(prometheus.Labels{"table": table, "type": "total"}). Observe(float64((queryDuration + deleteDuration).Seconds())) - log.WithField("table", table). + r.logger.WithField("table", table). WithField("offset", offset). WithField(table+"rows_deleted", rowsDeleted). WithField("query_duration", queryDuration.Seconds()). From 513ad1b3148ed43ca11a5026774115ebc54a160d Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 31 Jul 2024 17:35:50 +0100 Subject: [PATCH 07/17] fix rows_deleted log --- services/horizon/internal/ingest/reap.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 681e312ef5..63b56de993 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -343,7 +343,7 @@ func (r *lookupTableReaper) deleteOrphanedRows(ctx context.Context) error { r.logger.WithField("table", table). WithField("offset", offset). - WithField(table+"rows_deleted", rowsDeleted). + WithField("rows_deleted", rowsDeleted). WithField("query_duration", queryDuration.Seconds()). WithField("delete_duration", deleteDuration.Seconds()). 
Info("Reaper deleted rows from lookup tables") From 4f36d7011b5e217e9ba32ae67aa9a17a03bbbdc7 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 13 Aug 2024 09:55:39 +0100 Subject: [PATCH 08/17] lock pre-existing rows --- services/horizon/internal/db2/history/account_loader.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index 9e15920609..33b1e328b6 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -328,8 +328,9 @@ func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, res pq.Array(field.objects), ) } + lockSuffix := "ORDER BY id ASC FOR KEY SHARE" sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN - (SELECT ` + strings.Join(unnestPart, ",") + `)` + (SELECT ` + strings.Join(unnestPart, ",") + `) ` + lockSuffix return q.SelectRaw( ctx, From 93be30073d37c03ae5fe1f4638a15397accd6122 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 15 Aug 2024 11:30:56 +0100 Subject: [PATCH 09/17] change order to fix race condition with reaper --- services/horizon/internal/db2/history/account_loader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index 33b1e328b6..fad6931d8a 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -134,17 +134,17 @@ func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) er return l.less(keys[i], keys[j]) }) - if count, err := l.insert(ctx, q, keys); err != nil { + if count, err := l.query(ctx, q, keys); err != nil { return err } else { l.stats.Total += count - l.stats.Inserted += count } - if count, err := l.query(ctx, q, keys); err != nil { + if count, err := l.insert(ctx, q, keys); err != nil { return err } else { l.stats.Total += count + l.stats.Inserted += count } return nil From 27afb89de3beb13c6901cc1da6eac7ea00a3d8fe Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 27 Aug 2024 13:14:39 +0100 Subject: [PATCH 10/17] use CTE above delete query --- services/horizon/internal/db2/history/main.go | 11 +- .../horizon/internal/db2/history/main_test.go | 3 +- .../horizon/internal/db2/history/reap_test.go | 287 +++++++++--------- 3 files changed, 157 insertions(+), 144 deletions(-) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index b45632cce2..ef285379a6 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -1161,11 +1161,12 @@ func constructDeleteLookupTableRowsQuery(table string, ids []int64) string { ) deleteQuery := fmt.Sprintf( - "DELETE FROM %s WHERE id IN ("+ - "WITH ha_batch AS (%s) "+ - "SELECT e1.id as id FROM ha_batch e1 WHERE ", - table, innerQuery, - ) + strings.Join(conditions, " AND ") + ")" + "WITH ha_batch AS (%s) DELETE FROM %s WHERE id IN ("+ + "SELECT e1.id as id FROM ha_batch e1 WHERE %s)", + innerQuery, + table, + strings.Join(conditions, " AND "), + ) return deleteQuery } diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go index 3778ca9346..e74e02e61d 100644 --- a/services/horizon/internal/db2/history/main_test.go +++ b/services/horizon/internal/db2/history/main_test.go 
@@ -76,7 +76,8 @@ func TestConstructDeleteLookupTableRowsQuery(t *testing.T) { ) assert.Equal(t, - "DELETE FROM history_accounts WHERE id IN (WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) SELECT e1.id as id FROM ha_batch e1 "+ + "WITH ha_batch AS (SELECT id FROM history_accounts WHERE id IN (100, 20, 30) ORDER BY id asc FOR UPDATE) "+ + "DELETE FROM history_accounts WHERE id IN (SELECT e1.id as id FROM ha_batch e1 "+ "WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+ "AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+ diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index d400dd5e38..af20fbc976 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -1,151 +1,162 @@ package history_test import ( + "context" "testing" + "github.com/stretchr/testify/assert" + + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test" ) +type reapResult struct { + Offset int64 + RowsDeleted int64 +} + +func reapLookupTables(t *testing.T, q *history.Q, batchSize int) map[string]reapResult { + results := map[string]reapResult{} + + for _, table := range []string{ + "history_accounts", + "history_assets", + "history_claimable_balances", + "history_liquidity_pools", + } { + ids, offset, err := q.FindLookupTableRowsToReap(context.Background(), table, batchSize) + assert.NoError(t, err) + rowsDeleted, err := q.ReapLookupTable(context.Background(), table, ids, offset) + assert.NoError(t, err) + results[table] = reapResult{ + Offset: offset, + RowsDeleted: rowsDeleted, + } + } + + return results +} + func TestReapLookupTables(t *testing.T) { tt := test.Start(t) defer tt.Finish() tt.Scenario("kahuna") - // - //db := tt.HorizonSession() - //reaper := ingest.NewReaper( - // ingest.ReapConfig{ - // RetentionCount: 1, - // BatchSize: 50, - // }, - // db, - //) - // - //var ( - // prevLedgers, curLedgers int - // prevAccounts, curAccounts int - // prevAssets, curAssets int - // prevClaimableBalances, curClaimableBalances int - // prevLiquidityPools, curLiquidityPools int - //) - // - //err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - //tt.Require.NoError(err) - // - //err = reaper.DeleteUnretainedHistory(tt.Ctx) - //tt.Require.NoError(err) - // - //q := &history.Q{tt.HorizonSession()} - // - //err = q.Begin(tt.Ctx) - //tt.Require.NoError(err) - // - //results, err := q.ReapLookupTables(tt.Ctx, 5) - //tt.Require.NoError(err) - // - //err = q.Commit() - //tt.Require.NoError(err) - // - //err = db.GetRaw(tt.Ctx, &curLedgers, 
`SELECT COUNT(*) FROM history_ledgers`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) - //tt.Require.NoError(err) - // - //tt.Assert.Equal(61, prevLedgers, "prevLedgers") - //tt.Assert.Equal(1, curLedgers, "curLedgers") - // - //tt.Assert.Equal(25, prevAccounts, "prevAccounts") - //tt.Assert.Equal(21, curAccounts, "curAccounts") - //tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - // - //tt.Assert.Equal(7, prevAssets, "prevAssets") - //tt.Assert.Equal(2, curAssets, "curAssets") - //tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - // - //tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") - //tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") - //tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - // - //tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") - //tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") - //tt.Assert.Equal(int64(1), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - // - //tt.Assert.Len(results, 4) - //tt.Assert.Equal(int64(6), results["history_accounts"].Offset) - //tt.Assert.Equal(int64(6), results["history_assets"].Offset) - //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - // - //err = q.Begin(tt.Ctx) - //tt.Require.NoError(err) - // - //results, err = q.ReapLookupTables(tt.Ctx, 5) - //tt.Require.NoError(err) - // - //err = q.Commit() - //tt.Require.NoError(err) - // - //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - //tt.Require.NoError(err) - //err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - //tt.Require.NoError(err) - // - //tt.Assert.Equal(16, curAccounts, "curAccounts") - //tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - // - //tt.Assert.Equal(0, curAssets, "curAssets") - //tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - // - //tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - // - //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - // - //tt.Assert.Len(results, 4) - //tt.Assert.Equal(int64(11), results["history_accounts"].Offset) - //tt.Assert.Equal(int64(0), results["history_assets"].Offset) - //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) - // - //err = q.Begin(tt.Ctx) - //tt.Require.NoError(err) - // - //results, err = q.ReapLookupTables(tt.Ctx, 1000) - //tt.Require.NoError(err) - // - //err = q.Commit() - //tt.Require.NoError(err) - // - //err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) - //tt.Require.NoError(err) - //err = 
db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) - //tt.Require.NoError(err) - // - //tt.Assert.Equal(1, curAccounts, "curAccounts") - //tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) - // - //tt.Assert.Equal(0, curAssets, "curAssets") - //tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) - // - //tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) - // - //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) - // - //tt.Assert.Len(results, 4) - //tt.Assert.Equal(int64(0), results["history_accounts"].Offset) - //tt.Assert.Equal(int64(0), results["history_assets"].Offset) - //tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) - //tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + db := tt.HorizonSession() + reaper := ingest.NewReaper( + ingest.ReapConfig{ + RetentionCount: 1, + BatchSize: 50, + }, + db, + ) + + var ( + prevLedgers, curLedgers int + prevAccounts, curAccounts int + prevAssets, curAssets int + prevClaimableBalances, curClaimableBalances int + prevLiquidityPools, curLiquidityPools int + ) + + err := db.GetRaw(tt.Ctx, &prevLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &prevLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) + + err = reaper.DeleteUnretainedHistory(tt.Ctx) + tt.Require.NoError(err) + + q := &history.Q{tt.HorizonSession()} + + results := reapLookupTables(t, q, 5) + + err = db.GetRaw(tt.Ctx, &curLedgers, `SELECT COUNT(*) FROM history_ledgers`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curClaimableBalances, `SELECT COUNT(*) FROM history_claimable_balances`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curLiquidityPools, `SELECT COUNT(*) FROM history_liquidity_pools`) + tt.Require.NoError(err) + + tt.Assert.Equal(61, prevLedgers, "prevLedgers") + tt.Assert.Equal(1, curLedgers, "curLedgers") + + tt.Assert.Equal(25, prevAccounts, "prevAccounts") + tt.Assert.Equal(21, curAccounts, "curAccounts") + tt.Assert.Equal(int64(4), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(7, prevAssets, "prevAssets") + tt.Assert.Equal(2, curAssets, "curAssets") + tt.Assert.Equal(int64(5), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(1, prevClaimableBalances, "prevClaimableBalances") + tt.Assert.Equal(0, curClaimableBalances, "curClaimableBalances") + tt.Assert.Equal(int64(1), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(1, prevLiquidityPools, "prevLiquidityPools") + tt.Assert.Equal(0, curLiquidityPools, "curLiquidityPools") + tt.Assert.Equal(int64(1), 
results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(6), results["history_accounts"].Offset) + tt.Assert.Equal(int64(6), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + results = reapLookupTables(t, q, 5) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(16, curAccounts, "curAccounts") + tt.Assert.Equal(int64(5), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(2), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(11), results["history_accounts"].Offset) + tt.Assert.Equal(int64(0), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) + + results = reapLookupTables(t, q, 1000) + + err = db.GetRaw(tt.Ctx, &curAccounts, `SELECT COUNT(*) FROM history_accounts`) + tt.Require.NoError(err) + err = db.GetRaw(tt.Ctx, &curAssets, `SELECT COUNT(*) FROM history_assets`) + tt.Require.NoError(err) + + tt.Assert.Equal(1, curAccounts, "curAccounts") + tt.Assert.Equal(int64(15), results["history_accounts"].RowsDeleted, `deletedCount["history_accounts"]`) + + tt.Assert.Equal(0, curAssets, "curAssets") + tt.Assert.Equal(int64(0), results["history_assets"].RowsDeleted, `deletedCount["history_assets"]`) + + tt.Assert.Equal(int64(0), results["history_claimable_balances"].RowsDeleted, `deletedCount["history_claimable_balances"]`) + + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].RowsDeleted, `deletedCount["history_liquidity_pools"]`) + + tt.Assert.Len(results, 4) + tt.Assert.Equal(int64(0), results["history_accounts"].Offset) + tt.Assert.Equal(int64(0), results["history_assets"].Offset) + tt.Assert.Equal(int64(0), results["history_claimable_balances"].Offset) + tt.Assert.Equal(int64(0), results["history_liquidity_pools"].Offset) } From 2c17f4fed0c7f98c031a690cc11422b579b907cf Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 27 Aug 2024 15:17:12 +0100 Subject: [PATCH 11/17] add concurrency modes --- .../internal/db2/history/account_loader.go | 81 +++++++++++++------ .../db2/history/account_loader_test.go | 12 ++- .../internal/db2/history/asset_loader.go | 5 +- .../internal/db2/history/asset_loader_test.go | 11 ++- .../db2/history/claimable_balance_loader.go | 5 +- .../history/claimable_balance_loader_test.go | 11 ++- .../effect_batch_insert_builder_test.go | 2 +- .../internal/db2/history/effect_test.go | 6 +- .../internal/db2/history/fee_bump_scenario.go | 2 +- .../db2/history/liquidity_pool_loader.go | 7 +- .../db2/history/liquidity_pool_loader_test.go | 11 ++- ...n_participant_batch_insert_builder_test.go | 2 +- .../internal/db2/history/operation_test.go | 2 +- .../internal/db2/history/participants_test.go | 2 +- 
.../internal/db2/history/transaction_test.go | 8 +- .../internal/ingest/processor_runner.go | 18 ++--- .../internal/ingest/processor_runner_test.go | 2 +- ...ble_balances_transaction_processor_test.go | 2 +- .../processors/effects_processor_test.go | 12 +-- ...uidity_pools_transaction_processor_test.go | 2 +- .../processors/participants_processor_test.go | 2 +- 21 files changed, 133 insertions(+), 72 deletions(-) diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index fad6931d8a..e402129a44 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -17,6 +17,14 @@ import ( var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") +type ConcurrencyMode int + +const ( + _ ConcurrencyMode = iota + ConcurrentInserts + ConcurrentDeletes +) + // LoaderStats describes the result of executing a history lookup id Loader type LoaderStats struct { // Total is the number of elements registered to the Loader @@ -38,7 +46,7 @@ type FutureAccountID = future[string, Account] type AccountLoader = loader[string, Account] // NewAccountLoader will construct a new AccountLoader instance. -func NewAccountLoader() *AccountLoader { +func NewAccountLoader(concurrencyMode ConcurrencyMode) *AccountLoader { return &AccountLoader{ sealed: false, set: set.Set[string]{}, @@ -58,20 +66,22 @@ func NewAccountLoader() *AccountLoader { mappingFromRow: func(account Account) (string, int64) { return account.Address, account.ID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } type loader[K comparable, T any] struct { - sealed bool - set set.Set[K] - ids map[K]int64 - stats LoaderStats - name string - table string - columnsForKeys func([]K) []columnValues - mappingFromRow func(T) (K, int64) - less func(K, K) bool + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool + concurrencyMode ConcurrencyMode } type future[K comparable, T any] struct { @@ -134,17 +144,34 @@ func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) er return l.less(keys[i], keys[j]) }) - if count, err := l.query(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - } + if l.concurrencyMode == ConcurrentInserts { + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } - if count, err := l.insert(ctx, q, keys); err != nil { - return err + if count, err := l.query(ctx, q, keys, false); err != nil { + return err + } else { + l.stats.Total += count + } + } else if l.concurrencyMode == ConcurrentDeletes { + if count, err := l.query(ctx, q, keys, true); err != nil { + return err + } else { + l.stats.Total += count + } + + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } } else { - l.stats.Total += count - l.stats.Inserted += count + return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode) } return nil @@ -204,11 +231,15 @@ func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) return len(rows), nil } -func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, 
lockRows bool) (int, error) { keys = l.filter(keys) if len(keys) == 0 { return 0, nil } + var suffix string + if lockRows { + suffix = "ORDER BY id ASC FOR KEY SHARE" + } var rows []T err := bulkGet( @@ -217,6 +248,7 @@ func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K) (int, error) { l.table, l.columnsForKeys(keys), &rows, + suffix, ) if err != nil { return 0, err @@ -293,7 +325,7 @@ func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, ) } -func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error { unnestPart := make([]string, 0, len(fields)) columns := make([]string, 0, len(fields)) pqArrays := make([]interface{}, 0, len(fields)) @@ -328,9 +360,8 @@ func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, res pq.Array(field.objects), ) } - lockSuffix := "ORDER BY id ASC FOR KEY SHARE" sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN - (SELECT ` + strings.Join(unnestPart, ",") + `) ` + lockSuffix + (SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix return q.SelectRaw( ctx, @@ -348,7 +379,7 @@ type AccountLoaderStub struct { // NewAccountLoaderStub returns a new AccountLoaderStub instance func NewAccountLoaderStub() AccountLoaderStub { - return AccountLoaderStub{Loader: NewAccountLoader()} + return AccountLoaderStub{Loader: NewAccountLoader(ConcurrentInserts)} } // Insert updates the wrapped AccountLoader so that the given account diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 9a9fb30445..83b172b40b 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" ) func TestAccountLoader(t *testing.T) { @@ -16,12 +17,18 @@ func TestAccountLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAccountLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAccountLoader(t, session, ConcurrentDeletes) +} + +func testAccountLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var addresses []string for i := 0; i < 100; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } - loader := NewAccountLoader() + loader := NewAccountLoader(mode) for _, address := range addresses { future := loader.GetFuture(address) _, err := future.Value() @@ -58,7 +65,7 @@ func TestAccountLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAccountLoader() + loader = NewAccountLoader(mode) for i := 0; i < 10; i++ { addresses = append(addresses, keypair.MustRandom().Address()) } @@ -85,5 +92,4 @@ func TestAccountLoader(t *testing.T) { assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } - } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index cdd2a0d714..33c5c333dd 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -42,7 +42,7 @@ type FutureAssetID = 
future[AssetKey, Asset] type AssetLoader = loader[AssetKey, Asset] // NewAssetLoader will construct a new AssetLoader instance. -func NewAssetLoader() *AssetLoader { +func NewAssetLoader(concurrencyMode ConcurrencyMode) *AssetLoader { return &AssetLoader{ sealed: false, set: set.Set[AssetKey]{}, @@ -88,6 +88,7 @@ func NewAssetLoader() *AssetLoader { less: func(a AssetKey, b AssetKey) bool { return a.String() < b.String() }, + concurrencyMode: concurrencyMode, } } @@ -99,7 +100,7 @@ type AssetLoaderStub struct { // NewAssetLoaderStub returns a new AssetLoaderStub instance func NewAssetLoaderStub() AssetLoaderStub { - return AssetLoaderStub{Loader: NewAssetLoader()} + return AssetLoaderStub{Loader: NewAssetLoader(ConcurrentInserts)} } // Insert updates the wrapped AssetLoaderStub so that the given asset diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index ca65cebb7e..e7a0495cad 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -9,6 +9,7 @@ import ( "github.com/stellar/go/keypair" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -40,6 +41,12 @@ func TestAssetLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testAssetLoader(t, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testAssetLoader(t, session, ConcurrentDeletes) +} + +func testAssetLoader(t *testing.T, session *db.Session, mode ConcurrencyMode) { var keys []AssetKey for i := 0; i < 100; i++ { var key AssetKey @@ -66,7 +73,7 @@ func TestAssetLoader(t *testing.T) { keys = append(keys, key) } - loader := NewAssetLoader() + loader := NewAssetLoader(mode) for _, key := range keys { future := loader.GetFuture(key) _, err := future.Value() @@ -109,7 +116,7 @@ func TestAssetLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewAssetLoader() + loader = NewAssetLoader(mode) for i := 0; i < 10; i++ { var key AssetKey if i%2 == 0 { diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index f775ea4b24..9107d4fb9f 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -19,7 +19,7 @@ type FutureClaimableBalanceID = future[string, HistoryClaimableBalance] type ClaimableBalanceLoader = loader[string, HistoryClaimableBalance] // NewClaimableBalanceLoader will construct a new ClaimableBalanceLoader instance. 
-func NewClaimableBalanceLoader() *ClaimableBalanceLoader { +func NewClaimableBalanceLoader(concurrencyMode ConcurrencyMode) *ClaimableBalanceLoader { return &ClaimableBalanceLoader{ sealed: false, set: set.Set[string]{}, @@ -39,6 +39,7 @@ func NewClaimableBalanceLoader() *ClaimableBalanceLoader { mappingFromRow: func(row HistoryClaimableBalance) (string, int64) { return row.BalanceID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index f5759015c7..490d5a0f70 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -17,6 +18,12 @@ func TestClaimableBalanceLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testCBLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testCBLoader(t, tt, session, ConcurrentDeletes) +} + +func testCBLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { balanceID := xdr.ClaimableBalanceId{ @@ -28,7 +35,7 @@ func TestClaimableBalanceLoader(t *testing.T) { ids = append(ids, id) } - loader := NewClaimableBalanceLoader() + loader := NewClaimableBalanceLoader(mode) var futures []FutureClaimableBalanceID for _, id := range ids { future := loader.GetFuture(id) @@ -70,7 +77,7 @@ func TestClaimableBalanceLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewClaimableBalanceLoader() + loader = NewClaimableBalanceLoader(mode) for i := 100; i < 110; i++ { balanceID := xdr.ClaimableBalanceId{ Type: xdr.ClaimableBalanceIdTypeClaimableBalanceIdTypeV0, diff --git a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go index e1ac998953..e917983b8f 100644 --- a/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/effect_batch_insert_builder_test.go @@ -20,7 +20,7 @@ func TestAddEffect(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/effect_test.go b/services/horizon/internal/db2/history/effect_test.go index 19af0ceff8..ba59cf3fd4 100644 --- a/services/horizon/internal/db2/history/effect_test.go +++ b/services/horizon/internal/db2/history/effect_test.go @@ -23,7 +23,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Effect address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) @@ 
-47,7 +47,7 @@ func TestEffectsForLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "abcde" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) operationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(operationBuilder.Add(opID, lpLoader.GetFuture(liquidityPoolID))) @@ -78,7 +78,7 @@ func TestEffectsForTrustlinesSponsorshipEmptyAssetType(t *testing.T) { address := "GAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSTVY" muxedAddres := "MAQAA5L65LSYH7CQ3VTJ7F3HHLGCL3DSLAR2Y47263D56MNNGHSQSAAAAAAAAAAE2LP26" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) builder := q.NewEffectBatchInsertBuilder() sequence := int32(56) diff --git a/services/horizon/internal/db2/history/fee_bump_scenario.go b/services/horizon/internal/db2/history/fee_bump_scenario.go index da6563c732..e161d686d1 100644 --- a/services/horizon/internal/db2/history/fee_bump_scenario.go +++ b/services/horizon/internal/db2/history/fee_bump_scenario.go @@ -288,7 +288,7 @@ func FeeBumpScenario(tt *test.T, q *Q, successful bool) FeeBumpFixture { details, err = json.Marshal(map[string]interface{}{"new_seq": 98}) tt.Assert.NoError(err) - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) err = effectBuilder.Add( accountLoader.GetFuture(account.Address()), diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index a03caaa988..5da2a7b6fd 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -19,7 +19,7 @@ type FutureLiquidityPoolID = future[string, HistoryLiquidityPool] type LiquidityPoolLoader = loader[string, HistoryLiquidityPool] // NewLiquidityPoolLoader will construct a new LiquidityPoolLoader instance. 
-func NewLiquidityPoolLoader() *LiquidityPoolLoader { +func NewLiquidityPoolLoader(concurrencyMode ConcurrencyMode) *LiquidityPoolLoader { return &LiquidityPoolLoader{ sealed: false, set: set.Set[string]{}, @@ -39,7 +39,8 @@ func NewLiquidityPoolLoader() *LiquidityPoolLoader { mappingFromRow: func(row HistoryLiquidityPool) (string, int64) { return row.PoolID, row.InternalID }, - less: cmp.Less[string], + less: cmp.Less[string], + concurrencyMode: concurrencyMode, } } @@ -51,7 +52,7 @@ type LiquidityPoolLoaderStub struct { // NewLiquidityPoolLoaderStub returns a new LiquidityPoolLoader instance func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { - return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader()} + return LiquidityPoolLoaderStub{Loader: NewLiquidityPoolLoader(ConcurrentInserts)} } // Insert updates the wrapped LiquidityPoolLoader so that the given liquidity pool diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index aec2fcd886..c7a1282760 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stellar/go/services/horizon/internal/test" + "github.com/stellar/go/support/db" "github.com/stellar/go/xdr" ) @@ -16,6 +17,12 @@ func TestLiquidityPoolLoader(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) session := tt.HorizonSession() + testLPLoader(t, tt, session, ConcurrentInserts) + test.ResetHorizonDB(t, tt.HorizonDB) + testLPLoader(t, tt, session, ConcurrentDeletes) +} + +func testLPLoader(t *testing.T, tt *test.T, session *db.Session, mode ConcurrencyMode) { var ids []string for i := 0; i < 100; i++ { poolID := xdr.PoolId{byte(i)} @@ -24,7 +31,7 @@ func TestLiquidityPoolLoader(t *testing.T) { ids = append(ids, id) } - loader := NewLiquidityPoolLoader() + loader := NewLiquidityPoolLoader(mode) for _, id := range ids { future := loader.GetFuture(id) _, err := future.Value() @@ -62,7 +69,7 @@ func TestLiquidityPoolLoader(t *testing.T) { // check that Loader works when all the previous values are already // present in the db and also add 10 more rows to insert - loader = NewLiquidityPoolLoader() + loader = NewLiquidityPoolLoader(mode) for i := 100; i < 110; i++ { poolID := xdr.PoolId{byte(i)} var id string diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index 7e823064f2..eb30fe6659 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -15,7 +15,7 @@ func TestAddOperationParticipants(t *testing.T) { test.ResetHorizonDB(t, tt.HorizonDB) q := &Q{tt.HorizonSession()} - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) address := keypair.MustRandom().Address() tt.Assert.NoError(q.Begin(tt.Ctx)) builder := q.NewOperationParticipantBatchInsertBuilder() diff --git a/services/horizon/internal/db2/history/operation_test.go b/services/horizon/internal/db2/history/operation_test.go index 1d20a9cb10..8899bfcdf6 100644 --- a/services/horizon/internal/db2/history/operation_test.go +++ b/services/horizon/internal/db2/history/operation_test.go @@ -125,7 +125,7 @@ func 
TestOperationByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpOperationBuilder := q.NewOperationLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpOperationBuilder.Add(opID1, lpLoader.GetFuture(liquidityPoolID))) diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 37f7654abb..15d09da0ac 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -35,7 +35,7 @@ func TestTransactionParticipantsBatch(t *testing.T) { q := &Q{tt.HorizonSession()} batch := q.NewTransactionParticipantsBatchInsertBuilder() - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) transactionID := int64(1) otherTransactionID := int64(2) diff --git a/services/horizon/internal/db2/history/transaction_test.go b/services/horizon/internal/db2/history/transaction_test.go index 65c6734644..bd8cb1673c 100644 --- a/services/horizon/internal/db2/history/transaction_test.go +++ b/services/horizon/internal/db2/history/transaction_test.go @@ -79,7 +79,7 @@ func TestTransactionByLiquidityPool(t *testing.T) { // Insert Liquidity Pool history liquidityPoolID := "a2f38836a839de008cf1d782c81f45e1253cc5d3dad9110b872965484fec0a49" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpTransactionBuilder := q.NewTransactionLiquidityPoolBatchInsertBuilder() tt.Assert.NoError(lpTransactionBuilder.Add(txID, lpLoader.GetFuture(liquidityPoolID))) tt.Assert.NoError(lpLoader.Exec(tt.Ctx, q)) @@ -940,15 +940,15 @@ func TestTransactionQueryBuilder(t *testing.T) { tt.Assert.NoError(q.Begin(tt.Ctx)) address := "GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON" - accountLoader := NewAccountLoader() + accountLoader := NewAccountLoader(ConcurrentInserts) accountLoader.GetFuture(address) cbID := "00000000178826fbfe339e1f5c53417c6fedfe2c05e8bec14303143ec46b38981b09c3f9" - cbLoader := NewClaimableBalanceLoader() + cbLoader := NewClaimableBalanceLoader(ConcurrentInserts) cbLoader.GetFuture(cbID) lpID := "0000a8198b5e25994c1ca5b0556faeb27325ac746296944144e0a7406d501e8a" - lpLoader := NewLiquidityPoolLoader() + lpLoader := NewLiquidityPoolLoader(ConcurrentInserts) lpLoader.GetFuture(lpID) tt.Assert.NoError(accountLoader.Exec(tt.Ctx, q)) diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index e6f0e0cf74..75b8645953 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -133,11 +133,11 @@ func buildChangeProcessor( }) } -func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor) (groupLoaders, *groupTransactionProcessors) { - accountLoader := history.NewAccountLoader() - assetLoader := history.NewAssetLoader() - lpLoader := history.NewLiquidityPoolLoader() - cbLoader := history.NewClaimableBalanceLoader() +func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors.LedgersProcessor, concurrencyMode history.ConcurrencyMode) (groupLoaders, *groupTransactionProcessors) { + accountLoader := history.NewAccountLoader(concurrencyMode) + assetLoader := history.NewAssetLoader(concurrencyMode) + lpLoader := 
history.NewLiquidityPoolLoader(concurrencyMode) + cbLoader := history.NewClaimableBalanceLoader(concurrencyMode) loaders := newGroupLoaders([]horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader}) statsLedgerTransactionProcessor := processors.NewStatsLedgerTransactionProcessor() @@ -366,7 +366,7 @@ func (s *ProcessorRunner) streamLedger(ledger xdr.LedgerCloseMeta, return nil } -func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta) ( +func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry, ledger xdr.LedgerCloseMeta, concurrencyMode history.ConcurrencyMode) ( transactionStats processors.StatsLedgerTransactionProcessorResults, transactionDurations runDurations, tradeStats processors.TradeStats, @@ -381,7 +381,7 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry groupTransactionFilterers := s.buildTransactionFilterer() // when in online mode, the submission result processor must always run (regardless of whether filter rules exist or not) groupFilteredOutProcessors := s.buildFilteredOutProcessor() - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, concurrencyMode) if err = registerTransactionProcessors( registry, @@ -494,7 +494,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.Ledger groupTransactionFilterers := s.buildTransactionFilterer() // intentionally skip filtered out processor groupFilteredOutProcessors := newGroupTransactionProcessors(nil, nil, nil) - loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) + loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) startTime := time.Now() curHeap, sysHeap := getMemStats() @@ -611,7 +611,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( return } - transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger) + transactionStats, transactionDurations, tradeStats, loaderDurations, loaderStats, err := s.runTransactionProcessorsOnLedger(registry, ledger, history.ConcurrentDeletes) stats.changeStats = changeStatsProcessor.GetResults() stats.changeDurations = groupChangeProcessors.processorsRunDurations diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index e6ce6b512c..82c712b737 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -248,7 +248,7 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { ledgersProcessor := &processors.LedgersProcessor{} - _, processor := runner.buildTransactionProcessor(ledgersProcessor) + _, processor := runner.buildTransactionProcessor(ledgersProcessor, history.ConcurrentInserts) assert.IsType(t, &groupTransactionProcessors{}, processor) assert.IsType(t, &processors.StatsLedgerTransactionProcessor{}, processor.processors[0]) assert.IsType(t, &processors.EffectProcessor{}, processor.processors[1]) diff --git a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go index ca918e08ea..5967ef41b1 100644 --- 
a/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/claimable_balances_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.cbLoader = history.NewClaimableBalanceLoader() + s.cbLoader = history.NewClaimableBalanceLoader(history.ConcurrentInserts) s.processor = NewClaimableBalancesTransactionProcessor( s.cbLoader, diff --git a/services/horizon/internal/ingest/processors/effects_processor_test.go b/services/horizon/internal/ingest/processors/effects_processor_test.go index 0243768fde..276f6fcb03 100644 --- a/services/horizon/internal/ingest/processors/effects_processor_test.go +++ b/services/horizon/internal/ingest/processors/effects_processor_test.go @@ -7,11 +7,11 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "math/big" + "strings" "testing" "github.com/guregu/null" - "math/big" - "strings" "github.com/stellar/go/keypair" "github.com/stellar/go/protocols/horizon/base" @@ -62,7 +62,7 @@ func TestEffectsProcessorTestSuiteLedger(t *testing.T) { func (s *EffectsProcessorTestSuiteLedger) SetupTest() { s.ctx = context.Background() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.mockBatchInsertBuilder = &history.MockEffectBatchInsertBuilder{} s.lcm = xdr.LedgerCloseMeta{ @@ -446,7 +446,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { } assert.True(t, err2 != nil || err == nil, s) }() - err = operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err = operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) }() } @@ -468,7 +468,7 @@ func TestEffectsCoversAllOperationTypes(t *testing.T) { ledgerSequence: 1, } // calling effects should error due to the unknown operation - err := operation.ingestEffects(history.NewAccountLoader(), &history.MockEffectBatchInsertBuilder{}) + err := operation.ingestEffects(history.NewAccountLoader(history.ConcurrentInserts), &history.MockEffectBatchInsertBuilder{}) assert.Contains(t, err.Error(), "Unknown operation type") } @@ -2558,7 +2558,7 @@ type effect struct { } func assertIngestEffects(t *testing.T, operation transactionOperationWrapper, expected []effect) { - accountLoader := history.NewAccountLoader() + accountLoader := history.NewAccountLoader(history.ConcurrentInserts) mockBatchInsertBuilder := &history.MockEffectBatchInsertBuilder{} for _, expectedEffect := range expected { diff --git a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go index 8d08e44d44..cdafc5bcc3 100644 --- a/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go +++ b/services/horizon/internal/ingest/processors/liquidity_pools_transaction_processor_test.go @@ -44,7 +44,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) SetupTest() { }, }, } - s.lpLoader = history.NewLiquidityPoolLoader() + s.lpLoader = history.NewLiquidityPoolLoader(history.ConcurrentInserts) s.processor = NewLiquidityPoolsTransactionProcessor( s.lpLoader, diff --git a/services/horizon/internal/ingest/processors/participants_processor_test.go b/services/horizon/internal/ingest/processors/participants_processor_test.go index b81bd22f67..f6154b2b39 100644 --- 
a/services/horizon/internal/ingest/processors/participants_processor_test.go +++ b/services/horizon/internal/ingest/processors/participants_processor_test.go @@ -86,7 +86,7 @@ func (s *ParticipantsProcessorTestSuiteLedger) SetupTest() { s.thirdTx.Envelope.V1.Tx.SourceAccount = aid.ToMuxedAccount() s.thirdTxID = toid.New(int32(sequence), 3, 0).ToInt64() - s.accountLoader = history.NewAccountLoader() + s.accountLoader = history.NewAccountLoader(history.ConcurrentInserts) s.addressToFuture = map[string]history.FutureAccountID{} for _, address := range s.addresses { s.addressToFuture[address] = s.accountLoader.GetFuture(address) From 399818f6c582523b987e1f7e3700ecf4a17feb50 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 27 Aug 2024 22:37:49 +0100 Subject: [PATCH 12/17] add tests for concurrency mode --- .../internal/db2/history/account_loader.go | 21 ++ .../db2/history/loader_concurrency_test.go | 189 ++++++++++++++++++ services/horizon/internal/db2/history/main.go | 16 +- 3 files changed, 222 insertions(+), 4 deletions(-) create mode 100644 services/horizon/internal/db2/history/loader_concurrency_test.go diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e402129a44..1f2314d63a 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -17,11 +17,32 @@ import ( var errSealed = errors.New("cannot register more entries to Loader after calling Exec()") +// ConcurrencyMode is used to configure the level of thread-safety for a loader type ConcurrencyMode int +func (cm ConcurrencyMode) String() string { + switch cm { + case ConcurrentInserts: + return "ConcurrentInserts" + case ConcurrentDeletes: + return "ConcurrentDeletes" + default: + return "unknown" + } +} + const ( _ ConcurrencyMode = iota + // ConcurrentInserts configures the loader to maintain safety when there are multiple loaders + // inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion. + // Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the + // same table. ConcurrentInserts + // ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking + // reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for + // live ingestion when reaping of lookup tables is enabled. + // Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the + // same table. 
ConcurrentDeletes
)
diff --git a/services/horizon/internal/db2/history/loader_concurrency_test.go b/services/horizon/internal/db2/history/loader_concurrency_test.go
new file mode 100644
index 0000000000..d3da832bce
--- /dev/null
+++ b/services/horizon/internal/db2/history/loader_concurrency_test.go
@@ -0,0 +1,189 @@
+package history
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/stellar/go/keypair"
+	"github.com/stellar/go/services/horizon/internal/test"
+)
+
+func TestLoaderConcurrentInserts(t *testing.T) {
+	tt := test.Start(t)
+	defer tt.Finish()
+	test.ResetHorizonDB(t, tt.HorizonDB)
+	s1 := tt.HorizonSession()
+	s2 := s1.Clone()
+
+	for _, testCase := range []struct {
+		mode ConcurrencyMode
+		pass bool
+	}{
+		{ConcurrentInserts, true},
+		{ConcurrentDeletes, false},
+	} {
+		t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) {
+			var addresses []string
+			for i := 0; i < 10; i++ {
+				addresses = append(addresses, keypair.MustRandom().Address())
+			}
+
+			l1 := NewAccountLoader(testCase.mode)
+			for _, address := range addresses {
+				l1.GetFuture(address)
+			}
+
+			for i := 0; i < 5; i++ {
+				addresses = append(addresses, keypair.MustRandom().Address())
+			}
+
+			l2 := NewAccountLoader(testCase.mode)
+			for _, address := range addresses {
+				l2.GetFuture(address)
+			}
+
+			assert.NoError(t, s1.Begin(context.Background()))
+			assert.NoError(t, l1.Exec(context.Background(), s1))
+
+			assert.NoError(t, s2.Begin(context.Background()))
+			var wg sync.WaitGroup
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				<-time.After(time.Second * 3)
+				assert.NoError(t, s1.Commit())
+			}()
+			assert.NoError(t, l2.Exec(context.Background(), s2))
+			assert.NoError(t, s2.Commit())
+			wg.Wait()
+
+			assert.Equal(t, LoaderStats{
+				Total:    10,
+				Inserted: 10,
+			}, l1.Stats())
+
+			if testCase.pass {
+				assert.Equal(t, LoaderStats{
+					Total:    15,
+					Inserted: 5,
+				}, l2.Stats())
+			} else {
+				assert.NotEqual(t, LoaderStats{
+					Total:    15,
+					Inserted: 5,
+				}, l2.Stats())
+				return
+			}
+
+			q := &Q{s1}
+			for _, address := range addresses[:10] {
+				l1Id, err := l1.GetNow(address)
+				assert.NoError(t, err)
+
+				l2Id, err := l2.GetNow(address)
+				assert.NoError(t, err)
+				assert.Equal(t, l1Id, l2Id)
+
+				var account Account
+				assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
+				assert.Equal(t, account.ID, l1Id)
+				assert.Equal(t, account.Address, address)
+			}
+
+			for _, address := range addresses[10:] {
+				l2Id, err := l2.GetNow(address)
+				assert.NoError(t, err)
+
+				var account Account
+				assert.NoError(t, q.AccountByAddress(context.Background(), &account, address))
+				assert.Equal(t, account.ID, l2Id)
+				assert.Equal(t, account.Address, address)
+			}
+		})
+	}
+}
+
+func TestLoaderConcurrentDeletes(t *testing.T) {
+	tt := test.Start(t)
+	defer tt.Finish()
+	test.ResetHorizonDB(t, tt.HorizonDB)
+	s1 := tt.HorizonSession()
+	s2 := s1.Clone()
+
+	for _, testCase := range []struct {
+		mode ConcurrencyMode
+		pass bool
+	}{
+		{ConcurrentInserts, false},
+		{ConcurrentDeletes, true},
+	} {
+		t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) {
+			var addresses []string
+			for i := 0; i < 10; i++ {
+				addresses = append(addresses, keypair.MustRandom().Address())
+			}
+
+			loader := NewAccountLoader(testCase.mode)
+			for _, address := range addresses {
+				loader.GetFuture(address)
+			}
+			assert.NoError(t, loader.Exec(context.Background(), s1))
+
+			var ids []int64
+			for _, address := range addresses {
+				id, err := loader.GetNow(address)
+
assert.NoError(t, err) + ids = append(ids, id) + } + + loader = NewAccountLoader(testCase.mode) + for _, address := range addresses { + loader.GetFuture(address) + } + + assert.NoError(t, s1.Begin(context.Background())) + assert.NoError(t, loader.Exec(context.Background(), s1)) + + assert.NoError(t, s2.Begin(context.Background())) + q2 := &Q{s2} + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + <-time.After(time.Second * 3) + + q1 := &Q{s1} + for _, address := range addresses { + id, err := loader.GetNow(address) + assert.NoError(t, err) + + var account Account + err = q1.AccountByAddress(context.Background(), &account, address) + if testCase.pass { + assert.NoError(t, err) + assert.Equal(t, account.ID, id) + assert.Equal(t, account.Address, address) + } else { + assert.ErrorContains(t, err, sql.ErrNoRows.Error()) + } + } + assert.NoError(t, s1.Commit()) + }() + + deletedCount, err := q2.reapLookupTable(context.Background(), "history_accounts", ids, 1000) + assert.NoError(t, err) + assert.Equal(t, int64(len(addresses)), deletedCount) + assert.NoError(t, s2.Commit()) + + wg.Wait() + }) + } +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index ef285379a6..dd62e17c5e 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -1012,6 +1012,18 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO } defer q.Rollback() + rowsDeleted, err := q.reapLookupTable(ctx, table, ids, newOffset) + if err != nil { + return 0, err + } + + if err := q.Commit(); err != nil { + return 0, fmt.Errorf("could not commit transaction: %w", err) + } + return rowsDeleted, nil +} + +func (q *Q) reapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) { if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil { return 0, fmt.Errorf("error updating offset: %w", err) } @@ -1024,10 +1036,6 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO return 0, fmt.Errorf("could not delete orphaned rows: %w", err) } } - - if err := q.Commit(); err != nil { - return 0, fmt.Errorf("could not commit transaction: %w", err) - } return rowsDeleted, nil } From cee27c1fbd5a55a95efb25e96fb1ff66a0b651a0 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 3 Sep 2024 08:10:17 +0100 Subject: [PATCH 13/17] Update services/horizon/internal/db2/history/key_value.go Co-authored-by: shawn --- services/horizon/internal/db2/history/key_value.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 0d64033ebd..9fee9513c2 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -223,7 +223,7 @@ func (q *Q) getLookupTableReapOffset(ctx context.Context, table string) (int64, var offset int64 offset, err = strconv.ParseInt(text, 10, 64) if err != nil { - return 0, fmt.Errorf("invalid offset: %s", text) + return 0, fmt.Errorf("invalid offset: %s for table %s", text, table) } return offset, nil } From 372b6d221cafbe841657f2a4308ae4654f663506 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 3 Sep 2024 08:12:20 +0100 Subject: [PATCH 14/17] Update services/horizon/internal/db2/history/main.go Co-authored-by: shawn --- services/horizon/internal/db2/history/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go
index dd62e17c5e..9264bca967 100644
--- a/services/horizon/internal/db2/history/main.go
+++ b/services/horizon/internal/db2/history/main.go
@@ -1025,7 +1025,7 @@ func (q *Q) ReapLookupTable(ctx context.Context, table string, ids []int64, newO

 func (q *Q) reapLookupTable(ctx context.Context, table string, ids []int64, newOffset int64) (int64, error) {
 	if err := q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
-		return 0, fmt.Errorf("error updating offset: %w", err)
+		return 0, fmt.Errorf("error updating offset for table %s: %w", table, err)
 	}

 	var rowsDeleted int64

From 57c57b997ebbf37abfa9f52c1c8588a23cb9d932 Mon Sep 17 00:00:00 2001
From: tamirms
Date: Tue, 3 Sep 2024 14:17:20 +0100
Subject: [PATCH 15/17] code review fixes

---
 .../internal/db2/history/account_loader.go | 348 -----------------
 .../horizon/internal/db2/history/loader.go | 365 ++++++++++++++++++
 .../db2/history/loader_concurrency_test.go | 14 +-
 services/horizon/internal/db2/history/main.go | 59 +--
 .../internal/db2/history/verify_lock.go | 7 +-
 5 files changed, 413 insertions(+), 380 deletions(-)
 create mode 100644 services/horizon/internal/db2/history/loader.go

diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go
index 1f2314d63a..f69e7d7f48 100644
--- a/services/horizon/internal/db2/history/account_loader.go
+++ b/services/horizon/internal/db2/history/account_loader.go
@@ -2,58 +2,10 @@ package history

 import (
 	"cmp"
-	"context"
-	"database/sql/driver"
-	"fmt"
-	"sort"
-	"strings"
-
-	"github.com/lib/pq"

 	"github.com/stellar/go/support/collections/set"
-	"github.com/stellar/go/support/db"
-	"github.com/stellar/go/support/errors"
 )

-var errSealed = errors.New("cannot register more entries to Loader after calling Exec()")
-
-// ConcurrencyMode is used to configure the level of thread-safety for a loader
-type ConcurrencyMode int
-
-func (cm ConcurrencyMode) String() string {
-	switch cm {
-	case ConcurrentInserts:
-		return "ConcurrentInserts"
-	case ConcurrentDeletes:
-		return "ConcurrentDeletes"
-	default:
-		return "unknown"
-	}
-}
-
-const (
-	_ ConcurrencyMode = iota
-	// ConcurrentInserts configures the loader to maintain safety when there are multiple loaders
-	// inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion.
-	// Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the
-	// same table.
-	ConcurrentInserts
-	// ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking
-	// reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for
-	// live ingestion when reaping of lookup tables is enabled.
-	// Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the
-	// same table.
-	ConcurrentDeletes
-)
-
-// LoaderStats describes the result of executing a history lookup id Loader
-type LoaderStats struct {
-	// Total is the number of elements registered to the Loader
-	Total int
-	// Inserted is the number of elements inserted into the lookup table
-	Inserted int
-}
-
 // FutureAccountID represents a future history account.
// A FutureAccountID is created by an AccountLoader and // the account id is available after calling Exec() on @@ -92,306 +44,6 @@ func NewAccountLoader(concurrencyMode ConcurrencyMode) *AccountLoader { } } -type loader[K comparable, T any] struct { - sealed bool - set set.Set[K] - ids map[K]int64 - stats LoaderStats - name string - table string - columnsForKeys func([]K) []columnValues - mappingFromRow func(T) (K, int64) - less func(K, K) bool - concurrencyMode ConcurrencyMode -} - -type future[K comparable, T any] struct { - key K - loader *loader[K, T] -} - -// Value implements the database/sql/driver Valuer interface. -func (f future[K, T]) Value() (driver.Value, error) { - return f.loader.GetNow(f.key) -} - -// GetFuture registers the given key into the Loader and -// returns a future which will hold the history id for -// the key after Exec() is called. -func (l *loader[K, T]) GetFuture(key K) future[K, T] { - if l.sealed { - panic(errSealed) - } - - l.set.Add(key) - return future[K, T]{ - key: key, - loader: l, - } -} - -// GetNow returns the history id for the given key. -// GetNow should only be called on values which were registered by -// GetFuture() calls. Also, Exec() must be called before any GetNow -// call can succeed. -func (l *loader[K, T]) GetNow(key K) (int64, error) { - if !l.sealed { - return 0, fmt.Errorf(`invalid loader state, - Exec was not called yet to properly seal and resolve %v id`, key) - } - if internalID, ok := l.ids[key]; !ok { - return 0, fmt.Errorf(`loader key %v was not found`, key) - } else { - return internalID, nil - } -} - -// Exec will look up all the history ids for the keys registered in the Loader. -// If there are no history ids for a given set of keys, Exec will insert rows -// into the corresponding history table to establish a mapping between each key and its history id. -func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { - l.sealed = true - if len(l.set) == 0 { - return nil - } - q := &Q{session} - keys := make([]K, 0, len(l.set)) - for key := range l.set { - keys = append(keys, key) - } - // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock - // https://github.com/stellar/go/issues/2370 - sort.Slice(keys, func(i, j int) bool { - return l.less(keys[i], keys[j]) - }) - - if l.concurrencyMode == ConcurrentInserts { - if count, err := l.insert(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - l.stats.Inserted += count - } - - if count, err := l.query(ctx, q, keys, false); err != nil { - return err - } else { - l.stats.Total += count - } - } else if l.concurrencyMode == ConcurrentDeletes { - if count, err := l.query(ctx, q, keys, true); err != nil { - return err - } else { - l.stats.Total += count - } - - if count, err := l.insert(ctx, q, keys); err != nil { - return err - } else { - l.stats.Total += count - l.stats.Inserted += count - } - } else { - return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode) - } - - return nil -} - -// Stats returns the number of addresses registered in the Loader and the number of rows -// inserted into the history table. 
-func (l *loader[K, T]) Stats() LoaderStats { - return l.stats -} - -func (l *loader[K, T]) Name() string { - return l.name -} - -func (l *loader[K, T]) filter(keys []K) []K { - if len(l.ids) == 0 { - return keys - } - - remaining := make([]K, 0, len(keys)) - for _, key := range keys { - if _, ok := l.ids[key]; ok { - continue - } - remaining = append(remaining, key) - } - return remaining -} - -func (l *loader[K, T]) updateMap(rows []T) { - for _, row := range rows { - key, id := l.mappingFromRow(row) - l.ids[key] = id - } -} - -func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { - keys = l.filter(keys) - if len(keys) == 0 { - return 0, nil - } - - var rows []T - err := bulkInsert( - ctx, - q, - l.table, - l.columnsForKeys(keys), - &rows, - ) - if err != nil { - return 0, err - } - - l.updateMap(rows) - return len(rows), nil -} - -func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, lockRows bool) (int, error) { - keys = l.filter(keys) - if len(keys) == 0 { - return 0, nil - } - var suffix string - if lockRows { - suffix = "ORDER BY id ASC FOR KEY SHARE" - } - - var rows []T - err := bulkGet( - ctx, - q, - l.table, - l.columnsForKeys(keys), - &rows, - suffix, - ) - if err != nil { - return 0, err - } - - l.updateMap(rows) - return len(rows), nil -} - -type columnValues struct { - name string - dbType string - objects []string -} - -func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { - unnestPart := make([]string, 0, len(fields)) - insertFieldsPart := make([]string, 0, len(fields)) - pqArrays := make([]interface{}, 0, len(fields)) - - // In the code below we are building the bulk insert query which looks like: - // - // WITH rows AS - // (SELECT - // /* unnestPart */ - // unnest(?::type1[]), /* field1 */ - // unnest(?::type2[]), /* field2 */ - // ... - // ) - // INSERT INTO table ( - // /* insertFieldsPart */ - // field1, - // field2, - // ... - // ) - // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * - // - // Using unnest allows to get around the maximum limit of 65,535 query parameters, - // see https://www.postgresql.org/docs/12/limits.html and - // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ - // - // Without using unnest we would have to use multiple insert statements to insert - // all the rows for large datasets. - for _, field := range fields { - unnestPart = append( - unnestPart, - fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), - ) - insertFieldsPart = append( - insertFieldsPart, - field.name, - ) - pqArrays = append( - pqArrays, - pq.Array(field.objects), - ) - } - columns := strings.Join(insertFieldsPart, ",") - - sql := ` - WITH rows AS - (SELECT ` + strings.Join(unnestPart, ",") + `) - INSERT INTO ` + table + ` - (` + columns + `) - SELECT * FROM rows - ON CONFLICT (` + columns + `) DO NOTHING - RETURNING *` - - return q.SelectRaw( - ctx, - response, - sql, - pqArrays..., - ) -} - -func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error { - unnestPart := make([]string, 0, len(fields)) - columns := make([]string, 0, len(fields)) - pqArrays := make([]interface{}, 0, len(fields)) - - // In the code below we are building the bulk get query which looks like: - // - // SELECT * FROM table WHERE (field1, field2, ...) IN - // (SELECT - // /* unnestPart */ - // unnest(?::type1[]), /* field1 */ - // unnest(?::type2[]), /* field2 */ - // ... 
- // ) - // - // Using unnest allows to get around the maximum limit of 65,535 query parameters, - // see https://www.postgresql.org/docs/12/limits.html and - // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ - // - // Without using unnest we would have to use multiple select statements to obtain - // all the rows for large datasets. - for _, field := range fields { - unnestPart = append( - unnestPart, - fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), - ) - columns = append( - columns, - field.name, - ) - pqArrays = append( - pqArrays, - pq.Array(field.objects), - ) - } - sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN - (SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix - - return q.SelectRaw( - ctx, - response, - sql, - pqArrays..., - ) -} - // AccountLoaderStub is a stub wrapper around AccountLoader which allows // you to manually configure the mapping of addresses to history account ids type AccountLoaderStub struct { diff --git a/services/horizon/internal/db2/history/loader.go b/services/horizon/internal/db2/history/loader.go new file mode 100644 index 0000000000..dc2236accb --- /dev/null +++ b/services/horizon/internal/db2/history/loader.go @@ -0,0 +1,365 @@ +package history + +import ( + "context" + "database/sql/driver" + "fmt" + "sort" + "strings" + + "github.com/lib/pq" + + "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/db" +) + +var errSealed = fmt.Errorf("cannot register more entries to Loader after calling Exec()") + +// ConcurrencyMode is used to configure the level of thread-safety for a loader +type ConcurrencyMode int + +func (cm ConcurrencyMode) String() string { + switch cm { + case ConcurrentInserts: + return "ConcurrentInserts" + case ConcurrentDeletes: + return "ConcurrentDeletes" + default: + return "unknown" + } +} + +const ( + _ ConcurrencyMode = iota + // ConcurrentInserts configures the loader to maintain safety when there are multiple loaders + // inserting into the same table concurrently. This ConcurrencyMode is suitable for parallel reingestion. + // Note while ConcurrentInserts is enabled it is not safe to have deletes occurring concurrently on the + // same table. + ConcurrentInserts + // ConcurrentDeletes configures the loader to maintain safety when there is another thread which is invoking + // reapLookupTable() to delete rows from the same table concurrently. This ConcurrencyMode is suitable for + // live ingestion when reaping of lookup tables is enabled. + // Note while ConcurrentDeletes is enabled it is not safe to have multiple threads inserting concurrently to the + // same table. + ConcurrentDeletes +) + +// LoaderStats describes the result of executing a history lookup id Loader +type LoaderStats struct { + // Total is the number of elements registered to the Loader + Total int + // Inserted is the number of elements inserted into the lookup table + Inserted int +} + +type loader[K comparable, T any] struct { + sealed bool + set set.Set[K] + ids map[K]int64 + stats LoaderStats + name string + table string + columnsForKeys func([]K) []columnValues + mappingFromRow func(T) (K, int64) + less func(K, K) bool + concurrencyMode ConcurrencyMode +} + +type future[K comparable, T any] struct { + key K + loader *loader[K, T] +} + +// Value implements the database/sql/driver Valuer interface. 
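+// Implementing Valuer means a future can be passed directly as a query
+// parameter; the driver resolves it, via GetNow(), to the history id that
+// was assigned during Exec().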
+func (f future[K, T]) Value() (driver.Value, error) { + return f.loader.GetNow(f.key) +} + +// GetFuture registers the given key into the Loader and +// returns a future which will hold the history id for +// the key after Exec() is called. +func (l *loader[K, T]) GetFuture(key K) future[K, T] { + if l.sealed { + panic(errSealed) + } + + l.set.Add(key) + return future[K, T]{ + key: key, + loader: l, + } +} + +// GetNow returns the history id for the given key. +// GetNow should only be called on values which were registered by +// GetFuture() calls. Also, Exec() must be called before any GetNow +// call can succeed. +func (l *loader[K, T]) GetNow(key K) (int64, error) { + if !l.sealed { + return 0, fmt.Errorf(`invalid loader state, + Exec was not called yet to properly seal and resolve %v id`, key) + } + if internalID, ok := l.ids[key]; !ok { + return 0, fmt.Errorf(`loader key %v was not found`, key) + } else { + return internalID, nil + } +} + +// Exec will look up all the history ids for the keys registered in the Loader. +// If there are no history ids for a given set of keys, Exec will insert rows +// into the corresponding history table to establish a mapping between each key and its history id. +func (l *loader[K, T]) Exec(ctx context.Context, session db.SessionInterface) error { + l.sealed = true + if len(l.set) == 0 { + return nil + } + q := &Q{session} + keys := make([]K, 0, len(l.set)) + for key := range l.set { + keys = append(keys, key) + } + // sort entries before inserting rows to prevent deadlocks on acquiring a ShareLock + // https://github.com/stellar/go/issues/2370 + sort.Slice(keys, func(i, j int) bool { + return l.less(keys[i], keys[j]) + }) + + if l.concurrencyMode == ConcurrentInserts { + // if there are other ingestion transactions running concurrently, + // we need to first insert the records (with a ON CONFLICT DO NOTHING + // clause). Then, we can query for the remaining records. + // This order (insert first and then query) is important because + // if multiple concurrent transactions try to insert the same record + // only one of them will succeed and the other transactions will omit + // the record from the RETURNING set. + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } + + if count, err := l.query(ctx, q, keys, false); err != nil { + return err + } else { + l.stats.Total += count + } + } else if l.concurrencyMode == ConcurrentDeletes { + // if the lookup table reaping transaction is running concurrently, + // we need to lock the rows from the lookup table to ensure that + // the reaper cannot run until after the ingestion transaction has + // been committed. + if count, err := l.query(ctx, q, keys, true); err != nil { + return err + } else { + l.stats.Total += count + } + + // insert whatever records were not found from l.query() + if count, err := l.insert(ctx, q, keys); err != nil { + return err + } else { + l.stats.Total += count + l.stats.Inserted += count + } + } else { + return fmt.Errorf("concurrency mode %v is invalid", l.concurrencyMode) + } + + return nil +} + +// Stats returns the number of addresses registered in the Loader and the number of rows +// inserted into the history table. 
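+// Stats is populated by Exec(); until Exec() has run it returns the zero
+// value of LoaderStats.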
+func (l *loader[K, T]) Stats() LoaderStats { + return l.stats +} + +func (l *loader[K, T]) Name() string { + return l.name +} + +func (l *loader[K, T]) filter(keys []K) []K { + if len(l.ids) == 0 { + return keys + } + + remaining := make([]K, 0, len(keys)) + for _, key := range keys { + if _, ok := l.ids[key]; ok { + continue + } + remaining = append(remaining, key) + } + return remaining +} + +func (l *loader[K, T]) updateMap(rows []T) { + for _, row := range rows { + key, id := l.mappingFromRow(row) + l.ids[key] = id + } +} + +func (l *loader[K, T]) insert(ctx context.Context, q *Q, keys []K) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } + + var rows []T + err := bulkInsert( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil +} + +func (l *loader[K, T]) query(ctx context.Context, q *Q, keys []K, lockRows bool) (int, error) { + keys = l.filter(keys) + if len(keys) == 0 { + return 0, nil + } + var suffix string + if lockRows { + suffix = "ORDER BY id ASC FOR KEY SHARE" + } + + var rows []T + err := bulkGet( + ctx, + q, + l.table, + l.columnsForKeys(keys), + &rows, + suffix, + ) + if err != nil { + return 0, err + } + + l.updateMap(rows) + return len(rows), nil +} + +type columnValues struct { + name string + dbType string + objects []string +} + +func bulkInsert(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}) error { + unnestPart := make([]string, 0, len(fields)) + insertFieldsPart := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk insert query which looks like: + // + // WITH rows AS + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... + // ) + // INSERT INTO table ( + // /* insertFieldsPart */ + // field1, + // field2, + // ... + // ) + // SELECT * FROM rows ON CONFLICT (field1, field2, ...) DO NOTHING RETURNING * + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple insert statements to insert + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + insertFieldsPart = append( + insertFieldsPart, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + columns := strings.Join(insertFieldsPart, ",") + + sql := ` + WITH rows AS + (SELECT ` + strings.Join(unnestPart, ",") + `) + INSERT INTO ` + table + ` + (` + columns + `) + SELECT * FROM rows + ON CONFLICT (` + columns + `) DO NOTHING + RETURNING *` + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} + +func bulkGet(ctx context.Context, q *Q, table string, fields []columnValues, response interface{}, suffix string) error { + unnestPart := make([]string, 0, len(fields)) + columns := make([]string, 0, len(fields)) + pqArrays := make([]interface{}, 0, len(fields)) + + // In the code below we are building the bulk get query which looks like: + // + // SELECT * FROM table WHERE (field1, field2, ...) IN + // (SELECT + // /* unnestPart */ + // unnest(?::type1[]), /* field1 */ + // unnest(?::type2[]), /* field2 */ + // ... 
+ // ) + // + // Using unnest allows to get around the maximum limit of 65,535 query parameters, + // see https://www.postgresql.org/docs/12/limits.html and + // https://klotzandrew.com/blog/postgres-passing-65535-parameter-limit/ + // + // Without using unnest we would have to use multiple select statements to obtain + // all the rows for large datasets. + for _, field := range fields { + unnestPart = append( + unnestPart, + fmt.Sprintf("unnest(?::%s[]) /* %s */", field.dbType, field.name), + ) + columns = append( + columns, + field.name, + ) + pqArrays = append( + pqArrays, + pq.Array(field.objects), + ) + } + sql := `SELECT * FROM ` + table + ` WHERE (` + strings.Join(columns, ",") + `) IN + (SELECT ` + strings.Join(unnestPart, ",") + `) ` + suffix + + return q.SelectRaw( + ctx, + response, + sql, + pqArrays..., + ) +} diff --git a/services/horizon/internal/db2/history/loader_concurrency_test.go b/services/horizon/internal/db2/history/loader_concurrency_test.go index d3da832bce..e87d901bd8 100644 --- a/services/horizon/internal/db2/history/loader_concurrency_test.go +++ b/services/horizon/internal/db2/history/loader_concurrency_test.go @@ -28,7 +28,6 @@ func TestLoaderConcurrentInserts(t *testing.T) { {ConcurrentInserts, true}, {ConcurrentDeletes, false}, } { - t.Failed() t.Run(fmt.Sprintf("%v", testCase.mode), func(t *testing.T) { var addresses []string for i := 0; i < 10; i++ { @@ -60,6 +59,10 @@ func TestLoaderConcurrentInserts(t *testing.T) { <-time.After(time.Second * 3) assert.NoError(t, s1.Commit()) }() + // l2.Exec(context.Background(), s2) will block until s1 + // is committed because s1 and s2 both attempt to insert common + // accounts and, since s1 executed first, s2 must wait until + // s1 terminates. assert.NoError(t, l2.Exec(context.Background(), s2)) assert.NoError(t, s2.Commit()) wg.Wait() @@ -75,8 +78,8 @@ func TestLoaderConcurrentInserts(t *testing.T) { Inserted: 5, }, l2.Stats()) } else { - assert.NotEqual(t, LoaderStats{ - Total: 15, + assert.Equal(t, LoaderStats{ + Total: 5, Inserted: 5, }, l2.Stats()) return @@ -101,6 +104,9 @@ func TestLoaderConcurrentInserts(t *testing.T) { l2Id, err := l2.GetNow(address) assert.NoError(t, err) + _, err = l1.GetNow(address) + assert.ErrorContains(t, err, "was not found") + var account Account assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) assert.Equal(t, account.ID, l2Id) @@ -178,6 +184,8 @@ func TestLoaderConcurrentDeletes(t *testing.T) { assert.NoError(t, s1.Commit()) }() + // the reaper should block until s1 has been committed because s1 has locked + // the orphaned rows deletedCount, err := q2.reapLookupTable(context.Background(), "history_accounts", ids, 1000) assert.NoError(t, err) assert.Equal(t, int64(len(addresses)), deletedCount) diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 9264bca967..2707323be3 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -988,7 +988,14 @@ func (q *Q) FindLookupTableRowsToReap(ctx context.Context, table string, batchSi // Find new offset before removing the rows var newOffset int64 - err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offset, batchSize)) + err = q.GetRaw( + ctx, + &newOffset, + fmt.Sprintf( + "SELECT id FROM %s WHERE id >= %d ORDER BY id ASC LIMIT 1 OFFSET %d", + table, offset, batchSize, + ), + ) if err != nil { if q.NoRows(err) { newOffset = 0 @@ -1103,31 +1110,6 
@@ var historyLookupTables = map[string][]tableObjectFieldPair{
 	},
 }
 
-// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
-// as an example):
-//
-// delete from history_claimable_balances where id in (
-//
-//	WITH ha_batch AS (
-//		SELECT id
-//		FROM history_claimable_balances
-//		WHERE id >= 1000
-//		ORDER BY id limit 1000
-//	) SELECT e1.id as id FROM ha_batch e1
-//	WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1)
-//	AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1)
-//	)
-//
-// In short it checks the 1000 rows omitting 1000 row of history_claimable_balances
-// and counts occurrences of each row in corresponding history tables.
-// If there are no history rows for a given id, the row in
-// history_claimable_balances is removed.
-//
-// The offset param should be increased before each execution. Given that
-// some rows will be removed and some will be added by ingestion it's
-// possible that rows will be skipped from deletion. But offset is reset
-// when it reaches the table size so eventually all orphaned rows are
-// deleted.
 func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64) (int64, error) {
 	deleteQuery := constructDeleteLookupTableRowsQuery(table, ids)
 	result, err := q.ExecRaw(
@@ -1135,7 +1117,7 @@ func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64
 		deleteQuery,
 	)
 	if err != nil {
-		return 0, fmt.Errorf("error running query: %w", err)
+		return 0, fmt.Errorf("error running query %s: %w", deleteQuery, err)
 	}
 	var deletedCount int64
 	deletedCount, err = result.RowsAffected()
@@ -1145,6 +1127,29 @@ func (q *Q) deleteLookupTableRows(ctx context.Context, table string, ids []int64
 	return deletedCount, nil
 }
 
+// constructDeleteLookupTableRowsQuery creates a query like (using history_claimable_balances
+// as an example):
+//
+//	WITH ha_batch AS (
+//		SELECT id
+//		FROM history_claimable_balances
+//		WHERE id IN ($1, $2, ...) ORDER BY id asc FOR UPDATE
+//	) DELETE FROM history_claimable_balances WHERE id IN (
+//		SELECT e1.id as id FROM ha_batch e1
+//		WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1)
+//		AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1)
+//	)
+//
+// It checks each of the candidate rows provided in the top-level IN clause
+// and counts occurrences of each row in the corresponding history tables.
+// If there are no history rows for a given id, the row in
+// history_claimable_balances is removed.
+//
+// Note that the rows are locked via SELECT FOR UPDATE. The reason
+// for that is to maintain safety when ingestion is running concurrently.
+// The ingestion loaders will also lock rows from the history lookup tables
+// via SELECT FOR KEY SHARE. This ensures that the reaping transaction
+// will block until the ingestion transaction commits (or vice-versa).
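+//
+// For example, one possible interleaving of the two transactions
+// (illustrative only):
+//
+//	ingestion: SELECT ... FOR KEY SHARE  -- locks ids 1, 2, 3
+//	reaper:    SELECT ... FOR UPDATE     -- blocks on ids 1, 2, 3
+//	ingestion: COMMIT                    -- releases the key-share locks
+//	reaper:    DELETE ...                -- unblocks and proceeds
+//
+// Because FOR UPDATE conflicts with FOR KEY SHARE, the two transactions
+// never hold locks on the same rows at the same time.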
func constructDeleteLookupTableRowsQuery(table string, ids []int64) string {
 	var conditions []string
 	for _, referencedTable := range historyLookupTables[table] {
diff --git a/services/horizon/internal/db2/history/verify_lock.go b/services/horizon/internal/db2/history/verify_lock.go
index b5d5d37e7f..4d7d1fbde7 100644
--- a/services/horizon/internal/db2/history/verify_lock.go
+++ b/services/horizon/internal/db2/history/verify_lock.go
@@ -13,9 +13,12 @@ const (
 	// all ingesting nodes use the same value which is why it's hard coded here.
 	stateVerificationLockId = 73897213
 	// reaperLockId is the objid for the advisory lock acquired during
-	// reaping. The value is arbitrary. The only requirement is that
+	// reaping of history tables. The value is arbitrary. The only requirement is that
+	// all ingesting nodes use the same value which is why it's hard coded here.
+	reaperLockId = 944670730
+	// lookupTableReaperLockId is the objid for the advisory lock acquired during
+	// reaping of lookup tables. The value is arbitrary. The only requirement is that
 	// all ingesting nodes use the same value which is why it's hard coded here.
-	reaperLockId = 944670730
 	lookupTableReaperLockId = 329518896
 )

From 5b6027dcf358e7f066cdbc8fbffd45628299a04b Mon Sep 17 00:00:00 2001
From: tamirms
Date: Thu, 5 Sep 2024 06:44:21 +0100
Subject: [PATCH 16/17] add direction to order by clause

---
 services/horizon/internal/db2/history/main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go
index 2707323be3..59d828d091 100644
--- a/services/horizon/internal/db2/history/main.go
+++ b/services/horizon/internal/db2/history/main.go
@@ -1198,7 +1198,7 @@ func constructFindReapLookupTablesQuery(table string, batchSize int, offset int6
 	}
 
 	return fmt.Sprintf(
-		"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+
+		"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id ASC limit %d) "+
 			"SELECT e1.id as id FROM ha_batch e1 WHERE ",
 		table,
 		offset,

From 98556cec397b271a4fcf304fc7d70a414ea11df6 Mon Sep 17 00:00:00 2001
From: tamirms
Date: Thu, 5 Sep 2024 07:04:21 +0100
Subject: [PATCH 17/17] fix TestConstructReapLookupTablesQuery

---
 services/horizon/internal/db2/history/main_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/horizon/internal/db2/history/main_test.go b/services/horizon/internal/db2/history/main_test.go
index e74e02e61d..a86f4c14a4 100644
--- a/services/horizon/internal/db2/history/main_test.go
+++ b/services/horizon/internal/db2/history/main_test.go
@@ -93,7 +93,7 @@ func TestConstructReapLookupTablesQuery(t *testing.T) {
 	)
 
 	assert.Equal(t,
-		"WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id limit 10) SELECT e1.id as id FROM ha_batch e1 "+
+		"WITH ha_batch AS (SELECT id FROM history_accounts WHERE id >= 0 ORDER BY id ASC limit 10) SELECT e1.id as id FROM ha_batch e1 "+
 			"WHERE NOT EXISTS ( SELECT 1 as row FROM history_transaction_participants WHERE history_transaction_participants.history_account_id = id LIMIT 1) "+
 			"AND NOT EXISTS ( SELECT 1 as row FROM history_effects WHERE history_effects.history_account_id = id LIMIT 1) "+
 			"AND NOT EXISTS ( SELECT 1 as row FROM history_operation_participants WHERE history_operation_participants.history_account_id = id LIMIT 1) "+