This repository has been archived by the owner on Jun 19, 2022. It is now read-only.

Add client side backoff retries for pubsub messages #1218

Merged (10 commits) on Jun 9, 2020
4 changes: 2 additions & 2 deletions cmd/broker/fanout/main.go
@@ -88,7 +88,7 @@ func main() {
volume.WithPath(env.TargetsConfigPath),
volume.WithNotifyChan(targetsUpdateCh),
},
buildPoolOptions(env)...,
buildHandlerOptions(env)...,
)
if err != nil {
logger.Fatal("Failed to create fanout sync pool", zap.Error(err))
@@ -124,7 +124,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str
return ch
}

func buildPoolOptions(env envConfig) []handler.Option {
func buildHandlerOptions(env envConfig) []handler.Option {
rs := pubsub.DefaultReceiveSettings
var opts []handler.Option
if env.HandlerConcurrency > 0 {
11 changes: 9 additions & 2 deletions cmd/broker/retry/main.go
@@ -58,6 +58,9 @@ type envConfig struct {

// Max to 10m.
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"`

MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"`
MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"30s"`
Contributor:

I was wondering if this should be a bit longer, like a few minutes?

Member Author:

How long would you suggest? I don't have a preference. I simply don't want to over-block the queue because of the limitation we have today.

Member Author:

Changed to 1m now.

Contributor:

I just think if the backoff reaches 30s, it's very likely the issue is not transient, and some longer backoff is probably better.

But that's just a guess. We can always tune this later. 1m seems good to me.

}

func main() {
@@ -93,7 +96,7 @@ func main() {
volume.WithPath(env.TargetsConfigPath),
volume.WithNotifyChan(targetsUpdateCh),
},
buildPoolOptions(env)...,
buildHandlerOptions(env)...,
)
if err != nil {
logger.Fatal("Failed to get retry sync pool", zap.Error(err))
@@ -127,7 +130,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str
return ch
}

func buildPoolOptions(env envConfig) []handler.Option {
func buildHandlerOptions(env envConfig) []handler.Option {
rs := pubsub.DefaultReceiveSettings
// If Synchronous is true, then no more than MaxOutstandingMessages will be in memory at one time.
// MaxOutstandingBytes still refers to the total bytes processed, rather than in memory.
@@ -144,6 +147,10 @@ func buildPoolOptions(env envConfig) []handler.Option {
if env.TimeoutPerEvent > 0 {
opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent))
}
opts = append(opts, handler.WithRetryPolicy(handler.RetryPolicy{
MinBackoff: env.MinRetryBackoff,
MaxBackoff: env.MaxRetryBackoff,
}))
opts = append(opts, handler.WithPubsubReceiveSettings(rs))
// The default CeClient is good?
return opts
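For context on the defaults discussed in the review thread above (MIN_RETRY_BACKOFF of 1s, with the cap raised from 30s to 1m): an exponential per-message backoff reaches the cap after roughly six consecutive failures. The sketch below only illustrates that schedule; it assumes the RetryPolicy behaves like a per-item exponential failure limiter, which is an assumption about the handler's internals, not something taken from this PR.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Assumed mapping: MinRetryBackoff -> base delay, MaxRetryBackoff -> cap.
	min, max := 1*time.Second, 1*time.Minute
	rl := workqueue.NewItemExponentialFailureRateLimiter(min, max)

	// Repeated failures for the same message ID double the delay until capped:
	// 1s 2s 4s 8s 16s 32s 1m0s 1m0s ...
	for i := 0; i < 8; i++ {
		fmt.Println(rl.When("some-message-id"))
	}
}
```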
6 changes: 3 additions & 3 deletions pkg/broker/handler/fanout_test.go
@@ -28,7 +28,7 @@ import (

"github.com/google/knative-gcp/pkg/broker/config"
"github.com/google/knative-gcp/pkg/broker/eventutil"
pooltesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing"
reportertest "github.com/google/knative-gcp/pkg/metrics/testing"

_ "knative.dev/pkg/metrics/testing"
@@ -44,7 +44,7 @@ func TestFanoutWatchAndSync(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testProject := "test-project"
helper, err := pooltesting.NewHelper(ctx, testProject)
helper, err := handlertesting.NewHelper(ctx, testProject)
if err != nil {
t.Fatalf("failed to create pool testing helper: %v", err)
}
@@ -123,7 +123,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) {
defer cancel()
testProject := "test-project"

helper, err := pooltesting.NewHelper(ctx, testProject)
helper, err := handlertesting.NewHelper(ctx, testProject)
if err != nil {
t.Fatalf("failed to create pool testing helper: %v", err)
}
2 changes: 1 addition & 1 deletion pkg/broker/handler/handler.go
@@ -116,7 +116,7 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) {
}
if err := h.Processor.Process(ctx, event); err != nil {
backoffPeriod := h.retryLimiter.When(msg.ID)
logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.Any("event", event), zap.Float64("backoffPeriod", backoffPeriod.Seconds()), zap.Error(err))
logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.String("eventID", event.ID()), zap.Duration("backoffPeriod", backoffPeriod), zap.Error(err))
h.delayNack(backoffPeriod)
msg.Nack()
return
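The handler.go change above only tightens the log fields; the surrounding receive path computes a backoff from a per-message retry limiter, sleeps, and then nacks so Pub/Sub redelivers later. Below is a minimal sketch of that nack-with-backoff pattern, assuming a workqueue-style limiter stands in for h.retryLimiter; the names and wiring are illustrative, not the handler's actual implementation.

```go
package handlersketch

import (
	"context"
	"time"

	"cloud.google.com/go/pubsub"
	"k8s.io/client-go/util/workqueue"
)

// sketchHandler is illustrative only; the real Handler has more fields.
type sketchHandler struct {
	retryLimiter workqueue.RateLimiter
	delayNack    func(time.Duration) // time.Sleep in production, mockable in tests
	process      func(context.Context, *pubsub.Message) error
}

func newSketchHandler(minBackoff, maxBackoff time.Duration, process func(context.Context, *pubsub.Message) error) *sketchHandler {
	return &sketchHandler{
		// The per-message delay grows exponentially from minBackoff up to maxBackoff.
		retryLimiter: workqueue.NewItemExponentialFailureRateLimiter(minBackoff, maxBackoff),
		delayNack:    time.Sleep,
		process:      process,
	}
}

func (h *sketchHandler) receive(ctx context.Context, msg *pubsub.Message) {
	if err := h.process(ctx, msg); err != nil {
		// Back off before nacking so redelivery of this message is delayed
		// instead of immediately hammering a failing target.
		h.delayNack(h.retryLimiter.When(msg.ID))
		msg.Nack()
		return
	}
	// On success, forget the ID so a later failure starts again from minBackoff.
	h.retryLimiter.Forget(msg.ID)
	msg.Ack()
}
```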
18 changes: 12 additions & 6 deletions pkg/broker/handler/handler_test.go
@@ -157,16 +157,18 @@ func TestHandler(t *testing.T) {
})
}

type alwaysErrProc struct {
type firstNErrProc struct {
processors.BaseProcessor
desiredErrCount, currErrCount int
successSignal chan struct{}
}

func (p *alwaysErrProc) Process(_ context.Context, _ *event.Event) error {
func (p *firstNErrProc) Process(_ context.Context, _ *event.Event) error {
if p.currErrCount < p.desiredErrCount {
p.currErrCount++
return errors.New("always error")
}
p.successSignal <- struct{}{}
return nil
}

@@ -197,7 +199,11 @@ func TestRetryBackoff(t *testing.T) {

delays := []time.Duration{}
desiredErrCount := 8
processor := &alwaysErrProc{desiredErrCount: desiredErrCount}
successSignal := make(chan struct{})
processor := &firstNErrProc{
desiredErrCount: desiredErrCount,
successSignal: successSignal,
}
h := NewHandler(sub, processor, time.Second, RetryPolicy{MinBackoff: time.Millisecond, MaxBackoff: 16 * time.Millisecond})
// Mock sleep func to collect nack backoffs.
h.delayNack = func(d time.Duration) {
@@ -206,7 +212,7 @@
h.Start(ctx, func(err error) {})
defer h.Stop()
if !h.IsAlive() {
t.Error("start handler didn't bring it alive")
t.Fatal("start handler didn't bring it alive")
}

testEvent := event.New()
@@ -216,12 +222,12 @@
testEvent.SetType("type")

if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil {
t.Errorf("failed to seed event to pubsub: %v", err)
t.Fatalf("failed to seed event to pubsub: %v", err)
}

// Wait until all desired errors were returned.
// Then stop the handler by cancel the context.
time.Sleep(time.Second)
<-successSignal
cancel()

if len(delays) != desiredErrCount {
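The last hunk replaces a fixed one-second sleep with a blocking read on successSignal, so the test resumes exactly when the processor finally succeeds rather than hoping one second was enough. Here is a small self-contained sketch of that synchronization pattern; the timeout guard is an addition for the example, not something the PR's test does.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	successSignal := make(chan struct{})

	// Stand-in for the processor: fail for a while elsewhere, then report success.
	go func() {
		time.Sleep(50 * time.Millisecond)
		successSignal <- struct{}{}
	}()

	// Block until success instead of sleeping a fixed second; the timeout only
	// keeps this example from hanging if the signal never arrives.
	select {
	case <-successSignal:
		fmt.Println("processor succeeded; safe to cancel the handler and assert on delays")
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for success")
	}
}
```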