Skip to content

Commit

Permalink
tests and fix some bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Dec 9, 2024
1 parent 53948e1 commit fdc8068
Show file tree
Hide file tree
Showing 5 changed files with 589 additions and 36 deletions.
82 changes: 55 additions & 27 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ type PendingTxContext interface {
TrimFinalizedErroredTxs() int
// GetSignatureInfo returns the transaction ID and TxState for the provided signature
GetSignatureInfo(sig solana.Signature) (txInfo, error)
// UpdateSignatureStatus updates the status of a signature in the SigToTxInfo map
UpdateSignatureStatus(sig solana.Signature, status TxState) error
// TxHasReorg determines whether a reorg has occurred for a given tx.
// It achieves this by comparing the highest aggregated state across all associated signatures with the current state of the transaction.
// If the highest aggregated state is less than the current state, a reorg has occurred and we need to handle it.
TxHasReorg(id string) bool
// OnReorg resets the transaction state to Broadcasted for the given signature and returns the pendingTx for retrying.
OnReorg(sig solana.Signature) (pendingTx, error)
OnReorg(sig solana.Signature, id string) (pendingTx, error)
}

// finishedTx is used to store info required to track transactions to finality or error
Expand Down Expand Up @@ -569,19 +571,12 @@ func (c *pendingTxContext) GetSignatureInfo(sig solana.Signature) (txInfo, error
return info, nil
}

