Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Nov 26, 2024
1 parent 7c545d8 commit a45fa1e
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 151 deletions.
3 changes: 2 additions & 1 deletion pkg/solana/txm/txm.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,8 @@ func (txm *Txm) handleNotFoundSignatureStatus(sig solanaGo.Signature) {
// Otherwise, it marks the transaction as errored.
func (txm *Txm) handleErrorSignatureStatus(sig solanaGo.Signature, status *rpc.SignatureStatusesResult) {
// We want to rebroadcast rather than drop tx if expiration rebroadcast is enabled when blockhash was not found.
if status.Err != nil && status.Err == client.ErrBlockhashNotFound && txm.cfg.TxExpirationRebroadcast() {
// converting error to string so we are able to check if it contains the error message.
if status.Err != nil && strings.Contains(fmt.Sprintf("%v", status.Err), "BlockhashNotFound") && txm.cfg.TxExpirationRebroadcast() {
return
}

Expand Down
157 changes: 58 additions & 99 deletions pkg/solana/txm/txm_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,14 @@ func TestTxm(t *testing.T) {
return out
}, nil,
)

mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil)
// happy path (send => simulate success => tx: nil => tx: processed => tx: confirmed => finalized => done)
t.Run("happyPath", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
sig := randomSignature(t)
tx, signed := getTx(t, 0, mkey)
var wg sync.WaitGroup
Expand Down Expand Up @@ -234,12 +233,6 @@ func TestTxm(t *testing.T) {

// fail on initial transmit (RPC immediate rejects)
t.Run("fail_initialTx", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 1, mkey)
var wg sync.WaitGroup
wg.Add(1)
Expand Down Expand Up @@ -267,12 +260,6 @@ func TestTxm(t *testing.T) {
})
// tx fails simulation (simulation error)
t.Run("fail_simulation", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 2, mkey)
sig := randomSignature(t)
var wg sync.WaitGroup
Expand Down Expand Up @@ -302,12 +289,6 @@ func TestTxm(t *testing.T) {

// tx fails simulation (rpc error, timeout should clean up b/c sig status will be nil)
t.Run("fail_simulation_confirmNil", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 3, mkey)
sig := randomSignature(t)
retry0 := randomSignature(t)
Expand Down Expand Up @@ -347,12 +328,6 @@ func TestTxm(t *testing.T) {
// tx fails simulation with an InstructionError (indicates reverted execution)
// manager should cancel sending retry immediately + increment reverted prom metric
t.Run("fail_simulation_instructionError", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 4, mkey)
sig := randomSignature(t)
var wg sync.WaitGroup
Expand Down Expand Up @@ -392,12 +367,6 @@ func TestTxm(t *testing.T) {
// tx fails simulation with BlockHashNotFound error
// txm should continue to finalize tx (in this case it will succeed)
t.Run("fail_simulation_blockhashNotFound", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 5, mkey)
sig := randomSignature(t)
var wg sync.WaitGroup
Expand Down Expand Up @@ -448,12 +417,6 @@ func TestTxm(t *testing.T) {
// tx fails simulation with AlreadyProcessed error
// txm should continue to confirm tx (in this case it will revert)
t.Run("fail_simulation_alreadyProcessed", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 6, mkey)
sig := randomSignature(t)
var wg sync.WaitGroup
Expand Down Expand Up @@ -494,12 +457,6 @@ func TestTxm(t *testing.T) {

// tx passes sim, never passes processed (timeout should cleanup)
t.Run("fail_confirm_processed", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 7, mkey)
sig := randomSignature(t)
retry0 := randomSignature(t)
Expand Down Expand Up @@ -544,12 +501,6 @@ func TestTxm(t *testing.T) {

// tx passes sim, shows processed, moves to nil (timeout should cleanup)
t.Run("fail_confirm_processedToNil", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 8, mkey)
sig := randomSignature(t)
retry0 := randomSignature(t)
Expand Down Expand Up @@ -601,12 +552,6 @@ func TestTxm(t *testing.T) {

// tx passes sim, errors on confirm
t.Run("fail_confirm_revert", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 9, mkey)
sig := randomSignature(t)
var wg sync.WaitGroup
Expand Down Expand Up @@ -644,12 +589,6 @@ func TestTxm(t *testing.T) {

// tx passes sim, first retried TXs get dropped
t.Run("success_retryTx", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
tx, signed := getTx(t, 10, mkey)
sig := randomSignature(t)
retry0 := randomSignature(t)
Expand Down Expand Up @@ -696,12 +635,6 @@ func TestTxm(t *testing.T) {

// fee bumping disabled
t.Run("feeBumpingDisabled", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
sig := randomSignature(t)
tx, signed := getTx(t, 11, mkey)

Expand Down Expand Up @@ -762,12 +695,6 @@ func TestTxm(t *testing.T) {

// compute unit limit disabled
t.Run("computeUnitLimitDisabled", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
sig := randomSignature(t)
tx, signed := getTx(t, 12, mkey)

Expand Down Expand Up @@ -981,14 +908,14 @@ func TestTxm_compute_unit_limit_estimation(t *testing.T) {
return out
}, nil,
)
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil)

t.Run("simulation_succeeds", func(t *testing.T) {
mc.On("LatestBlockhash", mock.Anything).Return(&rpc.GetLatestBlockhashResult{
Value: &rpc.LatestBlockhashResult{
LastValidBlockHeight: 100,
Blockhash: solana.Hash{},
},
}, nil).Once()
// Test tx is not discarded due to confirm timeout and tracked to finalization
tx, signed := getTx(t, 1, mkey)
// add signature and compute unit limit to tx for simulation (excludes compute unit price)
Expand Down Expand Up @@ -1185,7 +1112,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
cfg := config.NewDefault()
cfg.Chain.FeeEstimatorMode = &estimator
cfg.Chain.TxConfirmTimeout = relayconfig.MustNewDuration(5 * time.Second)
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(10 * time.Second) // Enable retention to keep transactions after finality and be able to check.
cfg.Chain.TxRetentionTimeout = relayconfig.MustNewDuration(10 * time.Second) // Enable retention to keep transactions after finality and be able to check their statuses.
lggr := logger.Test(t)
ctx := tests.Context(t)

Expand Down Expand Up @@ -1222,7 +1149,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
).Maybe()
}

mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil)
mc.On("SimulateTx", mock.Anything, mock.Anything, mock.Anything).Return(&rpc.SimulateTransactionResult{}, nil).Maybe()
if statuses != nil {
mc.On("SignatureStatuses", mock.Anything, mock.AnythingOfType("[]solana.Signature")).Return(
func(_ context.Context, sigs []solana.Signature) ([]*rpc.SignatureStatusesResult, error) {
Expand Down Expand Up @@ -1251,6 +1178,9 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
return txm, mc, mkey
}

// tracking prom metrics
prom := soltxmProm{id: id}

t.Run("WithRebroadcast", func(t *testing.T) {
txExpirationRebroadcast := true
statuses := map[solana.Signature]func() *rpc.SignatureStatusesResult{}
Expand Down Expand Up @@ -1318,7 +1248,12 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
txID := "test-rebroadcast"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID))
wg.Wait()
time.Sleep(2 * time.Second) // Sleep to allow for rebroadcasting
waitFor(t, txm.cfg.TxConfirmTimeout(), txm, prom, empty)

// check prom metric
prom.confirmed++
prom.finalized++
prom.assertEqual(t)

// Check that transaction for txID has been finalized and rebroadcasted
status, err := txm.GetTransactionStatus(ctx, txID)
Expand Down Expand Up @@ -1366,7 +1301,12 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
txID := "test-no-rebroadcast"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID))
wg.Wait()
time.Sleep(2 * time.Second) // Sleep to ensure no rebroadcast
waitFor(t, txm.cfg.TxConfirmTimeout(), txm, prom, empty)

// check prom metric
prom.drop++
prom.error++
prom.assertEqual(t)

// Check that transaction for txID has not been finalized and has not been rebroadcasted
status, err := txm.GetTransactionStatus(ctx, txID)
Expand All @@ -1384,7 +1324,7 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
slotHeightFunc := func() (uint64, error) {
return uint64(1500), nil
}
// Mock LatestBlockhash to return a invalid blockhash in the first 3 attempts (initial + 2 rebroadcasts)
// Mock LatestBlockhash to return an invalid blockhash in the first 3 attempts (initial + 2 rebroadcasts)
// the last one is valid because it is greater than the slotHeight
expectedRebroadcastsCount := 3
callCount := 0
Expand Down Expand Up @@ -1441,7 +1381,12 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
txID := "test-rebroadcast"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID))
wg.Wait()
time.Sleep(2 * time.Second) // Sleep to allow for rebroadcasting
waitFor(t, txm.cfg.TxConfirmTimeout(), txm, prom, empty)

// check prom metric
prom.confirmed++
prom.finalized++
prom.assertEqual(t)

// Check that transaction for txID has been finalized and rebroadcasted
status, err := txm.GetTransactionStatus(ctx, txID)
Expand Down Expand Up @@ -1498,7 +1443,12 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
txID := "test-confirmed-before-rebroadcast"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID))
wg.Wait()
time.Sleep(1 * time.Second) // Allow for processing
waitFor(t, txm.cfg.TxConfirmTimeout(), txm, prom, empty)

// check prom metric
prom.confirmed++
prom.finalized++
prom.assertEqual(t)

// Check that transaction has been finalized without rebroadcast
status, err := txm.GetTransactionStatus(ctx, txID)
Expand Down Expand Up @@ -1540,27 +1490,36 @@ func TestTxm_ExpirationRebroadcast(t *testing.T) {
return sig1, nil
}

var wg sync.WaitGroup
wg.Add(1)
count := 0
statuses[sig1] = func() *rpc.SignatureStatusesResult {
defer func() { count++ }()
// Transaction remains unconfirmed
if count == 1 {
wg.Done()
}
return nil
}

txm, _, mkey := setupTxmTest(txExpirationRebroadcast, latestBlockhashFunc, slotHeightFunc, sendTxFunc, statuses)
tx, _ := getTx(t, 0, mkey)
txID := "test-rebroadcast-error"
assert.NoError(t, txm.Enqueue(ctx, t.Name(), tx, &txID))
time.Sleep(2 * time.Second) // Allow for processing
wg.Wait()
waitFor(t, cfg.TxConfirmTimeout(), txm, prom, empty)

// TODO: Add check that transaction status is failed due to rebroadcast error when prebroadcast is implemented and we have an error in sendWithRetry
// check prom metric
prom.drop++
prom.error++
prom.assertEqual(t)

// Transaction should be moved to failed after trying to rebroadcast and failing to get confirmations
status, err := txm.GetTransactionStatus(ctx, txID)
require.NoError(t, err)
require.Equal(t, types.Pending, status) // TODO: Change to Failed when prebroadcast error is implemented
require.Equal(t, types.Failed, status)
rebroadcastCount, err := txm.txs.GetTxRebroadcastCount(txID)
require.NoError(t, err)
require.Equal(t, 1, rebroadcastCount) // Attempted to rebroadcast 1 time but encountered error
time.Sleep(2 * time.Second) // Allow for processing
rebroadcastCount, err = txm.txs.GetTxRebroadcastCount(txID) // rebroadcast should still be 1. We should not be rebroadcasting.
require.NoError(t, err)
require.Equal(t, 1, rebroadcastCount)
})
}
Loading

0 comments on commit a45fa1e

Please sign in to comment.