From c89554df6ec70d51cc7c71208e5fe22653207543 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 17 Apr 2024 15:54:33 +0300 Subject: [PATCH 1/5] Drop unused queryTimeout config from TXM strategy --- common/txmgr/strategies.go | 18 ++++++------------ core/chains/evm/txmgr/evm_tx_store_test.go | 4 ++-- core/chains/evm/txmgr/strategies_test.go | 8 ++------ core/services/blockhashstore/bhs.go | 2 +- core/services/fluxmonitorv2/delegate.go | 2 +- core/services/ocr/delegate.go | 2 +- core/services/relay/evm/evm.go | 2 +- core/services/relay/evm/functions.go | 2 +- 8 files changed, 15 insertions(+), 25 deletions(-) diff --git a/common/txmgr/strategies.go b/common/txmgr/strategies.go index 3772e6d1d20..6e037658854 100644 --- a/common/txmgr/strategies.go +++ b/common/txmgr/strategies.go @@ -3,7 +3,6 @@ package txmgr import ( "context" "fmt" - "time" "github.com/google/uuid" @@ -14,9 +13,9 @@ var _ txmgrtypes.TxStrategy = SendEveryStrategy{} // NewQueueingTxStrategy creates a new TxStrategy that drops the oldest transactions after the // queue size is exceeded if a queue size is specified, and otherwise does not drop transactions. -func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) (strategy txmgrtypes.TxStrategy) { +func NewQueueingTxStrategy(subject uuid.UUID, queueSize uint32) (strategy txmgrtypes.TxStrategy) { if queueSize > 0 { - strategy = NewDropOldestStrategy(subject, queueSize, queryTimeout) + strategy = NewDropOldestStrategy(subject, queueSize) } else { strategy = SendEveryStrategy{} } @@ -41,15 +40,14 @@ var _ txmgrtypes.TxStrategy = DropOldestStrategy{} // DropOldestStrategy will send the newest N transactions, older ones will be // removed from the queue type DropOldestStrategy struct { - subject uuid.UUID - queueSize uint32 - queryTimeout time.Duration + subject uuid.UUID + queueSize uint32 } // NewDropOldestStrategy creates a new TxStrategy that drops the oldest transactions after the // queue size is exceeded. -func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32, queryTimeout time.Duration) DropOldestStrategy { - return DropOldestStrategy{subject, queueSize, queryTimeout} +func NewDropOldestStrategy(subject uuid.UUID, queueSize uint32) DropOldestStrategy { + return DropOldestStrategy{subject, queueSize} } func (s DropOldestStrategy) Subject() uuid.NullUUID { @@ -57,10 +55,6 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID { } func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, s.queryTimeout) - defer cancel() - // NOTE: We prune one less than the queue size to prevent the queue from exceeding the max queue size. Which could occur if a new transaction is added to the queue right after we prune. ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject) if err != nil { diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 5bb131862ed..6cfc01c20d0 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -1841,7 +1841,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) { subject1 := uuid.New() - strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout()) + strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5)) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1)) } @@ -1850,7 +1850,7 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) { t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) { subject2 := uuid.New() - strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout()) + strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3)) for i := 0; i < 5; i++ { mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2)) } diff --git a/core/chains/evm/txmgr/strategies_test.go b/core/chains/evm/txmgr/strategies_test.go index 19f5f197289..d7f4ceaf450 100644 --- a/core/chains/evm/txmgr/strategies_test.go +++ b/core/chains/evm/txmgr/strategies_test.go @@ -11,7 +11,6 @@ import ( txmgrcommon "github.com/smartcontractkit/chainlink/v2/common/txmgr" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" ) func Test_SendEveryStrategy(t *testing.T) { @@ -28,10 +27,9 @@ func Test_SendEveryStrategy(t *testing.T) { func Test_DropOldestStrategy_Subject(t *testing.T) { t.Parallel() - cfg := configtest.NewGeneralConfig(t, nil) subject := uuid.New() - s := txmgrcommon.NewDropOldestStrategy(subject, 1, cfg.Database().DefaultQueryTimeout()) + s := txmgrcommon.NewDropOldestStrategy(subject, 1) assert.True(t, s.Subject().Valid) assert.Equal(t, subject, s.Subject().UUID) @@ -39,14 +37,12 @@ func Test_DropOldestStrategy_Subject(t *testing.T) { func Test_DropOldestStrategy_PruneQueue(t *testing.T) { t.Parallel() - cfg := configtest.NewGeneralConfig(t, nil) subject := uuid.New() queueSize := uint32(2) - queryTimeout := cfg.Database().DefaultQueryTimeout() mockTxStore := mocks.NewEvmTxStore(t) t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) { - strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout) + strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize) mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil) ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore) require.NoError(t, err) diff --git a/core/services/blockhashstore/bhs.go b/core/services/blockhashstore/bhs.go index d4dd52c5661..4d1fe761c88 100644 --- a/core/services/blockhashstore/bhs.go +++ b/core/services/blockhashstore/bhs.go @@ -104,7 +104,7 @@ func (c *BulletproofBHS) Store(ctx context.Context, blockNum uint64) error { // Set a queue size of 256. At most we store the blockhash of every block, and only the // latest 256 can possibly be stored. - Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256, c.dbConfig.DefaultQueryTimeout()), + Strategy: txmgrcommon.NewQueueingTxStrategy(c.jobID, 256), }) if err != nil { return errors.Wrap(err, "creating transaction") diff --git a/core/services/fluxmonitorv2/delegate.go b/core/services/fluxmonitorv2/delegate.go index 1e2eba8d000..a56931b6a1e 100644 --- a/core/services/fluxmonitorv2/delegate.go +++ b/core/services/fluxmonitorv2/delegate.go @@ -71,7 +71,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] return nil, err } cfg := chain.Config() - strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout()) + strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.FluxMonitor().DefaultTransactionQueueDepth()) var checker txmgr.TransmitCheckerSpec if chain.Config().FluxMonitor().SimulateTransactions() { checker.CheckerType = txmgr.TransmitCheckerTypeSimulate diff --git a/core/services/ocr/delegate.go b/core/services/ocr/delegate.go index bcdda397e20..be3869570c5 100644 --- a/core/services/ocr/delegate.go +++ b/core/services/ocr/delegate.go @@ -199,7 +199,7 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, jb job.Job) (services [] } cfg := chain.Config() - strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth(), cfg.Database().DefaultQueryTimeout()) + strategy := txmgrcommon.NewQueueingTxStrategy(jb.ExternalJobID, cfg.OCR().DefaultTransactionQueueDepth()) var checker txmgr.TransmitCheckerSpec if chain.Config().OCR().SimulateTransactions() { diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index 1a09e681f8a..4f31110fda1 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -521,7 +521,7 @@ func newOnChainContractTransmitter(ctx context.Context, lggr logger.Logger, rarg subject = *opts.subjectID } scoped := configWatcher.chain.Config() - strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout()) + strategy := txmgrcommon.NewQueueingTxStrategy(subject, scoped.OCR2().DefaultTransactionQueueDepth()) var checker txm.TransmitCheckerSpec if configWatcher.chain.Config().OCR2().SimulateTransactions() { diff --git a/core/services/relay/evm/functions.go b/core/services/relay/evm/functions.go index ed7b247f46b..9444ab4164d 100644 --- a/core/services/relay/evm/functions.go +++ b/core/services/relay/evm/functions.go @@ -183,7 +183,7 @@ func newFunctionsContractTransmitter(ctx context.Context, contractVersion uint32 } scoped := configWatcher.chain.Config() - strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth(), scoped.Database().DefaultQueryTimeout()) + strategy := txmgrcommon.NewQueueingTxStrategy(rargs.ExternalJobID, scoped.OCR2().DefaultTransactionQueueDepth()) var checker txm.TransmitCheckerSpec if configWatcher.chain.Config().OCR2().SimulateTransactions() { From f5a7a6a348a1ac4937907f32b66b9102efd1e195 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 17 Apr 2024 16:04:58 +0300 Subject: [PATCH 2/5] Add changeset --- .changeset/new-forks-grab.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/new-forks-grab.md diff --git a/.changeset/new-forks-grab.md b/.changeset/new-forks-grab.md new file mode 100644 index 00000000000..90103277e4a --- /dev/null +++ b/.changeset/new-forks-grab.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Drop unused queryTimeout config from TXM strategy From b8df0ba50876b30335b169897dc2387c8fb2907a Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 17 Apr 2024 16:09:52 +0300 Subject: [PATCH 3/5] Fix changeset --- .changeset/cyan-crabs-explode.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/cyan-crabs-explode.md b/.changeset/cyan-crabs-explode.md index 5018e2d555c..9fab81c0e89 100644 --- a/.changeset/cyan-crabs-explode.md +++ b/.changeset/cyan-crabs-explode.md @@ -1,5 +1,5 @@ --- -"chainlink": minor +"chainlink": removed --- Add support for workflow jobs to Operator UI #wip #added From 0e5bedd8d21be5972410f67d28da199d0ec93702 Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 17 Apr 2024 16:11:06 +0300 Subject: [PATCH 4/5] Fix changeset error --- .changeset/cyan-crabs-explode.md | 2 +- .changeset/new-forks-grab.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.changeset/cyan-crabs-explode.md b/.changeset/cyan-crabs-explode.md index 9fab81c0e89..5018e2d555c 100644 --- a/.changeset/cyan-crabs-explode.md +++ b/.changeset/cyan-crabs-explode.md @@ -1,5 +1,5 @@ --- -"chainlink": removed +"chainlink": minor --- Add support for workflow jobs to Operator UI #wip #added diff --git a/.changeset/new-forks-grab.md b/.changeset/new-forks-grab.md index 90103277e4a..6344132221f 100644 --- a/.changeset/new-forks-grab.md +++ b/.changeset/new-forks-grab.md @@ -1,5 +1,5 @@ --- -"chainlink": minor +"chainlink": removed --- Drop unused queryTimeout config from TXM strategy From e580e040d823371742804cc951e76c566dc85aba Mon Sep 17 00:00:00 2001 From: Dimitris Date: Wed, 17 Apr 2024 16:41:48 +0300 Subject: [PATCH 5/5] Add internal tag --- .changeset/new-forks-grab.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/new-forks-grab.md b/.changeset/new-forks-grab.md index 6344132221f..cb078beb29b 100644 --- a/.changeset/new-forks-grab.md +++ b/.changeset/new-forks-grab.md @@ -2,4 +2,4 @@ "chainlink": removed --- -Drop unused queryTimeout config from TXM strategy +Drop unused queryTimeout config from TXM strategy #internal