func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
// Acquire a read lock to check if the signature exists and needs to be reset
func (c *pendingTxContext) OnReorg(sig solana.Signature, id string) (pendingTx, error) {
err := c.withReadLock(func() error {
// Check if the signature is still being tracked
info, exists := c.sigToTxInfo[sig]
if !exists {
return ErrSigDoesNotExist
}

// Check if the transaction is still in a non finalized/errored state
var broadcastedExists, confirmedExists bool
_, broadcastedExists = c.broadcastedProcessedTxs[info.id]
_, confirmedExists = c.confirmedTxs[info.id]
_, broadcastedExists = c.broadcastedProcessedTxs[id]
_, confirmedExists = c.confirmedTxs[id]
if !broadcastedExists && !confirmedExists {
return ErrTransactionNotFound
}
Expand All @@ -593,19 +588,14 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
}

var pTx pendingTx
// Acquire a write lock to perform the state reset
// Acquire a write lock to perform the state reset if needed
_, err = c.withWriteLock(func() (string, error) {
// Retrieve sig and tx again inside the write lock
info, exists := c.sigToTxInfo[sig]
if !exists {
return "", ErrSigDoesNotExist
}

tx, broadcastedProcessedExists := c.broadcastedProcessedTxs[info.id]
// Retrieve tx again inside the write lock
tx, broadcastedProcessedExists := c.broadcastedProcessedTxs[id]
if broadcastedProcessedExists {
pTx = tx
}
tx, confirmedExists := c.confirmedTxs[info.id]
tx, confirmedExists := c.confirmedTxs[id]
if confirmedExists {
pTx = tx
}
Expand All @@ -622,19 +612,18 @@ func (c *pendingTxContext) OnReorg(sig solana.Signature) (pendingTx, error) {
// on the next status polling cycle.
// This approach does not introduce any risk with the expiration logic since
// we check for status changes before considering a transaction for expiration.
info.state, pTx.state = Broadcasted, Broadcasted
c.sigToTxInfo[sig] = info
c.broadcastedProcessedTxs[info.id] = pTx
pTx.state = Broadcasted
c.broadcastedProcessedTxs[id] = pTx

// If the transaction regressed from confirmed state, we also need to remove it from the confirmed map
if confirmedExists {
delete(c.confirmedTxs, info.id)
delete(c.confirmedTxs, id)
}

return "", nil
})
if err != nil {
// If transaction or sig were not found
// If transaction was not found
return pendingTx{}, err
}

Expand Down Expand Up @@ -678,6 +667,41 @@ func (c *pendingTxContext) TxHasReorg(id string) bool {
return highestSigAggState < pTx.state
}

func (c *pendingTxContext) UpdateSignatureStatus(sig solana.Signature, status TxState) error {
// Acquire a read lock to check if the signature exists and needs to be reset
err := c.withReadLock(func() error {
// Check if the signature is still being tracked
_, exists := c.sigToTxInfo[sig]
if !exists {
return ErrSigDoesNotExist
}
return nil
})
if err != nil {
// If sig not found, return
return err
}

// Acquire a write lock to perform the state reset
_, err = c.withWriteLock(func() (string, error) {
// Retrieve sig again inside the write lock
info, exists := c.sigToTxInfo[sig]
if !exists {
return "", ErrSigDoesNotExist
}
// Update the status of the signature
info.state = status
c.sigToTxInfo[sig] = info
return "", nil
})
if err != nil {
// If sig was not found
return err
}

return nil
}

func (c *pendingTxContext) withReadLock(fn func() error) error {
c.lock.RLock()
defer c.lock.RUnlock()
Expand Down Expand Up @@ -809,10 +833,14 @@ func (c *pendingTxContextWithProm) GetSignatureInfo(sig solana.Signature) (txInf
return c.pendingTx.GetSignatureInfo(sig)
}

func (c *pendingTxContextWithProm) OnReorg(sig solana.Signature) (pendingTx, error) {
return c.pendingTx.OnReorg(sig)
func (c *pendingTxContextWithProm) OnReorg(sig solana.Signature, id string) (pendingTx, error) {
return c.pendingTx.OnReorg(sig, id)
}

func (c *pendingTxContextWithProm) TxHasReorg(id string) bool {
return c.pendingTx.TxHasReorg(id)
}

func (c *pendingTxContextWithProm) UpdateSignatureStatus(sig solana.Signature, status TxState) error {
return c.pendingTx.UpdateSignatureStatus(sig, status)
}
217 changes: 217 additions & 0 deletions pkg/solana/txm/pendingtx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1375,3 +1375,220 @@ func TestPendingTxContext_ListAllExpiredBroadcastedTxs(t *testing.T) {
})
}
}

func TestPendingTxContext_UpdateSignatureStatus(t *testing.T) {
t.Parallel()
txs := newPendingTxContext()
sig := randomSignature(t)
txID := uuid.NewString()
cancelFunc := func() {}

// Add new transaction and signature
tx := pendingTx{id: txID}
require.NoError(t, txs.New(tx))
require.NoError(t, txs.AddSignature(cancelFunc, txID, sig))

// updates signature status successfully
err := txs.UpdateSignatureStatus(sig, Confirmed)
require.NoError(t, err)
txInfo, exists := txs.sigToTxInfo[sig]
require.True(t, exists)
require.Equal(t, Confirmed, txInfo.state)

// updating non-existent signature returs error
nonExistentSig := randomSignature(t)
err = txs.UpdateSignatureStatus(nonExistentSig, Confirmed)
require.ErrorIs(t, err, ErrSigDoesNotExist)

// Test concurrent updates to ensure thread safety
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(status TxState) {
defer wg.Done()
err := txs.UpdateSignatureStatus(sig, status)
require.NoError(t, err)
}(Confirmed)
}
wg.Wait()

// Verify final status
txInfo, exists = txs.sigToTxInfo[sig]
require.True(t, exists)
require.Equal(t, Confirmed, txInfo.state)
}

func createTxAndAddSig(t *testing.T, txs *pendingTxContext) (string, solana.Signature) {
sig := randomSignature(t)
txID := uuid.NewString()
tx := pendingTx{id: txID}
require.NoError(t, txs.New(tx))
require.NoError(t, txs.AddSignature(func() {}, txID, sig))
return txID, sig
}

func TestPendingTxContext_OnReorg(t *testing.T) {
t.Parallel()
txs := newPendingTxContext()
t.Run("successfully reset transaction from Processed to Broadcasted", func(t *testing.T) {
// Transition to Processed state
txID, sig := createTxAndAddSig(t, txs)
_, err := txs.OnProcessed(sig)
require.NoError(t, err)

// Call OnReorg
pTx, err := txs.OnReorg(sig, txID)
require.NoError(t, err)
require.Equal(t, Broadcasted, pTx.state)

// Verify the transaction's state is reset to Broadcasted
txInfo, exists := txs.broadcastedProcessedTxs[txID]
require.True(t, exists)
require.Equal(t, Broadcasted, txInfo.state)
})

t.Run("successfully reset transaction from Confirmed to Broadcasted", func(t *testing.T) {
// Transition to Processed and then Confirmed state
txID, sig := createTxAndAddSig(t, txs)
_, err := txs.OnProcessed(sig)
require.NoError(t, err)
_, err = txs.OnConfirmed(sig)
require.NoError(t, err)

// Call OnReorg
pTx, err := txs.OnReorg(sig, txID)
require.NoError(t, err)
require.Equal(t, Broadcasted, pTx.state)

// Verify the transaction's state is reset to Broadcasted
txInfo, exists := txs.broadcastedProcessedTxs[txID]
require.True(t, exists)
require.Equal(t, Broadcasted, txInfo.state)

// Ensure it's removed from confirmed transactions
_, exists = txs.confirmedTxs[txID]
require.False(t, exists)
})

t.Run("fail to reset transaction in Finalized state", func(t *testing.T) {
// Transition to Processed, Confirmed, and then Finalized state
txID, sig := createTxAndAddSig(t, txs)
_, err := txs.OnProcessed(sig)
require.NoError(t, err)
_, err = txs.OnConfirmed(sig)
require.NoError(t, err)
_, err = txs.OnFinalized(sig, 10*time.Second)
require.NoError(t, err)

// Call OnReorg
_, err = txs.OnReorg(sig, txID)
require.Error(t, err)
require.Equal(t, ErrTransactionNotFound, err)
})

t.Run("fail to reset transaction in Errored state", func(t *testing.T) {
// Transition to Errored state
txID, sig := createTxAndAddSig(t, txs)
_, err := txs.OnError(sig, 10*time.Second, Errored, 0)
require.NoError(t, err)

// Call OnReorg
_, err = txs.OnReorg(sig, txID)
require.Error(t, err)
require.Equal(t, ErrTransactionNotFound, err)
})

t.Run("fail to reset non-existent transaction", func(t *testing.T) {
_, err := txs.OnReorg(randomSignature(t), "non-existent")
require.Error(t, err)
require.Equal(t, ErrTransactionNotFound, err)
})
}

func TestPendingTxContext_GetSignatureInfo(t *testing.T) {
t.Parallel()
// Initialize a new pendingTxContext
txs := newPendingTxContext()
t.Run("successfully retrieve existing signature info", func(t *testing.T) {
txID, sig := createTxAndAddSig(t, txs)
// Retrieve the signature info
info, err := txs.GetSignatureInfo(sig)
require.NoError(t, err)
require.Equal(t, txID, info.id)
require.Equal(t, Broadcasted, info.state)
})

t.Run("fail to retrieve non-existent signature info", func(t *testing.T) {
nonExistentSig := randomSignature(t)

// Attempt to retrieve info for a signature that doesn't exist
_, err := txs.GetSignatureInfo(nonExistentSig)
require.ErrorIs(t, err, ErrSigDoesNotExist)
})

t.Run("concurrent access to GetSignatureInfo", func(t *testing.T) {
txID, sig := createTxAndAddSig(t, txs)

// Perform concurrent reads
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
info, err := txs.GetSignatureInfo(sig)
require.NoError(t, err)
require.Equal(t, txID, info.id)
}()
}
wg.Wait()
})
}

