Skip to content
This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Commit

Permalink
Have broker fanout handle timeout gracefully for each individual targ…
Browse files Browse the repository at this point in the history
…et (#960)

* Have broker fanout handle timeout gracefully for each individual target

* fix comment

* fix renaming bug

* address comments
  • Loading branch information
yolocs authored Apr 30, 2020
1 parent 88c9041 commit aa7d86b
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 29 deletions.
14 changes: 8 additions & 6 deletions cmd/broker/fanout/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@ var (
)

type envConfig struct {
PodName string `envconfig:"POD_NAME" required:"true"`
ProjectID string `envconfig:"PROJECT_ID"`
TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`
MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"`
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
PodName string `envconfig:"POD_NAME" required:"true"`
ProjectID string `envconfig:"PROJECT_ID"`
TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`
MaxConcurrencyPerEvent int `envconfig:"MAX_CONCURRENCY_PER_EVENT"`

// Max to 10m.
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
}

func main() {
Expand Down
12 changes: 7 additions & 5 deletions cmd/broker/retry/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ var (
)

type envConfig struct {
PodName string `envconfig:"POD_NAME" required:"true"`
ProjectID string `envconfig:"PROJECT_ID"`
TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
PodName string `envconfig:"POD_NAME" required:"true"`
ProjectID string `envconfig:"PROJECT_ID"`
TargetsConfigPath string `envconfig:"TARGETS_CONFIG_PATH" default:"/var/run/cloud-run-events/broker/targets"`
HandlerConcurrency int `envconfig:"HANDLER_CONCURRENCY"`

// Max to 10m.
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`
}

func main() {
Expand Down
14 changes: 14 additions & 0 deletions pkg/broker/handler/pool/fanout/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"time"

ceclient "github.com/cloudevents/sdk-go/v2/client"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
Expand Down Expand Up @@ -52,6 +53,12 @@ type SyncPool struct {
// For initial events delivery. We only need a shared client.
// And we can set target address dynamically.
deliverClient ceclient.Client
// For fanout delivery, we need a slightly shorter timeout
// than the handler timeout per event.
	// It allows the delivery processor to time out the delivery
	// before the handler nacks the pubsub message, which would
	// cause event re-delivery for all targets.
deliverTimeout time.Duration
}

type handlerCache struct {
Expand Down Expand Up @@ -102,11 +109,17 @@ func NewSyncPool(ctx context.Context, targets config.ReadonlyTargets, opts ...po
return nil, err
}

if options.TimeoutPerEvent < 5*time.Second {
return nil, fmt.Errorf("timeout per event cannot be lower than %v", 5*time.Second)
}

p := &SyncPool{
targets: targets,
options: options,
deliverClient: deliverClient,
deliverRetryClient: retryClient,
// Set the deliver timeout slightly less than the total timeout for each event.
deliverTimeout: options.TimeoutPerEvent - (5 * time.Second),
}
return p, nil
}
Expand Down Expand Up @@ -173,6 +186,7 @@ func (p *SyncPool) SyncOnce(ctx context.Context) error {
Targets: p.targets,
RetryOnFailure: true,
DeliverRetryClient: p.deliverRetryClient,
DeliverTimeout: p.deliverTimeout,
},
),
},
Expand Down
29 changes: 26 additions & 3 deletions pkg/broker/handler/pool/fanout/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
signal := make(chan struct{})
syncPool, err := NewSyncPool(ctx, helper.Targets,
pool.WithPubsubClient(helper.PubsubClient),
pool.WithProjectID(testProject))
pool.WithProjectID(testProject),
pool.WithDeliveryTimeout(500*time.Millisecond),
)
if err != nil {
t.Errorf("unexpected error from getting sync pool: %v", err)
}
Expand Down Expand Up @@ -191,13 +193,34 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError)
go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, nil, http.StatusInternalServerError, 0)
go helper.VerifyNextTargetRetryEvent(ctx, t, t3.Key(), &e)

helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
<-vctx.Done()
})

t.Run("event with delivery timeout was sent to retry queue", func(t *testing.T) {
// Set timeout context so that verification can be done before
// exiting test func.
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

// Verify the event to t1 was received.
go helper.VerifyNextTargetEventAndDelayResp(ctx, t, t1.Key(), &e, time.Second)
// Because of the delay, t1 delivery should timeout.
// Thus the event should have been sent to the retry queue.
go helper.VerifyNextTargetRetryEvent(ctx, t, t1.Key(), &e)
// The same event should be received by t2.
go helper.VerifyNextTargetEvent(vctx, t, t2.Key(), &e)
// But t2 shouldn't receive any retry event because the initial delay
// was successful.
go helper.VerifyNextTargetRetryEvent(vctx, t, t2.Key(), nil)

helper.SendEventToDecoupleQueue(ctx, t, b1.Key(), &e)
<-vctx.Done()
})

