From aa7d86b360964c9891af1065c3ba32359c446ec9 Mon Sep 17 00:00:00 2001
From: cshou
Date: Thu, 30 Apr 2020 10:37:42 -0700
Subject: [PATCH] Have broker fanout handle timeout gracefully for each individual target (#960)

* Have broker fanout handle timeout gracefully for each individual target

* fix comment

* fix renaming bug

* address comments
---
 cmd/broker/fanout/main.go                   | 14 ++++---
 cmd/broker/retry/main.go                    | 12 +++---
 pkg/broker/handler/pool/fanout/pool.go      | 14 +++++++
 pkg/broker/handler/pool/fanout/pool_test.go | 29 +++++++++++--
 pkg/broker/handler/pool/options.go          | 22 +++++++++-
 pkg/broker/handler/pool/options_test.go     | 24 ++++++++++-
 pkg/broker/handler/pool/testing/helper.go   | 16 +++++--
 .../handler/processors/deliver/processor.go | 16 ++++++-
 .../processors/deliver/processor_test.go    | 42 +++++++++++++++----
 9 files changed, 160 insertions(+), 29 deletions(-)

diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go
index 1c14f3a5e4..9362b2275a 100644
--- a/cmd/broker/fanout/main.go
+++ b/cmd/broker/fanout/main.go
@@ -52,12 +52,14 @@ var (
 )
 
 type envConfig struct {
-	PodName                string        `envconfig:"POD_NAME" required:"true"`
-	ProjectID              string        `envconfig:"PROJECT_ID"`
-	TargetsConfigPath      string        `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
-	HandlerConcurrency     int           `envconfig:"HANDLER_CONCURRENCY"`
-	MaxConcurrencyPerEvent int           `envconfig:"MAX_CONCURRENCY_PER_EVENT"`
-	TimeoutPerEvent        time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
+	PodName                string `envconfig:"POD_NAME" required:"true"`
+	ProjectID              string `envconfig:"PROJECT_ID"`
+	TargetsConfigPath      string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
+	HandlerConcurrency     int    `envconfig:"HANDLER_CONCURRENCY"`
+	MaxConcurrencyPerEvent int    `envconfig:"MAX_CONCURRENCY_PER_EVENT"`
+
+	// Maximum is 10m.
+	TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
 }
 
 func main() {
diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go
index a4137ec808..56ad0ab489 100644
--- a/cmd/broker/retry/main.go
+++ b/cmd/broker/retry/main.go
@@ -53,11 +53,13 @@ var (
 )
 
 type envConfig struct {
-	PodName            string        `envconfig:"POD_NAME" required:"true"`
-	ProjectID          string        `envconfig:"PROJECT_ID"`
-	TargetsConfigPath  string        `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
-	HandlerConcurrency int           `envconfig:"HANDLER_CONCURRENCY"`
-	TimeoutPerEvent    time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
+	PodName            string `envconfig:"POD_NAME" required:"true"`
+	ProjectID          string `envconfig:"PROJECT_ID"`
+	TargetsConfigPath  string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
+	HandlerConcurrency int    `envconfig:"HANDLER_CONCURRENCY"`
+
+	// Maximum is 10m.
+	TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
 }
 
 func main() {
diff --git a/pkg/broker/handler/pool/fanout/pool.go b/pkg/broker/handler/pool/fanout/pool.go
index 1c85a0cefc..fd0ee87c27 100644
--- a/pkg/broker/handler/pool/fanout/pool.go
+++ b/pkg/broker/handler/pool/fanout/pool.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"time"
 
 	ceclient "github.com/cloudevents/sdk-go/v2/client"
 	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
@@ -52,6 +53,12 @@ type SyncPool struct {
 	// For initial events delivery. We only need a shared client.
 	// And we can set target address dynamically.
 	deliverClient ceclient.Client
+	// For fanout delivery, we need a slightly shorter timeout
+	// than the handler timeout per event.
+	// It allows the delivery processor to time out the delivery
+	// before the handler nacks the pubsub message, which will
+	// cause event re-delivery for all targets.
+	deliverTimeout time.Duration
 }
 
 type handlerCache struct {
@@ -102,11 +109,17 @@ func NewSyncPool(ctx context.Context, targets config.ReadonlyTargets, opts ...po
 		return nil, err
 	}
 
+	if options.TimeoutPerEvent < 5*time.Second {
+		return nil, fmt.Errorf("timeout per event cannot be lower than %v", 5*time.Second)
+	}
+
 	p := &SyncPool{
 		targets:            targets,
 		options:            options,
 		deliverClient:      deliverClient,
 		deliverRetryClient: retryClient,
+		// Set the deliver timeout slightly less than the total timeout for each event.
+		deliverTimeout: options.TimeoutPerEvent - (5 * time.Second),
 	}
 	return p, nil
 }
@@ -173,6 +186,7 @@ func (p *SyncPool) SyncOnce(ctx context.Context) error {
						Targets:            p.targets,
						RetryOnFailure:     true,
						DeliverRetryClient: p.deliverRetryClient,
+						DeliverTimeout:     p.deliverTimeout,
					},
				),
			},
diff --git a/pkg/broker/handler/pool/fanout/pool_test.go b/pkg/broker/handler/pool/fanout/pool_test.go
index 197580c3c0..e5e686e57e 100644
--- a/pkg/broker/handler/pool/fanout/pool_test.go
+++ b/pkg/broker/handler/pool/fanout/pool_test.go
@@ -115,7 +115,9 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
 	signal := make(chan struct{})
 	syncPool, err := NewSyncPool(ctx, helper.Targets,
 		pool.WithPubsubClient(helper.PubsubClient),
-		pool.WithProjectID(testProject))
+		pool.WithProjectID(testProject),
+		pool.WithDeliveryTimeout(500*time.Millisecond),
+	)
 	if err != nil {
 		t.Errorf("unexpected error from getting sync pool: %v", err)
 	}
@@ -191,13 +193,34 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
 		vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
 		defer cancel()
 
-		go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError)
+		go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError, 0)
 		go helper.VerifyNextTargetRetryEvent(ctx, t, t3.Key(), &e)
 
 		helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
 		<-vctx.Done()
 	})
 
+	t.Run("event with delivery timeout was sent to retry queue", func(t *testing.T) {
+		// Set timeout context so that verification can be done before
+		// exiting test func.
+		vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
+		defer cancel()
+
+		// Verify the event to t1 was received.
+		go helper.VerifyNextTargetEventAndDelayResp(ctx, t, t1.Key(), &e, time.Second)
+		// Because of the delay, t1 delivery should time out.
+		// Thus the event should have been sent to the retry queue.
+		go helper.VerifyNextTargetRetryEvent(ctx, t, t1.Key(), &e)
+		// The same event should be received by t2.
+		go helper.VerifyNextTargetEvent(vctx, t, t2.Key(), &e)
+		// But t2 shouldn't receive any retry event because the initial delivery
+		// was successful.
+		go helper.VerifyNextTargetRetryEvent(vctx, t, t2.Key(), nil)
+
+		helper.SendEventToDecoupleQueue(ctx, t, b1.Key(), &e)
+		<-vctx.Done()
+	})
+
 	t.Run("event replied was sent to broker ingress", func(t *testing.T) {
 		reply := event.New()
 		reply.SetSubject("bar")
@@ -210,7 +233,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
 		vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
 		defer cancel()
 
-		go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK)
+		go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK, 0)
 		go helper.VerifyNextBrokerIngressEvent(ctx, t, b2.Key(), &reply)
 
 		helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
diff --git a/pkg/broker/handler/pool/options.go b/pkg/broker/handler/pool/options.go
index 6f54ee31c3..0c39443b15 100644
--- a/pkg/broker/handler/pool/options.go
+++ b/pkg/broker/handler/pool/options.go
@@ -35,6 +35,13 @@ var (
 		ceclient.WithTimeNow(),
 		ceclient.WithTracePropagation(),
 	}
+
+	// This is the pubsub default MaxExtension.
+	// It would not make sense for the handler timeout per event to be
+	// greater than this value because the message would be nacked before
+	// the handler times out.
+	// TODO: consider allowing this value to be changed?
+	maxTimeout = 10 * time.Minute
 )
 
 // Options holds all the options for create handler pool.
@@ -49,6 +56,8 @@ type Options struct {
 	MaxConcurrencyPerEvent int
 	// TimeoutPerEvent is the timeout for handling an event.
 	TimeoutPerEvent time.Duration
+	// DeliveryTimeout is the timeout for delivering an event to a consumer.
+	DeliveryTimeout time.Duration
 	// PubsubClient is the pubsub client used to receive pubsub messages.
 	PubsubClient *pubsub.Client
 	// PubsubReceiveSettings is the pubsub receive settings.
@@ -108,7 +117,11 @@ func WithMaxConcurrentPerEvent(c int) Option {
 // WithTimeoutPerEvent sets TimeoutPerEvent.
 func WithTimeoutPerEvent(t time.Duration) Option {
 	return func(o *Options) {
-		o.TimeoutPerEvent = t
+		if t > maxTimeout {
+			o.TimeoutPerEvent = maxTimeout
+		} else {
+			o.TimeoutPerEvent = t
+		}
 	}
 }
 
@@ -125,3 +138,10 @@ func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option {
 		o.PubsubReceiveSettings = s
 	}
 }
+
+// WithDeliveryTimeout sets the DeliveryTimeout.
+func WithDeliveryTimeout(t time.Duration) Option {
+	return func(o *Options) {
+		o.DeliveryTimeout = t
+	}
+}
diff --git a/pkg/broker/handler/pool/options_test.go b/pkg/broker/handler/pool/options_test.go
index db8dd7460a..af9af1767b 100644
--- a/pkg/broker/handler/pool/options_test.go
+++ b/pkg/broker/handler/pool/options_test.go
@@ -60,7 +60,7 @@ func TestWithMaxConcurrency(t *testing.T) {
 }
 
 func TestWithTimeout(t *testing.T) {
-	want := 10 * time.Minute
+	want := 2 * time.Minute
 	// Always add project id because the default value can only be retrieved on GCE/GKE machines.
 	opt, err := NewOptions(WithTimeoutPerEvent(want), WithProjectID("pid"))
 	if err != nil {
@@ -69,6 +69,16 @@ func TestWithTimeout(t *testing.T) {
 	if opt.TimeoutPerEvent != want {
 		t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want)
 	}
+
+	// Set timeout greater than the max value and verify it falls back to the max value.
+	// Always add project id because the default value can only be retrieved on GCE/GKE machines.
+	opt, err = NewOptions(WithTimeoutPerEvent(20*time.Minute), WithProjectID("pid"))
+	if err != nil {
+		t.Errorf("NewOptions got unexpected error: %v", err)
+	}
+	if opt.TimeoutPerEvent != maxTimeout {
+		t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, maxTimeout)
+	}
 }
 
 func TestWithReceiveSettings(t *testing.T) {
@@ -104,3 +114,15 @@ func TestWithPubsubClient(t *testing.T) {
 		t.Error("options PubsubClient got=nil, want=non-nil client")
 	}
 }
+
+func TestWithDeliveryTimeout(t *testing.T) {
+	want := 10 * time.Minute
+	// Always add project id because the default value can only be retrieved on GCE/GKE machines.
+	opt, err := NewOptions(WithDeliveryTimeout(want), WithProjectID("pid"))
+	if err != nil {
+		t.Errorf("NewOptions got unexpected error: %v", err)
+	}
+	if opt.DeliveryTimeout != want {
+		t.Errorf("options delivery timeout got=%v, want=%v", opt.DeliveryTimeout, want)
+	}
+}
diff --git a/pkg/broker/handler/pool/testing/helper.go b/pkg/broker/handler/pool/testing/helper.go
index 18de76fa2d..6d1c34016a 100644
--- a/pkg/broker/handler/pool/testing/helper.go
+++ b/pkg/broker/handler/pool/testing/helper.go
@@ -22,6 +22,7 @@ import (
 	"net/http"
 	"net/http/httptest"
 	"testing"
+	"time"
 
 	"cloud.google.com/go/pubsub"
 	"cloud.google.com/go/pubsub/pstest"
@@ -370,13 +371,20 @@ func (h *Helper) VerifyNextBrokerIngressEvent(ctx context.Context, t *testing.T,
 // This function is blocking and should be invoked in a separate goroutine with context timeout.
 func (h *Helper) VerifyNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event) {
 	t.Helper()
-	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK)
+	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, 0)
+}
+
+// VerifyNextTargetEventAndDelayResp verifies the next event the subscriber receives
+// but delays the success response by the given duration.
+func (h *Helper) VerifyNextTargetEventAndDelayResp(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event, delay time.Duration) {
+	t.Helper()
+	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, delay)
 }
 
 // VerifyAndRespondNextTargetEvent verifies the next event the subscriber receives and replies with the given parameters.
 // If wantEvent is nil, then it means such an event is not expected.
 // This function is blocking and should be invoked in a separate goroutine with context timeout.
-func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int) {
+func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int, delay time.Duration) {
 	t.Helper()
 
 	consumer, ok := h.consumers[targetKey]
@@ -395,12 +403,13 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing
 		// In case Receive is stopped.
 		return
 	}
-	msg.Finish(nil)
 	gotEvent, err = binding.ToEvent(ctx, msg)
 	if err != nil {
 		t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err)
 	}
 
+	time.Sleep(delay)
+
 	var replyMsg binding.Message
 	if replyEvent != nil {
 		replyMsg = binding.ToMessage(replyEvent)
@@ -408,6 +417,7 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing
 	if err := respFn(ctx, replyMsg, &cehttp.Result{StatusCode: statusCode}); err != nil {
 		t.Errorf("unexpected error from responding target (key=%q) event: %v", targetKey, err)
 	}
+	msg.Finish(nil)
 }
 
 // VerifyNextTargetRetryEvent verifies the next event the target retry queue receives.
diff --git a/pkg/broker/handler/processors/deliver/processor.go b/pkg/broker/handler/processors/deliver/processor.go
index 918faba637..24d1babba4 100644
--- a/pkg/broker/handler/processors/deliver/processor.go
+++ b/pkg/broker/handler/processors/deliver/processor.go
@@ -19,6 +19,7 @@ package deliver
 import (
 	"context"
 	"fmt"
+	"time"
 
 	ceclient "github.com/cloudevents/sdk-go/v2/client"
 	cecontext "github.com/cloudevents/sdk-go/v2/context"
@@ -49,6 +50,10 @@ type Processor struct {
 	// DeliverRetryClient is the cloudevents client to send events
 	// to the retry topic.
 	DeliverRetryClient ceclient.Client
+
+	// DeliverTimeout is the timeout applied to cancel delivery.
+	// If zero, no additional timeout is applied.
+	DeliverTimeout time.Duration
 }
 
 var _ processors.Interface = (*Processor)(nil)
@@ -76,7 +81,14 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
 		return nil
 	}
 
-	resp, res := p.DeliverClient.Request(cecontext.WithTarget(ctx, target.Address), *event)
+	dctx := ctx
+	if p.DeliverTimeout > 0 {
+		var cancel context.CancelFunc
+		dctx, cancel = context.WithTimeout(dctx, p.DeliverTimeout)
+		defer cancel()
+	}
+
+	resp, res := p.DeliverClient.Request(cecontext.WithTarget(dctx, target.Address), *event)
 	if !protocol.IsACK(res) {
 		if !p.RetryOnFailure {
 			return fmt.Errorf("target delivery failed: %v", res.Error())
@@ -89,7 +101,7 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
 		return nil
 	}
 
-	if res := p.DeliverClient.Send(cecontext.WithTarget(ctx, broker.Address), *resp); !protocol.IsACK(res) {
+	if res := p.DeliverClient.Send(cecontext.WithTarget(dctx, broker.Address), *resp); !protocol.IsACK(res) {
 		if !p.RetryOnFailure {
 			return fmt.Errorf("delivery of replied event failed: %v", res.Error())
 		}
diff --git a/pkg/broker/handler/processors/deliver/processor_test.go b/pkg/broker/handler/processors/deliver/processor_test.go
index 036ecaeace..6ff3d3f9cb 100644
--- a/pkg/broker/handler/processors/deliver/processor_test.go
+++ b/pkg/broker/handler/processors/deliver/processor_test.go
@@ -142,21 +142,36 @@ func TestDeliverSuccess(t *testing.T) {
 
 func TestDeliverFailure(t *testing.T) {
 	cases := []struct {
-		name      string
-		withRetry bool
-		failRetry bool
-		wantErr   bool
+		name          string
+		withRetry     bool
+		withRespDelay time.Duration
+		failRetry     bool
+		wantErr       bool
 	}{{
-		name:    "no retry",
+		name:    "delivery error no retry",
 		wantErr: true,
 	}, {
-		name:      "retry success",
+		name:      "delivery error retry success",
 		withRetry: true,
 	}, {
-		name:      "retry failure",
+		name:      "delivery error retry failure",
 		withRetry: true,
 		failRetry: true,
 		wantErr:   true,
+	}, {
+		name:          "delivery timeout no retry",
+		withRespDelay: time.Second,
+		wantErr:       true,
+	}, {
+		name:          "delivery timeout retry success",
+		withRespDelay: time.Second,
+		withRetry:     true,
+	}, {
+		name:          "delivery timeout retry failure",
+		withRetry:     true,
+		withRespDelay: time.Second,
+		failRetry:     true,
+		wantErr:       true,
 	}}
 
 	for _, tc := range cases {
@@ -214,6 +229,7 @@ func TestDeliverFailure(t *testing.T) {
 				Targets:            testTargets,
 				RetryOnFailure:     tc.withRetry,
 				DeliverRetryClient: deliverRetryClient,
+				DeliverTimeout:     500 * time.Millisecond,
 			}
 
 			origin := event.New()
@@ -230,12 +246,22 @@ func TestDeliverFailure(t *testing.T) {
 				if err != nil {
 					t.Errorf("unexpected error from target receiving event: %v", err)
 				}
+				defer msg.Finish(nil)
+
+				// If a delay is set, respond OK after the delay so any error must come from the delivery timeout.
+				if tc.withRespDelay > 0 {
+					time.Sleep(tc.withRespDelay)
+					if err := resp(ctx, nil, &cehttp.Result{StatusCode: http.StatusOK}); err != nil {
+						t.Errorf("unexpected error from target responding event: %v", err)
+					}
+					return
+				}
+
 				// Due to https://github.com/cloudevents/sdk-go/issues/433
 				// it's not possible to use Receive to easily return error.
 				if err := resp(ctx, nil, &cehttp.Result{StatusCode: http.StatusInternalServerError}); err != nil {
 					t.Errorf("unexpected error from target responding event: %v", err)
 				}
-				defer msg.Finish(nil)
 			}()
 
 			err = p.Process(ctx, &origin)
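
For orientation, the core idea of the patch can be shown outside the diff: give each per-target delivery a deadline slightly shorter than the handler's per-event timeout, so a slow target fails on its own and only that target's event goes to the retry queue, instead of the whole pubsub message being nacked and redelivered to every target. The sketch below is illustrative only (it is not part of the patch; handlerTimeout, deliverTimeout, deliver, and the target URL are made-up names), but it follows the same context wiring the deliver processor uses.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// deliver stands in for a per-target delivery call; in the patch this role is
// played by the deliver processor issuing a CloudEvents request with the
// target address set on the context.
func deliver(ctx context.Context, target string) error {
	select {
	case <-time.After(6 * time.Second): // simulate a consumer that responds too slowly
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// handlerTimeout plays the role of TimeoutPerEvent: the total budget for one
	// pubsub message before it would be nacked and redelivered to all targets.
	handlerTimeout := 10 * time.Second
	// Keep the delivery deadline slightly shorter (the patch subtracts 5s),
	// so a slow target times out individually and only it goes to retry.
	deliverTimeout := handlerTimeout - 5*time.Second

	hctx, cancel := context.WithTimeout(context.Background(), handlerTimeout)
	defer cancel()

	// Derive the delivery context from the handler context, as Process does
	// with context.WithTimeout when DeliverTimeout > 0.
	dctx, dcancel := context.WithTimeout(hctx, deliverTimeout)
	defer dcancel()

	if err := deliver(dctx, "http://target.example.com"); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("delivery timed out; route the event to this target's retry queue")
		} else {
			fmt.Println("delivery failed:", err)
		}
	}
}

With these numbers the sketch hits the timeout branch after about five seconds; the same 5s margin is why NewSyncPool in the patch rejects a TimeoutPerEvent lower than five seconds.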