func TestPendingTxContext_TxHasReorg(t *testing.T) {
t.Parallel()
txs := newPendingTxContext()
cancelFunc := func() {}
t.Run("no reorg: tx does not exist", func(t *testing.T) {
hasReorg := txs.TxHasReorg("non-existent")
require.False(t, hasReorg, "expected no reorg for non-existent transaction")
})

t.Run("no reorg: a signature >= transaction state", func(t *testing.T) {
// Create transaction and add signatures
txID, sig1 := createTxAndAddSig(t, txs)
sig2 := randomSignature(t)
require.NoError(t, txs.AddSignature(cancelFunc, txID, sig2))

// Transition transaction to Confirmed through sig1
_, err := txs.OnProcessed(sig1)
require.NoError(t, err)
_, err = txs.OnConfirmed(sig1)
require.NoError(t, err)

// sig1 is Confirmed and sig2 is Broadcasted.
// TxHasReorg should return false because sig1 >= tx state = Confirmed
hasReorg := txs.TxHasReorg(txID)
require.False(t, hasReorg, "expected no reorg when all signatures are >= transaction state")
})

t.Run("reorg: all signatures < transaction state", func(t *testing.T) {
// Create transaction and add signatures
txID, sig1 := createTxAndAddSig(t, txs)
sig2 := randomSignature(t)
require.NoError(t, txs.AddSignature(cancelFunc, txID, sig2))

// Transition transaction to Confirmed through sig1
_, err := txs.OnProcessed(sig1)
require.NoError(t, err)
_, err = txs.OnConfirmed(sig1)
require.NoError(t, err)

// Regress sig1 to processed state
require.NoError(t, txs.UpdateSignatureStatus(sig1, Processed))

// Now, sig1 is in Processed state and sig2 is in Broadcasted state.
// TxHasReorg should return true because all sigs are < transaction state = Confirmed
hasReorg := txs.TxHasReorg(txID)
require.True(t, hasReorg, "expected reorg when all signatures are < transaction state")
})
}
Loading

0 comments on commit fdc8068

Please sign in to comment.