t.Run("event replied was sent to broker ingress", func(t *testing.T) {
reply := event.New()
reply.SetSubject("bar")
Expand All @@ -210,7 +233,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
vctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK)
go helper.VerifyAndRespondNextTargetEvent(ctx, t, t3.Key(), &e, &reply, http.StatusOK, 0)
go helper.VerifyNextBrokerIngressEvent(ctx, t, b2.Key(), &reply)

helper.SendEventToDecoupleQueue(ctx, t, b2.Key(), &e)
Expand Down
22 changes: 21 additions & 1 deletion pkg/broker/handler/pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ var (
ceclient.WithTimeNow(),
ceclient.WithTracePropagation(),
}

// This is the pubsub default MaxExtension.
	// It would not make sense for the handler timeout per event to be
	// greater than this value because the message would be nacked before
	// the handler times out.
	// TODO: consider allowing this value to be changed?
maxTimeout = 10 * time.Minute
)

// Options holds all the options for create handler pool.
Expand All @@ -49,6 +56,8 @@ type Options struct {
MaxConcurrencyPerEvent int
// TimeoutPerEvent is the timeout for handling an event.
TimeoutPerEvent time.Duration
// DeliveryTimeout is the timeout for delivering an event to a consumer.
DeliveryTimeout time.Duration
// PubsubClient is the pubsub client used to receive pubsub messages.
PubsubClient *pubsub.Client
// PubsubReceiveSettings is the pubsub receive settings.
Expand Down Expand Up @@ -108,7 +117,11 @@ func WithMaxConcurrentPerEvent(c int) Option {
// WithTimeoutPerEvent sets TimeoutPerEvent.
// Values greater than maxTimeout (the pubsub default MaxExtension) are capped
// at maxTimeout, because the pubsub message would be nacked before a longer
// handler timeout could ever fire.
func WithTimeoutPerEvent(t time.Duration) Option {
	return func(o *Options) {
		// The scraped diff left the old unconditional assignment in place
		// above the new if/else; only the capped assignment is kept here.
		if t > maxTimeout {
			o.TimeoutPerEvent = maxTimeout
		} else {
			o.TimeoutPerEvent = t
		}
	}
}

Expand All @@ -125,3 +138,10 @@ func WithPubsubReceiveSettings(s pubsub.ReceiveSettings) Option {
o.PubsubReceiveSettings = s
}
}

// WithDeliveryTimeout sets the DeliveryTimeout option to t.
func WithDeliveryTimeout(t time.Duration) Option {
	return func(o *Options) { o.DeliveryTimeout = t }
}
24 changes: 23 additions & 1 deletion pkg/broker/handler/pool/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func TestWithMaxConcurrency(t *testing.T) {
}

func TestWithTimeout(t *testing.T) {
want := 10 * time.Minute
want := 2 * time.Minute
// Always add project id because the default value can only be retrieved on GCE/GKE machines.
opt, err := NewOptions(WithTimeoutPerEvent(want), WithProjectID("pid"))
if err != nil {
Expand All @@ -69,6 +69,16 @@ func TestWithTimeout(t *testing.T) {
if opt.TimeoutPerEvent != want {
t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, want)
}

// Set timeout greater than the max value and verify it fallbacks to the max value.
// Always add project id because the default value can only be retrieved on GCE/GKE machines.
opt, err = NewOptions(WithTimeoutPerEvent(20*time.Minute), WithProjectID("pid"))
if err != nil {
t.Errorf("NewOptions got unexpected error: %v", err)
}
if opt.TimeoutPerEvent != maxTimeout {
t.Errorf("options timeout per event got=%v, want=%v", opt.TimeoutPerEvent, maxTimeout)
}
}

func TestWithReceiveSettings(t *testing.T) {
Expand Down Expand Up @@ -104,3 +114,15 @@ func TestWithPubsubClient(t *testing.T) {
t.Error("options PubsubClient got=nil, want=non-nil client")
}
}

// TestWithDeliveryTimeout verifies that WithDeliveryTimeout sets DeliveryTimeout.
func TestWithDeliveryTimeout(t *testing.T) {
	want := 10 * time.Minute
	// Always add project id because the default value can only be retrieved on GCE/GKE machines.
	opt, err := NewOptions(WithDeliveryTimeout(want), WithProjectID("pid"))
	if err != nil {
		t.Errorf("NewOptions got unexpected error: %v", err)
	}
	// BUG FIX: the original asserted opt.TimeoutPerEvent (while printing
	// opt.DeliveryTimeout in the message); the field under test is DeliveryTimeout.
	if opt.DeliveryTimeout != want {
		t.Errorf("options delivery timeout got=%v, want=%v", opt.DeliveryTimeout, want)
	}
}
16 changes: 13 additions & 3 deletions pkg/broker/handler/pool/testing/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsub/pstest"
Expand Down Expand Up @@ -370,13 +371,20 @@ func (h *Helper) VerifyNextBrokerIngressEvent(ctx context.Context, t *testing.T,
// This function is blocking and should be invoked in a separate goroutine with context timeout.
func (h *Helper) VerifyNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event) {
	t.Helper()
	// Respond success (200) with no reply event and no response delay.
	// (The scraped diff left the old 6-argument call in place above the new
	// 7-argument call; only the new call is kept here.)
	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, 0)
}

// VerifyNextTargetEventAndDelayResp verifies the next event the subscriber
// receives, then waits for the given delay before responding with success.
// This function is blocking and should be invoked in a separate goroutine with context timeout.
func (h *Helper) VerifyNextTargetEventAndDelayResp(ctx context.Context, t *testing.T, targetKey string, wantEvent *event.Event, delay time.Duration) {
	t.Helper()
	h.VerifyAndRespondNextTargetEvent(ctx, t, targetKey, wantEvent, nil, http.StatusOK, delay)
}

// VerifyAndRespondNextTargetEvent verifies the next event the subscriber receives and replies with the given parameters.
// If wantEvent is nil, then it means such an event is not expected.
// This function is blocking and should be invoked in a separate goroutine with context timeout.
func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int) {
func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing.T, targetKey string, wantEvent, replyEvent *event.Event, statusCode int, delay time.Duration) {
t.Helper()

consumer, ok := h.consumers[targetKey]
Expand All @@ -395,19 +403,21 @@ func (h *Helper) VerifyAndRespondNextTargetEvent(ctx context.Context, t *testing
// In case Receive is stopped.
return
}
msg.Finish(nil)
gotEvent, err = binding.ToEvent(ctx, msg)
if err != nil {
t.Errorf("target (key=%q) received invalid cloudevent: %v", targetKey, err)
}

time.Sleep(delay)

var replyMsg binding.Message
if replyEvent != nil {
replyMsg = binding.ToMessage(replyEvent)
}
if err := respFn(ctx, replyMsg, &cehttp.Result{StatusCode: statusCode}); err != nil {
t.Errorf("unexpected error from responding target (key=%q) event: %v", targetKey, err)
}
msg.Finish(nil)
}

