From e21d92ecea387651bc88936ad709e84d946a1e25 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 28 Apr 2020 20:06:42 +0000 Subject: [PATCH 1/4] Have broker fanout handle timeout gracefully for each individual target --- cmd/broker/fanout/main.go | 14 ++++--- cmd/broker/retry/main.go | 12 +++--- pkg/broker/handler/pool/fanout/pool.go | 10 +++++ pkg/broker/handler/pool/fanout/pool_test.go | 25 ++++++++++- pkg/broker/handler/pool/options.go | 22 +++++++++- pkg/broker/handler/pool/options_test.go | 24 ++++++++++- pkg/broker/handler/pool/testing/helper.go | 30 +++++++++++++ .../handler/processors/deliver/processor.go | 16 ++++++- .../processors/deliver/processor_test.go | 42 +++++++++++++++---- 9 files changed, 171 insertions(+), 24 deletions(-) diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go index 980b57c05c..66a49d14fa 100644 --- a/cmd/broker/fanout/main.go +++ b/cmd/broker/fanout/main.go @@ -54,12 +54,14 @@ var ( ) type envConfig struct { - PodName string `envconfig:"POD_NAME" required:"true"` - ProjectID string `envconfig:"PROJECT_ID"` - TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"` - HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` - MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"` - TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` + PodName string `envconfig:"POD_NAME" required:"true"` + ProjectID string `envconfig:"PROJECT_ID"` + TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"` + HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` + MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"` + + // Max to 10m. + TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` } func main() { diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 9e1d527800..603be8cbb6 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -54,11 +54,13 @@ var ( ) type envConfig struct { - PodName string `envconfig:"POD_NAME" required:"true"` - ProjectID string `envconfig:"PROJECT_ID"` - TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"` - HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` - TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` + PodName string `envconfig:"POD_NAME" required:"true"` + ProjectID string `envconfig:"PROJECT_ID"` + TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"` + HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"` + + // Max to 10m. + TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` } func main() { diff --git a/pkg/broker/handler/pool/fanout/pool.go b/pkg/broker/handler/pool/fanout/pool.go index 1c85a0cefc..edd28537e0 100644 --- a/pkg/broker/handler/pool/fanout/pool.go +++ b/pkg/broker/handler/pool/fanout/pool.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" ceclient "github.com/cloudevents/sdk-go/v2/client" cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" @@ -52,6 +53,12 @@ type SyncPool struct { // For initial events delivery. We only need a shared client. // And we can set target address dynamically. deliverClient ceclient.Client + // For fanout delivery, we need a slightly shorter timeout + // than the handler timeout per event. + // It allows the delivery processor to timeout the delivery + // before the handler nacks the pubsub message, which will + // cause event re-delivery for all targets. + deliverTimeout time.Duration } type handlerCache struct { @@ -107,6 +114,8 @@ func NewSyncPool(ctx context.Context, targets config.ReadonlyTargets, opts ...po options: options, deliverClient: deliverClient, deliverRetryClient: retryClient, + // Set the deliver timeout slightly less than the total timeout for each event. + deliverTimeout: options.TimeoutPerEvent - (5 * time.Second), } return p, nil } @@ -173,6 +182,7 @@ func (p *SyncPool) SyncOnce(ctx context.Context) error { Targets: p.targets, RetryOnFailure: true, DeliverRetryClient: p.deliverRetryClient, + DeliverTimeout: p.deliverTimeout, }, ), }, diff --git a/pkg/broker/handler/pool/fanout/pool_test.go b/pkg/broker/handler/pool/fanout/pool_test.go index 197580c3c0..ff0712db0e 100644 --- a/pkg/broker/handler/pool/fanout/pool_test.go +++ b/pkg/broker/handler/pool/fanout/pool_test.go @@ -115,7 +115,9 @@ func TestFanoutSyncPoolE2E(t *testing.T) { signal := make(chan struct{}) syncPool, err := NewSyncPool(ctx, helper.Targets, pool.WithPubsubClient(helper.PubsubClient), - pool.WithProjectID(testProject)) + pool.WithProjectID(testProject), + pool.WithDeliveryTimeout(500*time.Millisecond), + ) if err != nil { t.Errorf("unexpected error from getting sync pool: %v", err) } @@ -198,6 +200,27 @@ func TestFanoutSyncPoolE2E(t *testing.T) { <-vctx.Done() }) + t.Run("event initial delivery failed with timeout was sent to retry queue", func(t *testing.T) { + // Set timeout context so that verification can be done before + // exiting test func. + vctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + // Verify the event to t1 was received. + go helper.VerifyNextTargetEventAndDelayResp(ctx, t, t1.Key(), &e, time.Second) + // Because of the delay, t1 delivery should timeout. + // Thus the event should have been sent to the retry queue. + go helper.VerifyNextTargetRetryEvent(ctx, t, t1.Key(), &e) + // The same event should be received by t2. + go helper.VerifyNextTargetEvent(vctx, t, t2.Key(), &e) + // But t2 should receive any retry event because the initial delay + // was successful. + go helper.VerifyNextTargetRetryEvent(vctx, t, t2.Key(), nil) + + helper.SendEventToDecoupleQueue(ctx, t, b1.Key(), &e) + <-vctx.Done() + }) + t.Run("event replied was sent to broker ingress", func(t *testing.T) { reply := event.New() reply.SetSubject("bar") diff --git a/pkg/broker/handler/pool/options.go b/pkg/broker/handler/pool/options.go index 6f54ee31c3..0c39443b15 100644 --- a/pkg/broker/handler/pool/options.go +++ b/pkg/broker/handler/pool/options.go @@ -35,6 +35,13 @@ var ( ceclient.WithTimeNow(), ceclient.WithTracePropagation(), } + + // This is the pubsub default MaxExtension. + // It would not make sense for handler timeout per event be greater + // than this value because the message would be nacked before the handler + // timeouts. + // TODO: consider allow changing this value? + maxTimeout = 10 * time.Minute ) // Options holds all the options for create handler pool. @@ -49,6 +56,8 @@ type Options struct { MaxConcurrencyPerEvent int // TimeoutPerEvent is the timeout for handling an event. TimeoutPerEvent time.Duration + // DeliveryTimeout is the timeout for delivering an event to a consumer. + DeliveryTimeout time.Duration // PubsubClient is the pubsub client used to receive pubsub messages. PubsubClient *pubsub.Client // PubsubReceiveSettings is the pubsub receive settings. @@ -108,7 +117,11 @@ func WithMaxConcurrentPerEvent(c int) Option { // WithTimeoutPerEvent sets TimeoutPerEvent. func WithTimeoutPerEvent(t time.Duration) Option { return func(o *Options) { - o.TimeoutPerEvent = t + if t > maxTimeout { + o.TimeoutPerEvent = maxTimeout + } else { + o.TimeoutPerEvent = t + } } } @@ -125,3 +138,10 @@ func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option { o.PubsubReceiveSettings = s } } + +// WithDeliveryTimeout sets the DeliveryTimeout. +func WithDeliveryTimeout(t time.Duration) Option { + return func(o *Options) { + o.DeliveryTimeout = t + } +} diff --git a/pkg/broker/handler/pool/options_test.go b/pkg/broker/handler/pool/options_test.go index db8dd7460a..0b485aaff1 100644 --- a/pkg/broker/handler/pool/options_test.go +++ b/pkg/broker/handler/pool/options_test.go @@ -60,7 +60,7 @@ func TestWithMaxConcurrency(t *testing.T) { } func TestWithTimeout(t *testing.T) { - want := 10 * time.Minute + want := 2 * time.Minute // Always add project id because the default value can only be retrieved on GCE/GKE machines. opt, err := NewOptions(WithTimeoutPerEvent(want), WithProjectID("pid")) if err != nil { @@ -69,6 +69,16 @@ func TestWithTimeout(t *testing.T) { if opt.TimeoutPerEvent != want { t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want) } + + // Set timeout greater than the max value and verify it fallbacks to the max value. + // Always add project id because the default value can only be retrieved on GCE/GKE machines. + opt, err = NewOptions(WithTimeoutPerEvent(20*time.Minute), WithProjectID("pid")) + if err != nil { + t.Errorf("NewOptions got unexpected error: %v", err) + } + if opt.TimeoutPerEvent != maxTime { + t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want) + } } func TestWithReceiveSettings(t *testing.T) { @@ -104,3 +114,15 @@ func TestWithPubsubClient(t *testing.T) { t.Error("options PubsubClient got=nil, want=non-nil client") } } + +func TestWithDeliveryTimeout(t *testing.T) { + want := 10 * time.Minute + // Always add project id because the default value can only be retrieved on GCE/GKE machines. + opt, err := NewOptions(WithDeliveryTimeout(want), WithProjectID("pid")) + if err != nil { + t.Errorf("NewOptions got unexpected error: %v", err) + } + if opt.TimeoutPerEvent != want { + t.Errorf("options timeout per event got=%v, want=%v", opt.DeliveryTimeout, want) + } +} diff --git a/pkg/broker/handler/pool/testing/helper.go b/pkg/broker/handler/pool/testing/helper.go index 18de76fa2d..bd7d604712 100644 --- a/pkg/broker/handler/pool/testing/helper.go +++ b/pkg/broker/handler/pool/testing/helper.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub/pstest" @@ -373,6 +374,35 @@ func (h *Helper) VerifyNextTargetEvent(ctx context.Context, t *testing.T, target h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK) } +// VerifyNextTargetEventWithInfiniteDelay verifies the next event the subscriber receives +// but not respond a success infinitely. +func (h *Helper) VerifyNextTargetEventAndDelayResp(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event, delay time.Duration) { + t.Helper() + + consumer, ok := h.consumers[targetKey] + if !ok { + t.Errorf("target with key %q doesn't exist", targetKey) + } + + // On timeout or receiving an event, the defer function verifies the event in the end. + var gotEvent *event.Event + defer func() { + assertEvent(t, wantEvent, gotEvent, fmt.Sprintf("target (key=%q)", targetKey)) + }() + + msg, err := consumer.client.Receive(ctx) + if err != nil { + // In case Receive is stopped. + return + } + gotEvent, err = binding.ToEvent(ctx, msg) + if err != nil { + t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err) + } + time.Sleep(delay) + msg.Finish(nil) +} + // VerifyAndRespondNextTargetEvent verifies the next event the subscriber receives and replies with the given parameters. // If wantEvent is nil, then it means such an event is not expected. // This function is blocking and should be invoked in a separate goroutine with context timeout. diff --git a/pkg/broker/handler/processors/deliver/processor.go b/pkg/broker/handler/processors/deliver/processor.go index 918faba637..8c618329c5 100644 --- a/pkg/broker/handler/processors/deliver/processor.go +++ b/pkg/broker/handler/processors/deliver/processor.go @@ -19,6 +19,7 @@ package deliver import ( "context" "fmt" + "time" ceclient "github.com/cloudevents/sdk-go/v2/client" cecontext "github.com/cloudevents/sdk-go/v2/context" @@ -49,6 +50,10 @@ type Processor struct { // DeliverRetryClient is the cloudevents client to send events // to the retry topic. DeliverRetryClient ceclient.Client + + // Timeout is the delivery timeout. + // If set, the delivery will be time-boxed. + DeliverTimeout time.Duration } var _ processors.Interface = (*Processor)(nil) @@ -76,7 +81,14 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error { return nil } - resp, res := p.DeliverClient.Request(cecontext.WithTarget(ctx, target.Address), *event) + dctx := ctx + if p.DeliverTimeout > 0 { + var cancel context.CancelFunc + dctx, cancel = context.WithTimeout(dctx, p.DeliverTimeout) + defer cancel() + } + + resp, res := p.DeliverClient.Request(cecontext.WithTarget(dctx, target.Address), *event) if !protocol.IsACK(res) { if !p.RetryOnFailure { return fmt.Errorf("target delivery failed: %v", res.Error()) @@ -89,7 +101,7 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error { return nil } - if res := p.DeliverClient.Send(cecontext.WithTarget(ctx, broker.Address), *resp); !protocol.IsACK(res) { + if res := p.DeliverClient.Send(cecontext.WithTarget(dctx, broker.Address), *resp); !protocol.IsACK(res) { if !p.RetryOnFailure { return fmt.Errorf("delivery of replied event failed: %v", res.Error()) } diff --git a/pkg/broker/handler/processors/deliver/processor_test.go b/pkg/broker/handler/processors/deliver/processor_test.go index 036ecaeace..6ff3d3f9cb 100644 --- a/pkg/broker/handler/processors/deliver/processor_test.go +++ b/pkg/broker/handler/processors/deliver/processor_test.go @@ -142,21 +142,36 @@ func TestDeliverSuccess(t *testing.T) { func TestDeliverFailure(t *testing.T) { cases := []struct { - name string - withRetry bool - failRetry bool - wantErr bool + name string + withRetry bool + withRespDelay time.Duration + failRetry bool + wantErr bool }{{ - name: "no retry", + name: "delivery error no retry", wantErr: true, }, { - name: "retry success", + name: "delivery error retry success", withRetry: true, }, { - name: "retry failure", + name: "delivery error retry failure", withRetry: true, failRetry: true, wantErr: true, + }, { + name: "delivery timeout no retry", + withRespDelay: time.Second, + wantErr: true, + }, { + name: "delivery timeout retry success", + withRespDelay: time.Second, + withRetry: true, + }, { + name: "delivery timeout retry failure", + withRetry: true, + withRespDelay: time.Second, + failRetry: true, + wantErr: true, }} for _, tc := range cases { @@ -214,6 +229,7 @@ func TestDeliverFailure(t *testing.T) { Targets: testTargets, RetryOnFailure: tc.withRetry, DeliverRetryClient: deliverRetryClient, + DeliverTimeout: 500 * time.Millisecond, } origin := event.New() @@ -230,12 +246,22 @@ func TestDeliverFailure(t *testing.T) { if err != nil { t.Errorf("unexpected error from target receiving event: %v", err) } + defer msg.Finish(nil) + + // If with delay, we reply OK so that we know the error is for sure caused by timeout. + if tc.withRespDelay > 0 { + time.Sleep(tc.withRespDelay) + if err := resp(ctx, nil, &cehttp.Result{StatusCode: http.StatusOK}); err != nil { + t.Errorf("unexpected error from target responding event: %v", err) + } + return + } + // Due to https://github.com/cloudevents/sdk-go/issues/433 // it's not possible to use Receive to easily return error. if err := resp(ctx, nil, &cehttp.Result{StatusCode: http.StatusInternalServerError}); err != nil { t.Errorf("unexpected error from target responding event: %v", err) } - defer msg.Finish(nil) }() err = p.Process(ctx, &origin) From c37ddff2ca444acbbb80a0a19b9199fb641965fb Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 28 Apr 2020 20:11:24 +0000 Subject: [PATCH 2/4] fix comment --- pkg/broker/handler/pool/fanout/pool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/broker/handler/pool/fanout/pool_test.go b/pkg/broker/handler/pool/fanout/pool_test.go index ff0712db0e..d20e553364 100644 --- a/pkg/broker/handler/pool/fanout/pool_test.go +++ b/pkg/broker/handler/pool/fanout/pool_test.go @@ -213,7 +213,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) { go helper.VerifyNextTargetRetryEvent(ctx, t, t1.Key(), &e) // The same event should be received by t2. go helper.VerifyNextTargetEvent(vctx, t, t2.Key(), &e) - // But t2 should receive any retry event because the initial delay + // But t2 shouldn't receive any retry event because the initial delay // was successful. go helper.VerifyNextTargetRetryEvent(vctx, t, t2.Key(), nil) From c6c5e29dff8fbe26d6adbfb0f8f2136885c8702d Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 28 Apr 2020 20:24:45 +0000 Subject: [PATCH 3/4] fix renaming bug --- pkg/broker/handler/pool/options_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/broker/handler/pool/options_test.go b/pkg/broker/handler/pool/options_test.go index 0b485aaff1..af9af1767b 100644 --- a/pkg/broker/handler/pool/options_test.go +++ b/pkg/broker/handler/pool/options_test.go @@ -76,8 +76,8 @@ func TestWithTimeout(t *testing.T) { if err != nil { t.Errorf("NewOptions got unexpected error: %v", err) } - if opt.TimeoutPerEvent != maxTime { - t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want) + if opt.TimeoutPerEvent != maxTimeout { + t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, maxTimeout) } } From 487ead11f4240eddafe3f11dbcc45950e5c22d8b Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Tue, 28 Apr 2020 21:34:50 +0000 Subject: [PATCH 4/4] address comments --- pkg/broker/handler/pool/fanout/pool.go | 4 +++ pkg/broker/handler/pool/fanout/pool_test.go | 6 ++-- pkg/broker/handler/pool/testing/helper.go | 34 ++++--------------- .../handler/processors/deliver/processor.go | 4 +-- 4 files changed, 16 insertions(+), 32 deletions(-) diff --git a/pkg/broker/handler/pool/fanout/pool.go b/pkg/broker/handler/pool/fanout/pool.go index edd28537e0..fd0ee87c27 100644 --- a/pkg/broker/handler/pool/fanout/pool.go +++ b/pkg/broker/handler/pool/fanout/pool.go @@ -109,6 +109,10 @@ func NewSyncPool(ctx context.Context, targets config.ReadonlyTargets, opts ...po return nil, err } + if options.TimeoutPerEvent < 5*time.Second { + return nil, fmt.Errorf("timeout per event cannot be lower than %v", 5*time.Second) + } + p := &SyncPool{ targets: targets, options: options, diff --git a/pkg/broker/handler/pool/fanout/pool_test.go b/pkg/broker/handler/pool/fanout/pool_test.go index d20e553364..e5e686e57e 100644 --- a/pkg/broker/handler/pool/fanout/pool_test.go +++ b/pkg/broker/handler/pool/fanout/pool_test.go @@ -193,14 +193,14 @@ func TestFanoutSyncPoolE2E(t *testing.T) { vctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError) + go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError, 0) go helper.VerifyNextTargetRetryEvent(ctx, t, t3.Key(), &e) helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e) <-vctx.Done() }) - t.Run("event initial delivery failed with timeout was sent to retry queue", func(t *testing.T) { + t.Run("event with delivery timeout was sent to retry queue", func(t *testing.T) { // Set timeout context so that verification can be done before // exiting test func. vctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -233,7 +233,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) { vctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK) + go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK, 0) go helper.VerifyNextBrokerIngressEvent(ctx, t, b2.Key(), &reply) helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e) diff --git a/pkg/broker/handler/pool/testing/helper.go b/pkg/broker/handler/pool/testing/helper.go index bd7d604712..6d1c34016a 100644 --- a/pkg/broker/handler/pool/testing/helper.go +++ b/pkg/broker/handler/pool/testing/helper.go @@ -371,42 +371,20 @@ func (h *Helper) VerifyNextBrokerIngressEvent(ctx context.Context, t *testing.T, // This function is blocking and should be invoked in a separate goroutine with context timeout. func (h *Helper) VerifyNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event) { t.Helper() - h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK) + h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, 0) } -// VerifyNextTargetEventWithInfiniteDelay verifies the next event the subscriber receives +// VerifyNextTargetEventAndDelayResp verifies the next event the subscriber receives // but not respond a success infinitely. func (h *Helper) VerifyNextTargetEventAndDelayResp(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event, delay time.Duration) { t.Helper() - - consumer, ok := h.consumers[targetKey] - if !ok { - t.Errorf("target with key %q doesn't exist", targetKey) - } - - // On timeout or receiving an event, the defer function verifies the event in the end. - var gotEvent *event.Event - defer func() { - assertEvent(t, wantEvent, gotEvent, fmt.Sprintf("target (key=%q)", targetKey)) - }() - - msg, err := consumer.client.Receive(ctx) - if err != nil { - // In case Receive is stopped. - return - } - gotEvent, err = binding.ToEvent(ctx, msg) - if err != nil { - t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err) - } - time.Sleep(delay) - msg.Finish(nil) + h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, delay) } // VerifyAndRespondNextTargetEvent verifies the next event the subscriber receives and replies with the given parameters. // If wantEvent is nil, then it means such an event is not expected. // This function is blocking and should be invoked in a separate goroutine with context timeout. -func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int) { +func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int, delay time.Duration) { t.Helper() consumer, ok := h.consumers[targetKey] @@ -425,12 +403,13 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing // In case Receive is stopped. return } - msg.Finish(nil) gotEvent, err = binding.ToEvent(ctx, msg) if err != nil { t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err) } + time.Sleep(delay) + var replyMsg binding.Message if replyEvent != nil { replyMsg = binding.ToMessage(replyEvent) @@ -438,6 +417,7 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing if err := respFn(ctx, replyMsg, &cehttp.Result{StatusCode: statusCode}); err != nil { t.Errorf("unexpected error from responding target (key=%q) event: %v", targetKey, err) } + msg.Finish(nil) } // VerifyNextTargetRetryEvent verifies the next event the target retry queue receives. diff --git a/pkg/broker/handler/processors/deliver/processor.go b/pkg/broker/handler/processors/deliver/processor.go index 8c618329c5..24d1babba4 100644 --- a/pkg/broker/handler/processors/deliver/processor.go +++ b/pkg/broker/handler/processors/deliver/processor.go @@ -51,8 +51,8 @@ type Processor struct { // to the retry topic. DeliverRetryClient ceclient.Client - // Timeout is the delivery timeout. - // If set, the delivery will be time-boxed. + // DeliverTimeout is the timeout applied to cancel delivery. + // If zero, not additional timeout is applied. DeliverTimeout time.Duration }