Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Have broker fanout handle timeout gracefully for each individual target #960

Merged
merged 4 commits into from
Apr 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/broker/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ var (
)

// envConfig holds the environment-driven configuration for the fanout broker
// binary, parsed via envconfig struct tags.
type envConfig struct {
	// PodName identifies the pod running this binary; required.
	PodName string `envconfig:"POD_NAME" required:"true"`
	// ProjectID is the GCP project; empty means it is resolved elsewhere.
	ProjectID string `envconfig:"PROJECT_ID"`
	// TargetsConfigPath is where the broker targets config file is mounted.
	TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
	// HandlerConcurrency bounds the number of concurrent event handlers.
	HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`
	// MaxConcurrencyPerEvent bounds concurrent fanout deliveries per event.
	MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"`

	// TimeoutPerEvent is the per-event handling timeout.
	// NOTE(review): capped at 10 minutes downstream (see pool options).
	TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
}

func main() {
Expand Down
12 changes: 7 additions & 5 deletions cmd/broker/retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ var (
)

// envConfig holds the environment-driven configuration for the retry broker
// binary, parsed via envconfig struct tags.
type envConfig struct {
	// PodName identifies the pod running this binary; required.
	PodName string `envconfig:"POD_NAME" required:"true"`
	// ProjectID is the GCP project; empty means it is resolved elsewhere.
	ProjectID string `envconfig:"PROJECT_ID"`
	// TargetsConfigPath is where the broker targets config file is mounted.
	TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
	// HandlerConcurrency bounds the number of concurrent event handlers.
	HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`

	// TimeoutPerEvent is the per-event handling timeout.
	// NOTE(review): capped at 10 minutes downstream (see pool options).
	TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
}

func main() {
Expand Down
14 changes: 14 additions & 0 deletions pkg/broker/handler/pool/fanout/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"

ceclient "github.com/cloudevents/sdk-go/v2/client"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
Expand Down Expand Up @@ -52,6 +53,12 @@ type SyncPool struct {
// For initial events delivery. We only need a shared client.
// And we can set target address dynamically.
deliverClient ceclient.Client
// For fanout delivery, we need a slightly shorter timeout
// than the handler timeout per event.
// It allows the delivery processor to timeout the delivery
// before the handler nacks the pubsub message, which will
// cause event re-delivery for all targets.
deliverTimeout time.Duration
}

type handlerCache struct {
Expand Down Expand Up @@ -102,11 +109,17 @@ func NewSyncPool(ctx context.Context, targets config.ReadonlyTargets, opts ...po
return nil, err
}

if options.TimeoutPerEvent < 5*time.Second {
return nil, fmt.Errorf("timeout per event cannot be lower than %v", 5*time.Second)
}

p := &SyncPool{
targets: targets,
options: options,
deliverClient: deliverClient,
deliverRetryClient: retryClient,
// Set the deliver timeout slightly less than the total timeout for each event.
deliverTimeout: options.TimeoutPerEvent - (5 * time.Second),
yolocs marked this conversation as resolved.
Show resolved Hide resolved
}
return p, nil
}
Expand Down Expand Up @@ -173,6 +186,7 @@ func (p *SyncPool) SyncOnce(ctx context.Context) error {
Targets: p.targets,
RetryOnFailure: true,
DeliverRetryClient: p.deliverRetryClient,
DeliverTimeout: p.deliverTimeout,
},
),
},
Expand Down
29 changes: 26 additions & 3 deletions pkg/broker/handler/pool/fanout/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
signal := make(chan struct{})
syncPool, err := NewSyncPool(ctx, helper.Targets,
pool.WithPubsubClient(helper.PubsubClient),
pool.WithProjectID(testProject))
pool.WithProjectID(testProject),
pool.WithDeliveryTimeout(500*time.Millisecond),
)
if err != nil {
t.Errorf("unexpected error from getting sync pool: %v", err)
}
Expand Down Expand Up @@ -191,13 +193,34 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError)
go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError, 0)
go helper.VerifyNextTargetRetryEvent(ctx, t, t3.Key(), &e)

helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
<-vctx.Done()
})

t.Run("event with delivery timeout was sent to retry queue", func(t *testing.T) {
// Set timeout context so that verification can be done before
// exiting test func.
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

// Verify the event to t1 was received.
go helper.VerifyNextTargetEventAndDelayResp(ctx, t, t1.Key(), &e, time.Second)
// Because of the delay, t1 delivery should timeout.
// Thus the event should have been sent to the retry queue.
go helper.VerifyNextTargetRetryEvent(ctx, t, t1.Key(), &e)
// The same event should be received by t2.
go helper.VerifyNextTargetEvent(vctx, t, t2.Key(), &e)
// But t2 shouldn't receive any retry event because the initial delay
// was successful.
go helper.VerifyNextTargetRetryEvent(vctx, t, t2.Key(), nil)

helper.SendEventToDecoupleQueue(ctx, t, b1.Key(), &e)
<-vctx.Done()
})

t.Run("event replied was sent to broker ingress", func(t *testing.T) {
reply := event.New()
reply.SetSubject("bar")
Expand All @@ -210,7 +233,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK)
go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK, 0)
go helper.VerifyNextBrokerIngressEvent(ctx, t, b2.Key(), &reply)

helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
Expand Down
22 changes: 21 additions & 1 deletion pkg/broker/handler/pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ var (
ceclient.WithTimeNow(),
ceclient.WithTracePropagation(),
}

// This is the pubsub default MaxExtension.
// It would not make sense for the handler's per-event timeout to be greater
// than this value, because the message would be nacked before the handler
// times out.
// TODO: consider allowing this value to be changed?
maxTimeout = 10 * time.Minute
)

// Options holds all the options for create handler pool.
Expand All @@ -49,6 +56,8 @@ type Options struct {
MaxConcurrencyPerEvent int
// TimeoutPerEvent is the timeout for handling an event.
TimeoutPerEvent time.Duration
// DeliveryTimeout is the timeout for delivering an event to a consumer.
DeliveryTimeout time.Duration
// PubsubClient is the pubsub client used to receive pubsub messages.
PubsubClient *pubsub.Client
// PubsubReceiveSettings is the pubsub receive settings.
Expand Down Expand Up @@ -108,7 +117,11 @@ func WithMaxConcurrentPerEvent(c int) Option {
// WithTimeoutPerEvent sets TimeoutPerEvent, capping it at maxTimeout.
// Values above maxTimeout are clamped because the pubsub message would be
// nacked (MaxExtension reached) before a longer handler timeout could fire.
func WithTimeoutPerEvent(t time.Duration) Option {
	return func(o *Options) {
		if t > maxTimeout {
			o.TimeoutPerEvent = maxTimeout
		} else {
			o.TimeoutPerEvent = t
		}
	}
}

Expand All @@ -125,3 +138,10 @@ func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option {
o.PubsubReceiveSettings = s
}
}

// WithDeliveryTimeout returns an Option that sets DeliveryTimeout, the
// timeout applied to each individual delivery attempt to a consumer.
func WithDeliveryTimeout(timeout time.Duration) Option {
	return func(opts *Options) {
		opts.DeliveryTimeout = timeout
	}
}
24 changes: 23 additions & 1 deletion pkg/broker/handler/pool/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestWithMaxConcurrency(t *testing.T) {
}

func TestWithTimeout(t *testing.T) {
want := 10 * time.Minute
want := 2 * time.Minute
// Always add project id because the default value can only be retrieved on GCE/GKE machines.
opt, err := NewOptions(WithTimeoutPerEvent(want), WithProjectID("pid"))
if err != nil {
Expand All @@ -69,6 +69,16 @@ func TestWithTimeout(t *testing.T) {
if opt.TimeoutPerEvent != want {
t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want)
}

// Set timeout greater than the max value and verify it fallbacks to the max value.
// Always add project id because the default value can only be retrieved on GCE/GKE machines.
opt, err = NewOptions(WithTimeoutPerEvent(20*time.Minute), WithProjectID("pid"))
if err != nil {
t.Errorf("NewOptions got unexpected error: %v", err)
}
if opt.TimeoutPerEvent != maxTimeout {
t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, maxTimeout)
}
}

func TestWithReceiveSettings(t *testing.T) {
Expand Down Expand Up @@ -104,3 +114,15 @@ func TestWithPubsubClient(t *testing.T) {
t.Error("options PubsubClient got=nil, want=non-nil client")
}
}

// TestWithDeliveryTimeout verifies that WithDeliveryTimeout stores the given
// duration in Options.DeliveryTimeout.
func TestWithDeliveryTimeout(t *testing.T) {
	want := 10 * time.Minute
	// Always add project id because the default value can only be retrieved on GCE/GKE machines.
	opt, err := NewOptions(WithDeliveryTimeout(want), WithProjectID("pid"))
	if err != nil {
		t.Errorf("NewOptions got unexpected error: %v", err)
	}
	// BUG FIX: the original asserted opt.TimeoutPerEvent here, so the test
	// could never catch a broken WithDeliveryTimeout; check DeliveryTimeout.
	if opt.DeliveryTimeout != want {
		t.Errorf("options delivery timeout got=%v, want=%v", opt.DeliveryTimeout, want)
	}
}
16 changes: 13 additions & 3 deletions pkg/broker/handler/pool/testing/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
Expand Down Expand Up @@ -370,13 +371,20 @@ func (h *Helper) VerifyNextBrokerIngressEvent(ctx context.Context, t *testing.T,
// VerifyNextTargetEvent verifies the next event the target subscriber
// receives and replies with a 200 immediately (no reply event, no delay).
// This function is blocking and should be invoked in a separate goroutine with context timeout.
func (h *Helper) VerifyNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event) {
	t.Helper()
	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, 0)
}

// VerifyNextTargetEventAndDelayResp verifies the next event the subscriber receives
// but not respond a success infinitely.
func (h *Helper) VerifyNextTargetEventAndDelayResp(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event, delay time.Duration) {
t.Helper()
yolocs marked this conversation as resolved.
Show resolved Hide resolved
h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, delay)
}

// VerifyAndRespondNextTargetEvent verifies the next event the subscriber receives and replies with the given parameters.
// If wantEvent is nil, then it means such an event is not expected.
// This function is blocking and should be invoked in a separate goroutine with context timeout.
func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int) {
func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int, delay time.Duration) {
t.Helper()

consumer, ok := h.consumers[targetKey]
Expand All @@ -395,19 +403,21 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing
// In case Receive is stopped.
return
}
msg.Finish(nil)
gotEvent, err = binding.ToEvent(ctx, msg)
if err != nil {
t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err)
}

time.Sleep(delay)

var replyMsg binding.Message
if replyEvent != nil {
replyMsg = binding.ToMessage(replyEvent)
}
if err := respFn(ctx, replyMsg, &cehttp.Result{StatusCode: statusCode}); err != nil {
t.Errorf("unexpected error from responding target (key=%q) event: %v", targetKey, err)
}
msg.Finish(nil)
}

// VerifyNextTargetRetryEvent verifies the next event the target retry queue receives.
Expand Down
16 changes: 14 additions & 2 deletions pkg/broker/handler/processors/deliver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package deliver
import (
"context"
"fmt"
"time"

ceclient "github.com/cloudevents/sdk-go/v2/client"
cecontext "github.com/cloudevents/sdk-go/v2/context"
Expand Down Expand Up @@ -49,6 +50,10 @@ type Processor struct {
// DeliverRetryClient is the cloudevents client to send events
// to the retry topic.
DeliverRetryClient ceclient.Client

// DeliverTimeout is the timeout applied to cancel delivery.
// If zero, no additional timeout is applied.
DeliverTimeout time.Duration
}

var _ processors.Interface = (*Processor)(nil)
Expand Down Expand Up @@ -76,7 +81,14 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

resp, res := p.DeliverClient.Request(cecontext.WithTarget(ctx, target.Address), *event)
dctx := ctx
if p.DeliverTimeout > 0 {
var cancel context.CancelFunc
dctx, cancel = context.WithTimeout(dctx, p.DeliverTimeout)
defer cancel()
}

resp, res := p.DeliverClient.Request(cecontext.WithTarget(dctx, target.Address), *event)
if !protocol.IsACK(res) {
if !p.RetryOnFailure {
return fmt.Errorf("target delivery failed: %v", res.Error())
Expand All @@ -89,7 +101,7 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

if res := p.DeliverClient.Send(cecontext.WithTarget(ctx, broker.Address), *resp); !protocol.IsACK(res) {
if res := p.DeliverClient.Send(cecontext.WithTarget(dctx, broker.Address), *resp); !protocol.IsACK(res) {
if !p.RetryOnFailure {
return fmt.Errorf("delivery of replied event failed: %v", res.Error())
}
Expand Down
Loading