// VerifyNextTargetRetryEvent verifies the next event the target retry queue receives.
Expand Down
16 changes: 14 additions & 2 deletions pkg/broker/handler/processors/deliver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package deliver
import (
"context"
"fmt"
"time"

ceclient "github.com/cloudevents/sdk-go/v2/client"
cecontext "github.com/cloudevents/sdk-go/v2/context"
Expand Down Expand Up @@ -49,6 +50,10 @@ type Processor struct {
// DeliverRetryClient is the cloudevents client to send events
// to the retry topic.
DeliverRetryClient ceclient.Client

// DeliverTimeout is the timeout applied to cancel delivery.
	// If zero, no additional timeout is applied.
DeliverTimeout time.Duration
}

var _ processors.Interface = (*Processor)(nil)
Expand Down Expand Up @@ -76,7 +81,14 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

resp, res := p.DeliverClient.Request(cecontext.WithTarget(ctx, target.Address), *event)
dctx := ctx
if p.DeliverTimeout > 0 {
var cancel context.CancelFunc
dctx, cancel = context.WithTimeout(dctx, p.DeliverTimeout)
defer cancel()
}

resp, res := p.DeliverClient.Request(cecontext.WithTarget(dctx, target.Address), *event)
if !protocol.IsACK(res) {
if !p.RetryOnFailure {
return fmt.Errorf("target delivery failed: %v", res.Error())
Expand All @@ -89,7 +101,7 @@ func (p *Processor) Process(ctx context.Context, event *event.Event) error {
return nil
}

if res := p.DeliverClient.Send(cecontext.WithTarget(ctx, broker.Address), *resp); !protocol.IsACK(res) {
if res := p.DeliverClient.Send(cecontext.WithTarget(dctx, broker.Address), *resp); !protocol.IsACK(res) {
if !p.RetryOnFailure {
return fmt.Errorf("delivery of replied event failed: %v", res.Error())
}
Expand Down
Loading

0 comments on commit aa7d86b

Please sign in to comment.