Skip to content

Commit

Permalink
[chore] Make some timeouts customizable (#568)
Browse files Browse the repository at this point in the history
* [chore] Add request timeout to config

* [chore] Add ExecuteSyncWithTimeout to allow customising the timeout
  • Loading branch information
cedric-cordenier authored Jun 11, 2024
1 parent c9bc0a2 commit 2cd3e10
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 6 deletions.
7 changes: 6 additions & 1 deletion pkg/capabilities/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,12 @@ var maximumExecuteTimeout = 60 * time.Second
// There is default timeout of 10 seconds. If a capability takes longer than
// that then it should be executed asynchronously.
func ExecuteSync(ctx context.Context, c CallbackExecutable, request CapabilityRequest) (*values.List, error) {
ctxWithT, cancel := context.WithTimeout(ctx, maximumExecuteTimeout)
return ExecuteSyncWithTimeout(ctx, c, request, maximumExecuteTimeout)
}

// ExecuteSyncWithTimeout allows explicitly passing in a timeout to customise the desired duration.
func ExecuteSyncWithTimeout(ctx context.Context, c CallbackExecutable, request CapabilityRequest, timeout time.Duration) (*values.List, error) {
ctxWithT, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

responseCh, err := c.Execute(ctxWithT, request)
Expand Down
15 changes: 13 additions & 2 deletions pkg/capabilities/consensus/ocr3/capability.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,19 @@ func (o *capability) Execute(ctx context.Context, r capabilities.CapabilityReque
func (o *capability) queueRequestForProcessing(
ctx context.Context,
metadata capabilities.RequestMetadata,
i *inputs, c *config) (<-chan capabilities.CapabilityResponse, error) {
i *inputs,
c *config,
) (<-chan capabilities.CapabilityResponse, error) {
callbackCh := make(chan capabilities.CapabilityResponse, o.callbackChannelBufferSize)

// Use the capability-level request timeout unless the request's config specifies
// its own timeout, in which case we'll use that instead. This allows the workflow spec
// to configure more granular timeouts depending on the circumstances.
requestTimeout := o.requestTimeout
if c.RequestTimeoutMS != 0 {
requestTimeout = time.Duration(c.RequestTimeoutMS) * time.Millisecond
}

r := &request{
StopCh: make(chan struct{}),
CallbackCh: callbackCh,
Expand All @@ -238,7 +249,7 @@ func (o *capability) queueRequestForProcessing(
ReportID: c.ReportID,
WorkflowDonID: metadata.WorkflowDonID,
Observations: i.Observations,
ExpiresAt: o.clock.Now().Add(o.requestTimeout),
ExpiresAt: o.clock.Now().Add(requestTimeout),
}

o.lggr.Debugw("Execute - adding to store", "workflowID", r.WorkflowID, "workflowExecutionID", r.WorkflowExecutionID, "observations", r.Observations)
Expand Down
52 changes: 52 additions & 0 deletions pkg/capabilities/consensus/ocr3/capability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,58 @@ func TestOCR3Capability_Eviction(t *testing.T) {
assert.False(t, ok)
}

func TestOCR3Capability_EvictionUsingConfig(t *testing.T) {
n := time.Now()
fc := clockwork.NewFakeClockAt(n)
lggr := logger.Test(t)

ctx := tests.Context(t)
// This is the default expired at
rea := time.Hour
s := newStore()
cp := newCapability(s, fc, rea, mockAggregatorFactory, mockEncoderFactory, lggr, 10)
require.NoError(t, cp.Start(ctx))

config, err := values.NewMap(
map[string]any{
"aggregation_method": "data_feeds",
"aggregation_config": map[string]any{},
"encoder_config": map[string]any{},
"encoder": "evm",
"report_id": "aaaa",
"request_timeout_ms": 10000,
},
)
require.NoError(t, err)

ethUsdValue, err := decimal.NewFromString("1.123456")
require.NoError(t, err)
inputs, err := values.NewMap(map[string]any{"observations": []any{map[string]any{"ETH_USD": ethUsdValue}}})
require.NoError(t, err)

rid := uuid.New().String()
executeReq := capabilities.CapabilityRequest{
Metadata: capabilities.RequestMetadata{
WorkflowID: workflowTestID,
WorkflowExecutionID: rid,
},
Config: config,
Inputs: inputs,
}

callback, err := cp.Execute(ctx, executeReq)
require.NoError(t, err)

// 1 minute is more than the config timeout we provided, but less than
// the hardcoded timeout.
fc.Advance(1 * time.Minute)
resp := <-callback
assert.ErrorContains(t, resp.Err, "timeout exceeded: could not process request before expiry")

_, ok := s.requests[rid]
assert.False(t, ok)
}

func TestOCR3Capability_Registration(t *testing.T) {
n := time.Now()
fc := clockwork.NewFakeClockAt(n)
Expand Down
1 change: 1 addition & 0 deletions pkg/capabilities/consensus/ocr3/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type config struct {
Encoder string `mapstructure:"encoder" json:"encoder"`
EncoderConfig *values.Map `mapstructure:"encoder_config" json:"encoder_config"`
ReportID string `mapstructure:"report_id" json:"report_id" jsonschema:"required,pattern=^[a-f0-9]{4}$"`
RequestTimeoutMS int64 `mapstructure:"request_timeout_ms" json:"request_timeout_ms"`
}

type inputs struct {
Expand Down
2 changes: 1 addition & 1 deletion pkg/capabilities/consensus/ocr3/ocr3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Config struct {
}

const (
defaultRequestExpiry time.Duration = 5 * time.Minute
defaultRequestExpiry time.Duration = 20 * time.Second
defaultBatchSize = 20
defaultSendBufferSize = 10
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@
"report_id": {
"type": "string",
"pattern": "^[a-f0-9]{4}$"
},
"request_timeout_ms": {
"type": "integer"
}
},
"additionalProperties": false,
Expand All @@ -49,7 +52,8 @@
"aggregation_config",
"encoder",
"encoder_config",
"report_id"
"report_id",
"request_timeout_ms"
]
},
"inputs": {
Expand Down Expand Up @@ -99,4 +103,4 @@
"outputs"
],
"description": "OCR3 consensus exposed as a capability."
}
}

0 comments on commit 2cd3e10

Please sign in to comment.