Skip to content

Commit

Permalink
[NONEVM-706][Solana] - Refactor TXM + Rebroadcast Expired Tx function…
Browse files Browse the repository at this point in the history
…ality (#946)

* refactor so txm owns blockhash assignment

* lastValidBlockHeight shouldn't be exported

* better comment

* refactor sendWithRetry to make it clearer

* confirm loop refactor

* fix infinite loop

* move accountID inside msg

* lint fix

* base58 does not contain lower l

* fix hash errors

* fix generate random hash

* remove blockhash as we only need block height

* expired tx changes without tests

* add maybe to mocks

* expiration tests

* send txes through queue

* revert pendingtx leakage of information. overwrite blockhash

* fix order of confirm loop and not found signature check

* fix mocks

* prevent confirmation loop to mark tx as errored when it needs to be rebroadcasted

* fix test

* fix pointer

* add comments

* reduce rpc calls + refactors

* tests + check to save rpc calls

* address feedback + remove redundant impl

* iface comment

* address feedback on compute unit limit and lastValidBlockHeight assignment

* blockhash assignment inside txm.sendWithRetry

* address feedback

* Merge branch 'develop' into nonevm-706-support-custom-bumping-strategy-rpc-expiration-within-confirmation

* refactors after merge

* fix interactive rebase

* fix whitespace diffs

* fix import

* fix mocks

* add on prebroadcaste error

* remove rebroadcast count and fix package

* improve docs

* fix comparison against blockHeight instead of slotHeight

* address feedback

* fix lint

* fix log

* address feedback

* remove useless slot height

* address feedback

* validate that tx doesn't exist in any of maps when adding new tx

* callers set lastValidBlockheight + get blockhash on expiration + integration tests

* add enq iface comm to help callers

* address feedback
  • Loading branch information
Farber98 authored and dhaidashenko committed Dec 20, 2024
1 parent 6367394 commit ec749e1
Show file tree
Hide file tree
Showing 16 changed files with 1,459 additions and 473 deletions.
5 changes: 3 additions & 2 deletions docs/relay/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ chainlink nodes solana create --name=<node-name> --chain-id=<chain-id> --url=<ur
| `OCR2CacheTTL` | stale OCR2 cache deadline | 1m | |
| `TxTimeout` | timeout to send tx to rpc endpoint | 1m | |
| `TxRetryTimeout` | duration for tx to be rebroadcast to rpc, txm stops rebroadcast after timeout | 10s | |
| `TxConfirmTimeout` | duration when confirming a tx signature before signature is discarded as unconfirmed | 30s | |
| `SkipPreflight` | enable or disable preflight checks when sending tx | `true` | `true`, `false` |
| `TxConfirmTimeout` | duration when confirming a tx signature before signature is discarded as unconfirmed | 30s |
| `TxExpirationRebroadcast` | enables or disables transaction rebroadcast if expired. Expiration check is performed every `ConfirmPollPeriod`. A transaction is considered expired if the blockhash it was sent with is 150 blocks older than the latest blockhash. | `false` | `true`, `false` |
| `SkipPreflight` | enables or disables preflight checks when sending tx | `true` | `true`, `false` |
| `Commitment` | Confirmation level for solana state and transactions. ([documentation](https://docs.solana.com/developing/clients/jsonrpc-api#configuring-state-commitment)) | `confirmed` | `processed`, `confirmed`, `finalized` |
| `MaxRetries` | Parameter when sending transactions, how many times the RPC node will automatically rebroadcast a tx, default = `0` for custom txm rebroadcasting method, set to `-1` to use the RPC node's default retry strategy | `0` | |
2 changes: 1 addition & 1 deletion pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func (c *chain) sendTx(ctx context.Context, from, to string, amount *big.Int, ba
}

chainTxm := c.TxManager()
err = chainTxm.Enqueue(ctx, "", tx, nil,
err = chainTxm.Enqueue(ctx, "", tx, nil, blockhash.Value.LastValidBlockHeight,
txm.SetComputeUnitLimit(500), // reduce from default 200K limit - should only take 450 compute units
// no fee bumping and no additional fee - makes validating balance accurate
txm.SetComputeUnitPriceMax(0),
Expand Down
13 changes: 8 additions & 5 deletions pkg/solana/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, uint64(0), receiverBal)

createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) *solana.Transaction {
createTx := func(signer solana.PublicKey, sender solana.PublicKey, receiver solana.PublicKey, amt uint64) (*solana.Transaction, uint64) {
selectedClient, err = testChain.getClient()
assert.NoError(t, err)
hash, hashErr := selectedClient.LatestBlockhash(tests.Context(t))
Expand All @@ -553,11 +553,12 @@ func TestSolanaChain_MultiNode_Txm(t *testing.T) {
solana.TransactionPayer(signer),
)
require.NoError(t, txErr)
return tx
return tx, hash.Value.LastValidBlockHeight
}

// Send funds twice, along with an invalid transaction
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
tx, lastValidBlockHeight := createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success", tx, nil, lastValidBlockHeight))

// Wait for new block hash
currentBh, err := selectedClient.LatestBlockhash(tests.Context(t))
Expand All @@ -578,8 +579,10 @@ NewBlockHash:
}
}

require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil))
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL), nil)) // cannot sign tx before enqueuing
tx2, lastValidBlockHeight2 := createTx(pubKey, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.NoError(t, testChain.txm.Enqueue(tests.Context(t), "test_success_2", tx2, nil, lastValidBlockHeight2))
tx3, lastValidBlockHeight3 := createTx(pubKeyReceiver, pubKey, pubKeyReceiver, solana.LAMPORTS_PER_SOL)
require.Error(t, testChain.txm.Enqueue(tests.Context(t), "test_invalidSigner", tx3, nil, lastValidBlockHeight3)) // cannot sign tx before enqueuing

// wait for all txes to finish
ctx, cancel := context.WithCancel(tests.Context(t))
Expand Down
28 changes: 17 additions & 11 deletions pkg/solana/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ import (

// Global solana defaults.
var defaultConfigSet = Chain{
BalancePollPeriod: config.MustNewDuration(5 * time.Second), // poll period for balance monitoring
ConfirmPollPeriod: config.MustNewDuration(500 * time.Millisecond), // polling for tx confirmation
OCR2CachePollPeriod: config.MustNewDuration(time.Second), // cache polling rate
OCR2CacheTTL: config.MustNewDuration(time.Minute), // stale cache deadline
TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client
TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node
TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx.
TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions.
SkipPreflight: ptr(true), // to enable or disable preflight checks
Commitment: ptr(string(rpc.CommitmentConfirmed)),
MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries
BalancePollPeriod: config.MustNewDuration(5 * time.Second), // poll period for balance monitoring
ConfirmPollPeriod: config.MustNewDuration(500 * time.Millisecond), // polling for tx confirmation
OCR2CachePollPeriod: config.MustNewDuration(time.Second), // cache polling rate
OCR2CacheTTL: config.MustNewDuration(time.Minute), // stale cache deadline
TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client
TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node
TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx.
TxExpirationRebroadcast: ptr(false), // to enable rebroadcasting of expired transactions
TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions.
SkipPreflight: ptr(true), // to enable or disable preflight checks
Commitment: ptr(string(rpc.CommitmentConfirmed)),
MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries

// fee estimator
FeeEstimatorMode: ptr("fixed"),
Expand All @@ -43,6 +44,7 @@ type Config interface {
TxTimeout() time.Duration
TxRetryTimeout() time.Duration
TxConfirmTimeout() time.Duration
TxExpirationRebroadcast() bool
TxRetentionTimeout() time.Duration
SkipPreflight() bool
Commitment() rpc.CommitmentType
Expand All @@ -68,6 +70,7 @@ type Chain struct {
TxTimeout *config.Duration
TxRetryTimeout *config.Duration
TxConfirmTimeout *config.Duration
TxExpirationRebroadcast *bool
TxRetentionTimeout *config.Duration
SkipPreflight *bool
Commitment *string
Expand Down Expand Up @@ -105,6 +108,9 @@ func (c *Chain) SetDefaults() {
if c.TxConfirmTimeout == nil {
c.TxConfirmTimeout = defaultConfigSet.TxConfirmTimeout
}
if c.TxExpirationRebroadcast == nil {
c.TxExpirationRebroadcast = defaultConfigSet.TxExpirationRebroadcast
}
if c.TxRetentionTimeout == nil {
c.TxRetentionTimeout = defaultConfigSet.TxRetentionTimeout
}
Expand Down
45 changes: 45 additions & 0 deletions pkg/solana/config/mocks/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions pkg/solana/config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func setFromChain(c, f *Chain) {
if f.TxConfirmTimeout != nil {
c.TxConfirmTimeout = f.TxConfirmTimeout
}
if f.TxExpirationRebroadcast != nil {
c.TxExpirationRebroadcast = f.TxExpirationRebroadcast
}
if f.TxRetentionTimeout != nil {
c.TxRetentionTimeout = f.TxRetentionTimeout
}
Expand Down Expand Up @@ -241,6 +244,10 @@ func (c *TOMLConfig) TxConfirmTimeout() time.Duration {
return c.Chain.TxConfirmTimeout.Duration()
}

func (c *TOMLConfig) TxExpirationRebroadcast() bool {
return *c.Chain.TxExpirationRebroadcast
}

func (c *TOMLConfig) TxRetentionTimeout() time.Duration {
return c.Chain.TxRetentionTimeout.Duration()
}
Expand Down
10 changes: 9 additions & 1 deletion pkg/solana/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ import (
var _ TxManager = (*txm.Txm)(nil)

type TxManager interface {
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, txCfgs ...txm.SetTxConfig) error
// Enqueue adds a tx to the txm queue for processing and submitting to the Solana network.
// An error is returned if the txm is not ready, if the tx is invalid, or if the queue is full.
//
// Important Notes:
// - The tx must contain at least one account key. The first account will be used to sign the tx (fee payer's public key).
// - txCfgs can be used to set custom tx configurations.
// - If a txID is provided, it will be used to identify the tx. Otherwise, a random UUID will be generated.
// - The caller needs to set the tx.Message.RecentBlockhash and provide the corresponding lastValidBlockHeight. These values are obtained from the GetLatestBlockhash RPC call.
Enqueue(ctx context.Context, accountID string, tx *solana.Transaction, txID *string, lastValidBlockHeight uint64, txCfgs ...txm.SetTxConfig) error
}

var _ relaytypes.Relayer = &Relayer{} //nolint:staticcheck
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Transmitter) Transmit(

// pass transmit payload to tx manager queue
c.lggr.Debugf("Queuing transmit tx: state (%s) + transmissions (%s)", c.stateID.String(), c.transmissionsID.String())
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil); err != nil {
if err = c.txManager.Enqueue(ctx, c.stateID.String(), tx, nil, blockhash.Value.LastValidBlockHeight); err != nil {
return fmt.Errorf("error on Transmit.txManager.Enqueue: %w", err)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/solana/transmitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type verifyTxSize struct {
s *solana.PrivateKey
}

func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ ...txm.SetTxConfig) error {
func (txm verifyTxSize) Enqueue(_ context.Context, _ string, tx *solana.Transaction, txID *string, _ uint64, _ ...txm.SetTxConfig) error {
// additional components that transaction manager adds to the transaction
require.NoError(txm.t, fees.SetComputeUnitPrice(tx, 0))
require.NoError(txm.t, fees.SetComputeUnitLimit(tx, 0))
Expand Down
Loading

0 comments on commit ec749e1

Please sign in to comment.