From 2cd3e10171a8fd2a420a4eb1b6ed3b329403f017 Mon Sep 17 00:00:00 2001 From: Cedric Date: Tue, 11 Jun 2024 11:20:59 +0100 Subject: [PATCH] [chore] Make some timeouts customizable (#568) * [chore] Add request timeout to config * [chore] Add ExecuteSyncWithTimeout to allow customising the timeout --- pkg/capabilities/capabilities.go | 7 ++- pkg/capabilities/consensus/ocr3/capability.go | 15 +++++- .../consensus/ocr3/capability_test.go | 52 +++++++++++++++++++ pkg/capabilities/consensus/ocr3/models.go | 1 + pkg/capabilities/consensus/ocr3/ocr3.go | 2 +- .../testdata/fixtures/capability/schema.json | 8 ++- 6 files changed, 79 insertions(+), 6 deletions(-) diff --git a/pkg/capabilities/capabilities.go b/pkg/capabilities/capabilities.go index 5e4b7d5a4..04a51f459 100644 --- a/pkg/capabilities/capabilities.go +++ b/pkg/capabilities/capabilities.go @@ -302,7 +302,12 @@ var maximumExecuteTimeout = 60 * time.Second // There is default timeout of 10 seconds. If a capability takes longer than // that then it should be executed asynchronously. func ExecuteSync(ctx context.Context, c CallbackExecutable, request CapabilityRequest) (*values.List, error) { - ctxWithT, cancel := context.WithTimeout(ctx, maximumExecuteTimeout) + return ExecuteSyncWithTimeout(ctx, c, request, maximumExecuteTimeout) +} + +// ExecuteSyncWithTimeout allows explicitly passing in a timeout to customise the desired duration. +func ExecuteSyncWithTimeout(ctx context.Context, c CallbackExecutable, request CapabilityRequest, timeout time.Duration) (*values.List, error) { + ctxWithT, cancel := context.WithTimeout(ctx, timeout) defer cancel() responseCh, err := c.Execute(ctxWithT, request) diff --git a/pkg/capabilities/consensus/ocr3/capability.go b/pkg/capabilities/consensus/ocr3/capability.go index faec0983a..964257515 100644 --- a/pkg/capabilities/consensus/ocr3/capability.go +++ b/pkg/capabilities/consensus/ocr3/capability.go @@ -226,8 +226,19 @@ func (o *capability) Execute(ctx context.Context, r capabilities.CapabilityReque func (o *capability) queueRequestForProcessing( ctx context.Context, metadata capabilities.RequestMetadata, - i *inputs, c *config) (<-chan capabilities.CapabilityResponse, error) { + i *inputs, + c *config, +) (<-chan capabilities.CapabilityResponse, error) { callbackCh := make(chan capabilities.CapabilityResponse, o.callbackChannelBufferSize) + + // Use the capability-level request timeout unless the request's config specifies + // its own timeout, in which case we'll use that instead. This allows the workflow spec + // to configure more granular timeouts depending on the circumstances. + requestTimeout := o.requestTimeout + if c.RequestTimeoutMS != 0 { + requestTimeout = time.Duration(c.RequestTimeoutMS) * time.Millisecond + } + r := &request{ StopCh: make(chan struct{}), CallbackCh: callbackCh, @@ -238,7 +249,7 @@ func (o *capability) queueRequestForProcessing( ReportID: c.ReportID, WorkflowDonID: metadata.WorkflowDonID, Observations: i.Observations, - ExpiresAt: o.clock.Now().Add(o.requestTimeout), + ExpiresAt: o.clock.Now().Add(requestTimeout), } o.lggr.Debugw("Execute - adding to store", "workflowID", r.WorkflowID, "workflowExecutionID", r.WorkflowExecutionID, "observations", r.Observations) diff --git a/pkg/capabilities/consensus/ocr3/capability_test.go b/pkg/capabilities/consensus/ocr3/capability_test.go index 6fa47a430..20a8986e4 100644 --- a/pkg/capabilities/consensus/ocr3/capability_test.go +++ b/pkg/capabilities/consensus/ocr3/capability_test.go @@ -186,6 +186,58 @@ func TestOCR3Capability_Eviction(t *testing.T) { assert.False(t, ok) } +func TestOCR3Capability_EvictionUsingConfig(t *testing.T) { + n := time.Now() + fc := clockwork.NewFakeClockAt(n) + lggr := logger.Test(t) + + ctx := tests.Context(t) + // This is the default expired at + rea := time.Hour + s := newStore() + cp := newCapability(s, fc, rea, mockAggregatorFactory, mockEncoderFactory, lggr, 10) + require.NoError(t, cp.Start(ctx)) + + config, err := values.NewMap( + map[string]any{ + "aggregation_method": "data_feeds", + "aggregation_config": map[string]any{}, + "encoder_config": map[string]any{}, + "encoder": "evm", + "report_id": "aaaa", + "request_timeout_ms": 10000, + }, + ) + require.NoError(t, err) + + ethUsdValue, err := decimal.NewFromString("1.123456") + require.NoError(t, err) + inputs, err := values.NewMap(map[string]any{"observations": []any{map[string]any{"ETH_USD": ethUsdValue}}}) + require.NoError(t, err) + + rid := uuid.New().String() + executeReq := capabilities.CapabilityRequest{ + Metadata: capabilities.RequestMetadata{ + WorkflowID: workflowTestID, + WorkflowExecutionID: rid, + }, + Config: config, + Inputs: inputs, + } + + callback, err := cp.Execute(ctx, executeReq) + require.NoError(t, err) + + // 1 minute is more than the config timeout we provided, but less than + // the hardcoded timeout. + fc.Advance(1 * time.Minute) + resp := <-callback + assert.ErrorContains(t, resp.Err, "timeout exceeded: could not process request before expiry") + + _, ok := s.requests[rid] + assert.False(t, ok) +} + func TestOCR3Capability_Registration(t *testing.T) { n := time.Now() fc := clockwork.NewFakeClockAt(n) diff --git a/pkg/capabilities/consensus/ocr3/models.go b/pkg/capabilities/consensus/ocr3/models.go index 03eda462f..6844309a4 100644 --- a/pkg/capabilities/consensus/ocr3/models.go +++ b/pkg/capabilities/consensus/ocr3/models.go @@ -14,6 +14,7 @@ type config struct { Encoder string `mapstructure:"encoder" json:"encoder"` EncoderConfig *values.Map `mapstructure:"encoder_config" json:"encoder_config"` ReportID string `mapstructure:"report_id" json:"report_id" jsonschema:"required,pattern=^[a-f0-9]{4}$"` + RequestTimeoutMS int64 `mapstructure:"request_timeout_ms" json:"request_timeout_ms"` } type inputs struct { diff --git a/pkg/capabilities/consensus/ocr3/ocr3.go b/pkg/capabilities/consensus/ocr3/ocr3.go index 0851e7750..2facc2652 100644 --- a/pkg/capabilities/consensus/ocr3/ocr3.go +++ b/pkg/capabilities/consensus/ocr3/ocr3.go @@ -34,7 +34,7 @@ type Config struct { } const ( - defaultRequestExpiry time.Duration = 5 * time.Minute + defaultRequestExpiry time.Duration = 20 * time.Second defaultBatchSize = 20 defaultSendBufferSize = 10 ) diff --git a/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json b/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json index 6cd189b1d..1126ede11 100644 --- a/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json +++ b/pkg/capabilities/consensus/ocr3/testdata/fixtures/capability/schema.json @@ -40,6 +40,9 @@ "report_id": { "type": "string", "pattern": "^[a-f0-9]{4}$" + }, + "request_timeout_ms": { + "type": "integer" } }, "additionalProperties": false, @@ -49,7 +52,8 @@ "aggregation_config", "encoder", "encoder_config", - "report_id" + "report_id", + "request_timeout_ms" ] }, "inputs": { @@ -99,4 +103,4 @@ "outputs" ], "description": "OCR3 consensus exposed as a capability." -} +} \ No newline at end of file