Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop unused queryTimeout config from TXM strategy #12859

Merged
merged 5 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/new-forks-grab.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": removed
---

Drop unused queryTimeout config from TXM strategy #internal
18 changes: 6 additions & 12 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package txmgr
import (
"context"
"fmt"
"time"

"github.com/google/uuid"

Expand All @@ -14,9 +13,9 @@ var _ txmgrtypes.TxStrategy = SendEveryStrategy{}

// NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the
// queue size is exceeded if a queue size is specified, and otherwise does not drop transactions.
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy) {
func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32) (strategy txmgrtypes.TxStrategy) {
if queueSize > 0 {
strategy = NewDropOldestStrategy(subject, queueSize, queryTimeout)
strategy = NewDropOldestStrategy(subject, queueSize)
} else {
strategy = SendEveryStrategy{}
}
Expand All @@ -41,26 +40,21 @@ var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
// DropOldestStrategy will send the newest N transactions, older ones will be
// removed from the queue
type DropOldestStrategy struct {
subject uuid.UUID
queueSize uint32
queryTimeout time.Duration
subject uuid.UUID
queueSize uint32
}

// NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the
// queue size is exceeded.
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy {
return DropOldestStrategy{subject, queueSize, queryTimeout}
func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32) DropOldestStrategy {
return DropOldestStrategy{subject, queueSize}
}

func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

// NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune.
ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,7 +1841,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) {
subject1 := uuid.New()
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout())
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5))
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
Expand All @@ -1850,7 +1850,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {

t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) {
subject2 := uuid.New()
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout())
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3))
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
Expand Down
8 changes: 2 additions & 6 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
)

func Test_SendEveryStrategy(t *testing.T) {
Expand All @@ -28,25 +27,22 @@ func Test_SendEveryStrategy(t *testing.T) {

func Test_DropOldestStrategy_Subject(t *testing.T) {
t.Parallel()
cfg := configtest.NewGeneralConfig(t, nil)

subject := uuid.New()
s := txmgrcommon.NewDropOldestStrategy(subject, 1, cfg.Database().DefaultQueryTimeout())
s := txmgrcommon.NewDropOldestStrategy(subject, 1)

assert.True(t, s.Subject().Valid)
assert.Equal(t, subject, s.Subject().UUID)
}

func Test_DropOldestStrategy_PruneQueue(t *testing.T) {
t.Parallel()
cfg := configtest.NewGeneralConfig(t, nil)
subject := uuid.New()
queueSize := uint32(2)
queryTimeout := cfg.Database().DefaultQueryTimeout()
mockTxStore := mocks.NewEvmTxStore(t)

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion core/services/blockhashstore/bhs.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *BulletproofBHS) Store(ctx context.Context, blockNum uint64) error {

// Set a queue size of 256. At most we store the blockhash of every block, and only the
// latest 256 can possibly be stored.
Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256, c.dbConfig.DefaultQueryTimeout()),
Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256),
})
if err != nil {
return errors.Wrap(err, "creating transaction")
Expand Down
2 changes: 1 addition & 1 deletion core/services/fluxmonitorv2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []
return nil, err
}
cfg := chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth())
var checker txmgr.TransmitCheckerSpec
if chain.Config().FluxMonitor().SimulateTransactions() {
checker.CheckerType = txmgr.TransmitCheckerTypeSimulate
Expand Down
2 changes: 1 addition & 1 deletion core/services/ocr/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services []
}

cfg := chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth())

var checker txmgr.TransmitCheckerSpec
if chain.Config().OCR().SimulateTransactions() {
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg
subject = *opts.subjectID
}
scoped := configWatcher.chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth())

var checker txm.TransmitCheckerSpec
if configWatcher.chain.Config().OCR2().SimulateTransactions() {
Expand Down
2 changes: 1 addition & 1 deletion core/services/relay/evm/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func newFunctionsContractTransmitter(ctx context.Context, contractVersion uint32
}

scoped := configWatcher.chain.Config()
strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout())
strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth())

var checker txm.TransmitCheckerSpec
if configWatcher.chain.Config().OCR2().SimulateTransactions() {
Expand Down
Loading