From e884ad143e9ba931951cd38d5bc713479a21e8f9 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Wed, 3 Jun 2020 22:11:42 +0000 Subject: [PATCH 1/5] Temporary implementation of backoff retry --- pkg/broker/handler/handler.go | 17 +++++- pkg/broker/handler/handler_test.go | 91 ++++++++++++++++++++++++++++++ pkg/broker/handler/pool/fanout.go | 2 + pkg/broker/handler/pool/options.go | 15 +++++ pkg/broker/handler/pool/retry.go | 2 + 5 files changed, 124 insertions(+), 3 deletions(-) diff --git a/pkg/broker/handler/handler.go b/pkg/broker/handler/handler.go index a53ef7c381..c3044fd4a1 100644 --- a/pkg/broker/handler/handler.go +++ b/pkg/broker/handler/handler.go @@ -27,6 +27,7 @@ import ( "github.com/google/knative-gcp/pkg/broker/handler/processors" "github.com/google/knative-gcp/pkg/metrics" "go.uber.org/zap" + "k8s.io/client-go/util/workqueue" "knative.dev/eventing/pkg/logging" ) @@ -43,9 +44,15 @@ type Handler struct { // Timeout is the timeout for processing each individual event. Timeout time.Duration + // RetryLimiter limits how fast to retry failed events. + RetryLimiter workqueue.RateLimiter + + DelayNack func(time.Duration) + // cancel is function to stop pulling messages. - cancel context.CancelFunc - alive atomic.Value + cancel context.CancelFunc + alive atomic.Value + delayNackFunc func(duration time.Duration) } // Start starts the handler. @@ -86,9 +93,13 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) { defer cancel() } if err := h.Processor.Process(ctx, event); err != nil { - logging.FromContext(ctx).Error("failed to process event", zap.Any("event", event), zap.Error(err)) + backoffPeriod := h.RetryLimiter.When(msg.ID) + logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.Any("event", event), zap.Float64("backoffPeriod", backoffPeriod.Seconds()), zap.Error(err)) + h.DelayNack(backoffPeriod) msg.Nack() return } + + h.RetryLimiter.Forget(msg.ID) msg.Ack() } diff --git a/pkg/broker/handler/handler_test.go b/pkg/broker/handler/handler_test.go index fa88484c00..0b77b93540 100644 --- a/pkg/broker/handler/handler_test.go +++ b/pkg/broker/handler/handler_test.go @@ -18,6 +18,7 @@ package handler import ( "context" + "errors" "testing" "time" @@ -29,6 +30,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/grpc" + "k8s.io/client-go/util/workqueue" "github.com/google/knative-gcp/pkg/broker/handler/processors" ) @@ -82,6 +84,8 @@ func TestHandler(t *testing.T) { Subscription: sub, Processor: processor, Timeout: time.Second, + RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(time.Duration(0), time.Duration(0)), + DelayNack: time.Sleep, } h.Start(ctx, func(err error) {}) defer h.Stop() @@ -141,6 +145,93 @@ func TestHandler(t *testing.T) { }) } +type alwaysErrProc struct { + processors.BaseProcessor + desiredErrCount, currErrCount int +} + +func (p *alwaysErrProc) Process(_ context.Context, _ *event.Event) error { + if p.currErrCount < p.desiredErrCount { + p.currErrCount++ + return errors.New("always error") + } + return nil +} + +func TestRetryBackoff(t *testing.T) { + ctx := context.Background() + c, close := testPubsubClient(ctx, t, "test-project") + defer close() + + topic, err := c.CreateTopic(ctx, "test-topic") + if err != nil { + t.Fatalf("failed to create topic: %v", err) + } + sub, err := c.CreateSubscription(ctx, "test-sub", pubsub.SubscriptionConfig{ + Topic: topic, + }) + if err != nil { + t.Fatalf("failed to create subscription: %v", err) + } + + p, err := 
cepubsub.New(context.Background(), + cepubsub.WithClient(c), + cepubsub.WithProjectID("test-project"), + cepubsub.WithTopicID("test-topic"), + ) + if err != nil { + t.Fatalf("failed to create cloudevents pubsub protocol: %v", err) + } + + delays := []time.Duration{} + delayNack := func(d time.Duration) { + delays = append(delays, d) + } + + desiredErrCount := 8 + processor := &alwaysErrProc{desiredErrCount: desiredErrCount} + h := &Handler{ + Subscription: sub, + Processor: processor, + Timeout: time.Second, + RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 16*time.Millisecond), + DelayNack: delayNack, + } + h.Start(ctx, func(err error) {}) + defer h.Stop() + if !h.IsAlive() { + t.Error("start handler didn't bring it alive") + } + + testEvent := event.New() + testEvent.SetID("id") + testEvent.SetSource("source") + testEvent.SetSubject("subject") + testEvent.SetType("type") + + if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil { + t.Errorf("failed to seed event to pubsub: %v", err) + } + + time.Sleep(time.Second) + + if len(delays) != desiredErrCount { + t.Errorf("retried times got=%d, want=%d", len(delays), desiredErrCount) + } + if delays[0] != time.Millisecond { + t.Errorf("initial nack delay got=%v, want=%v", delays[0], time.Millisecond) + } + for i := 1; i < len(delays); i++ { + wantDelay := 2 * delays[i-1] + if wantDelay > 16*time.Millisecond { + wantDelay = 16 * time.Millisecond + } + if delays[i] != wantDelay { + t.Errorf("delay #%d got=%v, want=%v", i+1, delays[i], wantDelay) + } + } +} + func nextEventWithTimeout(eventCh <-chan *event.Event) *event.Event { select { case <-time.After(30 * time.Second): diff --git a/pkg/broker/handler/pool/fanout.go b/pkg/broker/handler/pool/fanout.go index 3e8e91509a..a126e94692 100644 --- a/pkg/broker/handler/pool/fanout.go +++ b/pkg/broker/handler/pool/fanout.go @@ -26,6 +26,7 @@ import ( "cloud.google.com/go/pubsub" ceclient "github.com/cloudevents/sdk-go/v2/client" "go.uber.org/zap" + "k8s.io/client-go/util/workqueue" "knative.dev/eventing/pkg/logging" "github.com/google/knative-gcp/pkg/broker/config" @@ -159,6 +160,7 @@ func (p *FanoutPool) SyncOnce(ctx context.Context) error { Handler: handler.Handler{ Timeout: p.options.TimeoutPerEvent, Subscription: sub, + RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(p.options.RetryPolicy.MinBackoff, p.options.RetryPolicy.MaxBackoff), Processor: processors.ChainProcessors( &fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets}, &filter.Processor{Targets: p.targets}, diff --git a/pkg/broker/handler/pool/options.go b/pkg/broker/handler/pool/options.go index 726a8073da..0e04f6e10f 100644 --- a/pkg/broker/handler/pool/options.go +++ b/pkg/broker/handler/pool/options.go @@ -36,6 +36,12 @@ var ( maxTimeout = 10 * time.Minute ) +// RetryPolicy defines the retry policy for pubsub messages. +// TODO: https://github.com/google/knative-gcp/issues/1100#issuecomment-638304147 +type RetryPolicy struct { + MinBackoff, MaxBackoff time.Duration +} + // Options holds all the options for create handler pool. type Options struct { // HandlerConcurrency is the number of goroutines @@ -50,6 +56,8 @@ type Options struct { DeliveryTimeout time.Duration // PubsubReceiveSettings is the pubsub receive settings. PubsubReceiveSettings pubsub.ReceiveSettings + // RetryPolicy defines the retry policy for pubsub messages. + RetryPolicy RetryPolicy } // NewOptions creates a Options. 
@@ -107,3 +115,10 @@ func WithDeliveryTimeout(t time.Duration) Option { o.DeliveryTimeout = t } } + +// WithRetryPolicy sets the RetryPolicy. +func WithRetryPolicy(r RetryPolicy) Option { + return func(o *Options) { + o.RetryPolicy = r + } +} diff --git a/pkg/broker/handler/pool/retry.go b/pkg/broker/handler/pool/retry.go index 9f2de7c556..bb9eea8584 100644 --- a/pkg/broker/handler/pool/retry.go +++ b/pkg/broker/handler/pool/retry.go @@ -23,6 +23,7 @@ import ( "sync" "go.uber.org/zap" + "k8s.io/client-go/util/workqueue" "knative.dev/eventing/pkg/logging" "cloud.google.com/go/pubsub" @@ -139,6 +140,7 @@ func (p *RetryPool) SyncOnce(ctx context.Context) error { Handler: handler.Handler{ Timeout: p.options.TimeoutPerEvent, Subscription: sub, + RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(p.options.RetryPolicy.MinBackoff, p.options.RetryPolicy.MaxBackoff), Processor: processors.ChainProcessors( &filter.Processor{Targets: p.targets}, &deliver.Processor{ From acc5c28631365ac5c15e29695dda299b9283d185 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Thu, 4 Jun 2020 18:36:20 +0000 Subject: [PATCH 2/5] refactor --- cmd/broker/fanout/main.go | 18 ++++---- cmd/broker/fanout/wire.go | 10 ++--- cmd/broker/fanout/wire_gen.go | 14 +++--- cmd/broker/retry/main.go | 16 +++---- cmd/broker/retry/wire.go | 8 ++-- cmd/broker/retry/wire_gen.go | 10 ++--- pkg/broker/handler/{pool => }/fanout.go | 43 +++++++++---------- pkg/broker/handler/{pool => }/fanout_test.go | 4 +- pkg/broker/handler/handler.go | 31 +++++++++---- pkg/broker/handler/handler_test.go | 33 ++++++-------- pkg/broker/handler/{pool => }/options.go | 2 +- pkg/broker/handler/{pool => }/options_test.go | 2 +- pkg/broker/handler/{pool => }/pool.go | 2 +- pkg/broker/handler/{pool => }/pool_test.go | 2 +- pkg/broker/handler/{pool => }/providers.go | 2 +- pkg/broker/handler/{pool => }/retry.go | 35 ++++++++------- pkg/broker/handler/{pool => }/retry_test.go | 8 ++-- .../handler/{pool => }/testing/helper.go | 0 pkg/broker/handler/{pool => }/wire.go | 2 +- pkg/broker/handler/{pool => }/wire_gen.go | 2 +- 20 files changed, 124 insertions(+), 120 deletions(-) rename pkg/broker/handler/{pool => }/fanout.go (86%) rename pkg/broker/handler/{pool => }/fanout_test.go (99%) rename pkg/broker/handler/{pool => }/options.go (99%) rename pkg/broker/handler/{pool => }/options_test.go (99%) rename pkg/broker/handler/{pool => }/pool.go (98%) rename pkg/broker/handler/{pool => }/pool_test.go (99%) rename pkg/broker/handler/{pool => }/providers.go (99%) rename pkg/broker/handler/{pool => }/retry.go (89%) rename pkg/broker/handler/{pool => }/retry_test.go (98%) rename pkg/broker/handler/{pool => }/testing/helper.go (100%) rename pkg/broker/handler/{pool => }/wire.go (98%) rename pkg/broker/handler/{pool => }/wire_gen.go (98%) diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go index 76ba61d5d1..7cfca7e863 100644 --- a/cmd/broker/fanout/main.go +++ b/cmd/broker/fanout/main.go @@ -23,7 +23,7 @@ import ( "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" "github.com/google/knative-gcp/pkg/metrics" "github.com/google/knative-gcp/pkg/utils" @@ -70,7 +70,7 @@ func main() { syncSignal := poolSyncSignal(ctx, targetsUpdateCh) syncPool, err := InitializeSyncPool( ctx, - pool.ProjectID(projectID), + handler.ProjectID(projectID), 
metrics.PodName(env.PodName), metrics.ContainerName(component), []volume.Option{ @@ -82,7 +82,7 @@ func main() { if err != nil { logger.Fatal("Failed to create fanout sync pool", zap.Error(err)) } - if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal); err != nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal); err != nil { logger.Fatalw("Failed to start fanout sync pool", zap.Error(err)) } @@ -113,21 +113,21 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str return ch } -func buildPoolOptions(env envConfig) []pool.Option { +func buildPoolOptions(env envConfig) []handler.Option { rs := pubsub.DefaultReceiveSettings - var opts []pool.Option + var opts []handler.Option if env.HandlerConcurrency > 0 { // Let the pubsub subscription and handler have the same concurrency? - opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency)) + opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency)) rs.NumGoroutines = env.HandlerConcurrency } if env.MaxConcurrencyPerEvent > 0 { - opts = append(opts, pool.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent)) + opts = append(opts, handler.WithMaxConcurrentPerEvent(env.MaxConcurrencyPerEvent)) } if env.TimeoutPerEvent > 0 { - opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent)) + opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent)) } - opts = append(opts, pool.WithPubsubReceiveSettings(rs)) + opts = append(opts, handler.WithPubsubReceiveSettings(rs)) // The default CeClient is good? return opts } diff --git a/cmd/broker/fanout/wire.go b/cmd/broker/fanout/wire.go index c4780c4080..08f8fb819e 100644 --- a/cmd/broker/fanout/wire.go +++ b/cmd/broker/fanout/wire.go @@ -22,7 +22,7 @@ import ( "context" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" "github.com/google/knative-gcp/pkg/metrics" "github.com/google/wire" ) @@ -31,13 +31,13 @@ import ( // retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher. func InitializeSyncPool( ctx context.Context, - projectID pool.ProjectID, + projectID handler.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, - opts ...pool.Option, -) (*pool.FanoutPool, error) { + opts ...handler.Option, +) (*handler.FanoutPool, error) { // Implementation generated by wire. Providers for required FanoutPool dependencies should be // added here. 
- panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter)) + panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter)) } diff --git a/cmd/broker/fanout/wire_gen.go b/cmd/broker/fanout/wire_gen.go index 3c3c433c27..19272d22a3 100644 --- a/cmd/broker/fanout/wire_gen.go +++ b/cmd/broker/fanout/wire_gen.go @@ -8,24 +8,24 @@ package main import ( "context" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" "github.com/google/knative-gcp/pkg/metrics" ) // Injectors from wire.go: -func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, opts ...pool.Option) (*pool.FanoutPool, error) { +func InitializeSyncPool(ctx context.Context, projectID handler.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, opts ...handler.Option) (*handler.FanoutPool, error) { readonlyTargets, err := volume.NewTargetsFromFile(targetsVolumeOpts...) if err != nil { return nil, err } - client, err := pool.NewPubsubClient(ctx, projectID) + client, err := handler.NewPubsubClient(ctx, projectID) if err != nil { return nil, err } httpClient := _wireClientValue v := _wireValue - retryClient, err := pool.NewRetryClient(ctx, client, v...) + retryClient, err := handler.NewRetryClient(ctx, client, v...) if err != nil { return nil, err } @@ -33,7 +33,7 @@ func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName m if err != nil { return nil, err } - fanoutPool, err := pool.NewFanoutPool(readonlyTargets, client, httpClient, retryClient, deliveryReporter, opts...) + fanoutPool, err := handler.NewFanoutPool(readonlyTargets, client, httpClient, retryClient, deliveryReporter, opts...) 
if err != nil { return nil, err } @@ -41,6 +41,6 @@ func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName m } var ( - _wireClientValue = pool.DefaultHTTPClient - _wireValue = pool.DefaultCEClientOpts + _wireClientValue = handler.DefaultHTTPClient + _wireValue = handler.DefaultCEClientOpts ) diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 1e34d426c3..45c8f0e90f 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -25,7 +25,7 @@ import ( "go.uber.org/zap" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" metadataClient "github.com/google/knative-gcp/pkg/gclient/metadata" "github.com/google/knative-gcp/pkg/metrics" "github.com/google/knative-gcp/pkg/utils" @@ -75,7 +75,7 @@ func main() { syncSignal := poolSyncSignal(ctx, targetsUpdateCh) syncPool, err := InitializeSyncPool( ctx, - pool.ProjectID(projectID), + handler.ProjectID(projectID), metrics.PodName(env.PodName), metrics.ContainerName(component), []volume.Option{ @@ -87,7 +87,7 @@ func main() { if err != nil { logger.Fatal("Failed to get retry sync pool", zap.Error(err)) } - if _, err := pool.StartSyncPool(ctx, syncPool, syncSignal); err != nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal); err != nil { logger.Fatal("Failed to start retry sync pool", zap.Error(err)) } @@ -116,7 +116,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str return ch } -func buildPoolOptions(env envConfig) []pool.Option { +func buildPoolOptions(env envConfig) []handler.Option { rs := pubsub.DefaultReceiveSettings // If Synchronous is true, then no more than MaxOutstandingMessages will be in memory at one time. // MaxOutstandingBytes still refers to the total bytes processed, rather than in memory. @@ -125,15 +125,15 @@ func buildPoolOptions(env envConfig) []pool.Option { rs.Synchronous = true rs.MaxOutstandingMessages = env.OutstandingMessagesPerSub rs.MaxOutstandingBytes = env.OutstandingBytesPerSub - var opts []pool.Option + var opts []handler.Option if env.HandlerConcurrency > 0 { - opts = append(opts, pool.WithHandlerConcurrency(env.HandlerConcurrency)) + opts = append(opts, handler.WithHandlerConcurrency(env.HandlerConcurrency)) rs.NumGoroutines = env.HandlerConcurrency } if env.TimeoutPerEvent > 0 { - opts = append(opts, pool.WithTimeoutPerEvent(env.TimeoutPerEvent)) + opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent)) } - opts = append(opts, pool.WithPubsubReceiveSettings(rs)) + opts = append(opts, handler.WithPubsubReceiveSettings(rs)) // The default CeClient is good? return opts } diff --git a/cmd/broker/retry/wire.go b/cmd/broker/retry/wire.go index 3a089034c2..6cf68e6c41 100644 --- a/cmd/broker/retry/wire.go +++ b/cmd/broker/retry/wire.go @@ -22,7 +22,7 @@ import ( "context" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" "github.com/google/knative-gcp/pkg/metrics" "github.com/google/wire" ) @@ -31,12 +31,12 @@ import ( // retry pool's pubsub client and uses targetsVolumeOpts to initialize the targets volume watcher. 
func InitializeSyncPool( ctx context.Context, - projectID pool.ProjectID, + projectID handler.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, - opts ...pool.Option) (*pool.RetryPool, error) { + opts ...handler.Option) (*handler.RetryPool, error) { // Implementation generated by wire. Providers for required RetryPool dependencies should be // added here. - panic(wire.Build(pool.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter)) + panic(wire.Build(handler.ProviderSet, volume.NewTargetsFromFile, metrics.NewDeliveryReporter)) } diff --git a/cmd/broker/retry/wire_gen.go b/cmd/broker/retry/wire_gen.go index fc1dfb8c81..040fb55b16 100644 --- a/cmd/broker/retry/wire_gen.go +++ b/cmd/broker/retry/wire_gen.go @@ -8,18 +8,18 @@ package main import ( "context" "github.com/google/knative-gcp/pkg/broker/config/volume" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" "github.com/google/knative-gcp/pkg/metrics" ) // Injectors from wire.go: -func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, opts ...pool.Option) (*pool.RetryPool, error) { +func InitializeSyncPool(ctx context.Context, projectID handler.ProjectID, podName metrics.PodName, containerName metrics.ContainerName, targetsVolumeOpts []volume.Option, opts ...handler.Option) (*handler.RetryPool, error) { readonlyTargets, err := volume.NewTargetsFromFile(targetsVolumeOpts...) if err != nil { return nil, err } - client, err := pool.NewPubsubClient(ctx, projectID) + client, err := handler.NewPubsubClient(ctx, projectID) if err != nil { return nil, err } @@ -28,7 +28,7 @@ func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName m if err != nil { return nil, err } - retryPool, err := pool.NewRetryPool(readonlyTargets, client, httpClient, deliveryReporter, opts...) + retryPool, err := handler.NewRetryPool(readonlyTargets, client, httpClient, deliveryReporter, opts...) if err != nil { return nil, err } @@ -36,5 +36,5 @@ func InitializeSyncPool(ctx context.Context, projectID pool.ProjectID, podName m } var ( - _wireClientValue = pool.DefaultHTTPClient + _wireClientValue = handler.DefaultHTTPClient ) diff --git a/pkg/broker/handler/pool/fanout.go b/pkg/broker/handler/fanout.go similarity index 86% rename from pkg/broker/handler/pool/fanout.go rename to pkg/broker/handler/fanout.go index a126e94692..19453e48d3 100644 --- a/pkg/broker/handler/pool/fanout.go +++ b/pkg/broker/handler/fanout.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package pool +package handler import ( "context" @@ -26,11 +26,9 @@ import ( "cloud.google.com/go/pubsub" ceclient "github.com/cloudevents/sdk-go/v2/client" "go.uber.org/zap" - "k8s.io/client-go/util/workqueue" "knative.dev/eventing/pkg/logging" "github.com/google/knative-gcp/pkg/broker/config" - "github.com/google/knative-gcp/pkg/broker/handler" handlerctx "github.com/google/knative-gcp/pkg/broker/handler/context" "github.com/google/knative-gcp/pkg/broker/handler/processors" "github.com/google/knative-gcp/pkg/broker/handler/processors/deliver" @@ -60,7 +58,7 @@ type FanoutPool struct { } type fanoutHandlerCache struct { - handler.Handler + Handler b *config.Broker } @@ -156,25 +154,26 @@ func (p *FanoutPool) SyncOnce(ctx context.Context) error { sub := p.pubsubClient.Subscription(b.DecoupleQueue.Subscription) sub.ReceiveSettings = p.options.PubsubReceiveSettings + h := NewHandler( + sub, + processors.ChainProcessors( + &fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets}, + &filter.Processor{Targets: p.targets}, + &deliver.Processor{ + DeliverClient: p.deliverClient, + Targets: p.targets, + RetryOnFailure: true, + DeliverRetryClient: p.deliverRetryClient, + DeliverTimeout: p.options.DeliveryTimeout, + StatsReporter: p.statsReporter, + }, + ), + p.options.TimeoutPerEvent, + p.options.RetryPolicy, + ) hc := &fanoutHandlerCache{ - Handler: handler.Handler{ - Timeout: p.options.TimeoutPerEvent, - Subscription: sub, - RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(p.options.RetryPolicy.MinBackoff, p.options.RetryPolicy.MaxBackoff), - Processor: processors.ChainProcessors( - &fanout.Processor{MaxConcurrency: p.options.MaxConcurrencyPerEvent, Targets: p.targets}, - &filter.Processor{Targets: p.targets}, - &deliver.Processor{ - DeliverClient: p.deliverClient, - Targets: p.targets, - RetryOnFailure: true, - DeliverRetryClient: p.deliverRetryClient, - DeliverTimeout: p.options.DeliveryTimeout, - StatsReporter: p.statsReporter, - }, - ), - }, - b: b, + Handler: *h, + b: b, } // Start the handler with broker key in context. diff --git a/pkg/broker/handler/pool/fanout_test.go b/pkg/broker/handler/fanout_test.go similarity index 99% rename from pkg/broker/handler/pool/fanout_test.go rename to pkg/broker/handler/fanout_test.go index f5749814c5..39cb556bb2 100644 --- a/pkg/broker/handler/pool/fanout_test.go +++ b/pkg/broker/handler/fanout_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "context" @@ -28,7 +28,7 @@ import ( "github.com/google/knative-gcp/pkg/broker/config" "github.com/google/knative-gcp/pkg/broker/eventutil" - pooltesting "github.com/google/knative-gcp/pkg/broker/handler/pool/testing" + pooltesting "github.com/google/knative-gcp/pkg/broker/handler/testing" reportertest "github.com/google/knative-gcp/pkg/metrics/testing" _ "knative.dev/pkg/metrics/testing" diff --git a/pkg/broker/handler/handler.go b/pkg/broker/handler/handler.go index 9cf7a6587f..7aeabe1600 100644 --- a/pkg/broker/handler/handler.go +++ b/pkg/broker/handler/handler.go @@ -44,16 +44,31 @@ type Handler struct { // Timeout is the timeout for processing each individual event. Timeout time.Duration - // RetryLimiter limits how fast to retry failed events. - RetryLimiter workqueue.RateLimiter - - DelayNack func(time.Duration) - + // retryLimiter limits how fast to retry failed events. 
+ retryLimiter workqueue.RateLimiter + // delayNack defaults to time.Sleep; could be overriden in test. + delayNack func(time.Duration) // cancel is function to stop pulling messages. cancel context.CancelFunc alive atomic.Value } +// NewHandler creates a new Handler. +func NewHandler( + sub *pubsub.Subscription, + processor processors.Interface, + timeout time.Duration, + retryPolicy RetryPolicy, +) *Handler { + return &Handler{ + Subscription: sub, + Processor: processor, + Timeout: timeout, + retryLimiter: workqueue.NewItemExponentialFailureRateLimiter(retryPolicy.MinBackoff, retryPolicy.MaxBackoff), + delayNack: time.Sleep, + } +} + // Start starts the handler. // done func will be called if the pubsub inbound is closed. func (h *Handler) Start(ctx context.Context, done func(error)) { @@ -92,13 +107,13 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) { defer cancel() } if err := h.Processor.Process(ctx, event); err != nil { - backoffPeriod := h.RetryLimiter.When(msg.ID) + backoffPeriod := h.retryLimiter.When(msg.ID) logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.Any("event", event), zap.Float64("backoffPeriod", backoffPeriod.Seconds()), zap.Error(err)) - h.DelayNack(backoffPeriod) + h.delayNack(backoffPeriod) msg.Nack() return } - h.RetryLimiter.Forget(msg.ID) + h.retryLimiter.Forget(msg.ID) msg.Ack() } diff --git a/pkg/broker/handler/handler_test.go b/pkg/broker/handler/handler_test.go index 0b77b93540..9d34585160 100644 --- a/pkg/broker/handler/handler_test.go +++ b/pkg/broker/handler/handler_test.go @@ -30,7 +30,6 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/api/option" "google.golang.org/grpc" - "k8s.io/client-go/util/workqueue" "github.com/google/knative-gcp/pkg/broker/handler/processors" ) @@ -80,13 +79,7 @@ func TestHandler(t *testing.T) { eventCh := make(chan *event.Event) processor := &processors.FakeProcessor{PrevEventsCh: eventCh} - h := &Handler{ - Subscription: sub, - Processor: processor, - Timeout: time.Second, - RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(time.Duration(0), time.Duration(0)), - DelayNack: time.Sleep, - } + h := NewHandler(sub, processor, time.Second, RetryPolicy{}) h.Start(ctx, func(err error) {}) defer h.Stop() if !h.IsAlive() { @@ -159,7 +152,7 @@ func (p *alwaysErrProc) Process(_ context.Context, _ *event.Event) error { } func TestRetryBackoff(t *testing.T) { - ctx := context.Background() + ctx, cancel := context.WithCancel(context.Background()) c, close := testPubsubClient(ctx, t, "test-project") defer close() @@ -184,18 +177,12 @@ func TestRetryBackoff(t *testing.T) { } delays := []time.Duration{} - delayNack := func(d time.Duration) { - delays = append(delays, d) - } - desiredErrCount := 8 processor := &alwaysErrProc{desiredErrCount: desiredErrCount} - h := &Handler{ - Subscription: sub, - Processor: processor, - Timeout: time.Second, - RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 16*time.Millisecond), - DelayNack: delayNack, + h := NewHandler(sub, processor, time.Second, RetryPolicy{MinBackoff: time.Millisecond, MaxBackoff: 16 * time.Millisecond}) + // Mock sleep func to collect nack backoffs. + h.delayNack = func(d time.Duration) { + delays = append(delays, d) } h.Start(ctx, func(err error) {}) defer h.Stop() @@ -213,21 +200,25 @@ func TestRetryBackoff(t *testing.T) { t.Errorf("failed to seed event to pubsub: %v", err) } + // Wait until all desired errors were returned. + // Then stop the handler by cancel the context. 
time.Sleep(time.Second) + cancel() if len(delays) != desiredErrCount { - t.Errorf("retried times got=%d, want=%d", len(delays), desiredErrCount) + t.Errorf("retry count got=%d, want=%d", len(delays), desiredErrCount) } if delays[0] != time.Millisecond { t.Errorf("initial nack delay got=%v, want=%v", delays[0], time.Millisecond) } + // We expect exponential backoff until MaxBackoff for i := 1; i < len(delays); i++ { wantDelay := 2 * delays[i-1] if wantDelay > 16*time.Millisecond { wantDelay = 16 * time.Millisecond } if delays[i] != wantDelay { - t.Errorf("delay #%d got=%v, want=%v", i+1, delays[i], wantDelay) + t.Errorf("delays[%d] got=%v, want=%v", i, delays[i], wantDelay) } } } diff --git a/pkg/broker/handler/pool/options.go b/pkg/broker/handler/options.go similarity index 99% rename from pkg/broker/handler/pool/options.go rename to pkg/broker/handler/options.go index 0e04f6e10f..c8328553a4 100644 --- a/pkg/broker/handler/pool/options.go +++ b/pkg/broker/handler/options.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "runtime" diff --git a/pkg/broker/handler/pool/options_test.go b/pkg/broker/handler/options_test.go similarity index 99% rename from pkg/broker/handler/pool/options_test.go rename to pkg/broker/handler/options_test.go index dcf4213ce9..225b4c2e36 100644 --- a/pkg/broker/handler/pool/options_test.go +++ b/pkg/broker/handler/options_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "testing" diff --git a/pkg/broker/handler/pool/pool.go b/pkg/broker/handler/pool.go similarity index 98% rename from pkg/broker/handler/pool/pool.go rename to pkg/broker/handler/pool.go index 2d4822d197..e57dec54cb 100644 --- a/pkg/broker/handler/pool/pool.go +++ b/pkg/broker/handler/pool.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "context" diff --git a/pkg/broker/handler/pool/pool_test.go b/pkg/broker/handler/pool_test.go similarity index 99% rename from pkg/broker/handler/pool/pool_test.go rename to pkg/broker/handler/pool_test.go index a0d4254e3a..483dc44d96 100644 --- a/pkg/broker/handler/pool/pool_test.go +++ b/pkg/broker/handler/pool_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "context" diff --git a/pkg/broker/handler/pool/providers.go b/pkg/broker/handler/providers.go similarity index 99% rename from pkg/broker/handler/pool/providers.go rename to pkg/broker/handler/providers.go index 9a82dfc03e..02c566ce55 100644 --- a/pkg/broker/handler/pool/providers.go +++ b/pkg/broker/handler/providers.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "context" diff --git a/pkg/broker/handler/pool/retry.go b/pkg/broker/handler/retry.go similarity index 89% rename from pkg/broker/handler/pool/retry.go rename to pkg/broker/handler/retry.go index bb9eea8584..92f1e2c2e0 100644 --- a/pkg/broker/handler/pool/retry.go +++ b/pkg/broker/handler/retry.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package pool +package handler import ( "context" @@ -23,13 +23,11 @@ import ( "sync" "go.uber.org/zap" - "k8s.io/client-go/util/workqueue" "knative.dev/eventing/pkg/logging" "cloud.google.com/go/pubsub" "github.com/google/knative-gcp/pkg/broker/config" - "github.com/google/knative-gcp/pkg/broker/handler" handlerctx "github.com/google/knative-gcp/pkg/broker/handler/context" "github.com/google/knative-gcp/pkg/broker/handler/processors" "github.com/google/knative-gcp/pkg/broker/handler/processors/deliver" @@ -54,7 +52,7 @@ type RetryPool struct { } type retryHandlerCache struct { - handler.Handler + Handler t *config.Target } @@ -136,21 +134,22 @@ func (p *RetryPool) SyncOnce(ctx context.Context) error { sub := p.pubsubClient.Subscription(t.RetryQueue.Subscription) sub.ReceiveSettings = p.options.PubsubReceiveSettings + h := NewHandler( + sub, + processors.ChainProcessors( + &filter.Processor{Targets: p.targets}, + &deliver.Processor{ + DeliverClient: p.deliverClient, + Targets: p.targets, + StatsReporter: p.statsReporter, + }, + ), + p.options.TimeoutPerEvent, + p.options.RetryPolicy, + ) hc := &retryHandlerCache{ - Handler: handler.Handler{ - Timeout: p.options.TimeoutPerEvent, - Subscription: sub, - RetryLimiter: workqueue.NewItemExponentialFailureRateLimiter(p.options.RetryPolicy.MinBackoff, p.options.RetryPolicy.MaxBackoff), - Processor: processors.ChainProcessors( - &filter.Processor{Targets: p.targets}, - &deliver.Processor{ - DeliverClient: p.deliverClient, - Targets: p.targets, - StatsReporter: p.statsReporter, - }, - ), - }, - t: t, + Handler: *h, + t: t, } ctx, err := metrics.AddTargetTags(ctx, t) diff --git a/pkg/broker/handler/pool/retry_test.go b/pkg/broker/handler/retry_test.go similarity index 98% rename from pkg/broker/handler/pool/retry_test.go rename to pkg/broker/handler/retry_test.go index 36f5350fbc..e8a82c00fd 100644 --- a/pkg/broker/handler/pool/retry_test.go +++ b/pkg/broker/handler/retry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package pool +package handler import ( "context" @@ -27,7 +27,7 @@ import ( "github.com/google/knative-gcp/pkg/broker/config" "github.com/google/knative-gcp/pkg/broker/eventutil" - pooltesting "github.com/google/knative-gcp/pkg/broker/handler/pool/testing" + handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing" reportertest "github.com/google/knative-gcp/pkg/metrics/testing" _ "knative.dev/pkg/metrics/testing" @@ -43,7 +43,7 @@ func TestRetryWatchAndSync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() testProject := "test-project" - helper, err := pooltesting.NewHelper(ctx, testProject) + helper, err := handlertesting.NewHelper(ctx, testProject) if err != nil { t.Fatalf("failed to create pool testing helper: %v", err) } @@ -122,7 +122,7 @@ func TestRetrySyncPoolE2E(t *testing.T) { defer cancel() testProject := "test-project" - helper, err := pooltesting.NewHelper(ctx, testProject) + helper, err := handlertesting.NewHelper(ctx, testProject) if err != nil { t.Fatalf("failed to create pool testing helper: %v", err) } diff --git a/pkg/broker/handler/pool/testing/helper.go b/pkg/broker/handler/testing/helper.go similarity index 100% rename from pkg/broker/handler/pool/testing/helper.go rename to pkg/broker/handler/testing/helper.go diff --git a/pkg/broker/handler/pool/wire.go b/pkg/broker/handler/wire.go similarity index 98% rename from pkg/broker/handler/pool/wire.go rename to pkg/broker/handler/wire.go index 4fb28e3380..d0aa1db2e4 100644 --- a/pkg/broker/handler/pool/wire.go +++ b/pkg/broker/handler/wire.go @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package pool +package handler import ( "context" diff --git a/pkg/broker/handler/pool/wire_gen.go b/pkg/broker/handler/wire_gen.go similarity index 98% rename from pkg/broker/handler/pool/wire_gen.go rename to pkg/broker/handler/wire_gen.go index 4bc0f81609..6c2f92197a 100644 --- a/pkg/broker/handler/pool/wire_gen.go +++ b/pkg/broker/handler/wire_gen.go @@ -3,7 +3,7 @@ //go:generate wire //+build !wireinject -package pool +package handler import ( "cloud.google.com/go/pubsub" From 8ae23159ef391351d29777880ae34eb3944c60ef Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Thu, 4 Jun 2020 18:58:56 +0000 Subject: [PATCH 3/5] resolve conflicts --- cmd/broker/fanout/main.go | 2 +- cmd/broker/retry/main.go | 2 +- pkg/reconciler/brokercell/resources/deployments.go | 10 +++++----- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go index 708908569e..e97a9a3c46 100644 --- a/cmd/broker/fanout/main.go +++ b/cmd/broker/fanout/main.go @@ -92,7 +92,7 @@ func main() { if err != nil { logger.Fatal("Failed to create fanout sync pool", zap.Error(err)) } - if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil { logger.Fatalw("Failed to start fanout sync pool", zap.Error(err)) } diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 31c73f6edf..c0d01e77b3 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -97,7 +97,7 @@ func main() { if err != nil { logger.Fatal("Failed to get retry sync pool", zap.Error(err)) } - if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, pool.DefaultHealthCheckPort); err != 
nil { + if _, err := handler.StartSyncPool(ctx, syncPool, syncSignal, env.MaxStaleDuration, handler.DefaultHealthCheckPort); err != nil { logger.Fatal("Failed to start retry sync pool", zap.Error(err)) } diff --git a/pkg/reconciler/brokercell/resources/deployments.go b/pkg/reconciler/brokercell/resources/deployments.go index 37d9b6875a..79b297c80e 100644 --- a/pkg/reconciler/brokercell/resources/deployments.go +++ b/pkg/reconciler/brokercell/resources/deployments.go @@ -19,7 +19,7 @@ package resources import ( "strconv" - "github.com/google/knative-gcp/pkg/broker/handler/pool" + "github.com/google/knative-gcp/pkg/broker/handler" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -77,14 +77,14 @@ func MakeFanoutDeployment(args FanoutArgs) *appsv1.Deployment { container.Ports = append(container.Ports, corev1.ContainerPort{ Name: "http-health", - ContainerPort: pool.DefaultHealthCheckPort, + ContainerPort: handler.DefaultHealthCheckPort, }, ) container.LivenessProbe = &corev1.Probe{ Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/healthz", - Port: intstr.FromInt(pool.DefaultHealthCheckPort), + Port: intstr.FromInt(handler.DefaultHealthCheckPort), Scheme: corev1.URISchemeHTTP, }, }, @@ -112,14 +112,14 @@ func MakeRetryDeployment(args RetryArgs) *appsv1.Deployment { container.Ports = append(container.Ports, corev1.ContainerPort{ Name: "http-health", - ContainerPort: pool.DefaultHealthCheckPort, + ContainerPort: handler.DefaultHealthCheckPort, }, ) container.LivenessProbe = &corev1.Probe{ Handler: corev1.Handler{ HTTPGet: &corev1.HTTPGetAction{ Path: "/healthz", - Port: intstr.FromInt(pool.DefaultHealthCheckPort), + Port: intstr.FromInt(handler.DefaultHealthCheckPort), Scheme: corev1.URISchemeHTTP, }, }, From 9f59c15f0f6f1c0f551cb5dd33e634cda7617a52 Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Mon, 8 Jun 2020 16:55:35 +0000 Subject: [PATCH 4/5] comments --- cmd/broker/fanout/main.go | 4 ++-- cmd/broker/retry/main.go | 11 +++++++++-- pkg/broker/handler/fanout_test.go | 6 +++--- pkg/broker/handler/handler.go | 2 +- pkg/broker/handler/handler_test.go | 18 ++++++++++++------ 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/cmd/broker/fanout/main.go b/cmd/broker/fanout/main.go index 312fc6fec2..5432f260d4 100644 --- a/cmd/broker/fanout/main.go +++ b/cmd/broker/fanout/main.go @@ -88,7 +88,7 @@ func main() { volume.WithPath(env.TargetsConfigPath), volume.WithNotifyChan(targetsUpdateCh), }, - buildPoolOptions(env)..., + buildHandlerOptions(env)..., ) if err != nil { logger.Fatal("Failed to create fanout sync pool", zap.Error(err)) @@ -124,7 +124,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str return ch } -func buildPoolOptions(env envConfig) []handler.Option { +func buildHandlerOptions(env envConfig) []handler.Option { rs := pubsub.DefaultReceiveSettings var opts []handler.Option if env.HandlerConcurrency > 0 { diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 99a24b3d4b..86d79a2d8a 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -58,6 +58,9 @@ type envConfig struct { // Max to 10m. 
TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` + + MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"` + MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"30s"` } func main() { @@ -93,7 +96,7 @@ func main() { volume.WithPath(env.TargetsConfigPath), volume.WithNotifyChan(targetsUpdateCh), }, - buildPoolOptions(env)..., + buildHandlerOptions(env)..., ) if err != nil { logger.Fatal("Failed to get retry sync pool", zap.Error(err)) @@ -127,7 +130,7 @@ func poolSyncSignal(ctx context.Context, targetsUpdateCh chan struct{}) chan str return ch } -func buildPoolOptions(env envConfig) []handler.Option { +func buildHandlerOptions(env envConfig) []handler.Option { rs := pubsub.DefaultReceiveSettings // If Synchronous is true, then no more than MaxOutstandingMessages will be in memory at one time. // MaxOutstandingBytes still refers to the total bytes processed, rather than in memory. @@ -144,6 +147,10 @@ func buildPoolOptions(env envConfig) []handler.Option { if env.TimeoutPerEvent > 0 { opts = append(opts, handler.WithTimeoutPerEvent(env.TimeoutPerEvent)) } + opts = append(opts, handler.WithRetryPolicy(handler.RetryPolicy{ + MinBackoff: env.MinRetryBackoff, + MaxBackoff: env.MaxRetryBackoff, + })) opts = append(opts, handler.WithPubsubReceiveSettings(rs)) // The default CeClient is good? return opts diff --git a/pkg/broker/handler/fanout_test.go b/pkg/broker/handler/fanout_test.go index c896c5d995..fc19c65f38 100644 --- a/pkg/broker/handler/fanout_test.go +++ b/pkg/broker/handler/fanout_test.go @@ -28,7 +28,7 @@ import ( "github.com/google/knative-gcp/pkg/broker/config" "github.com/google/knative-gcp/pkg/broker/eventutil" - pooltesting "github.com/google/knative-gcp/pkg/broker/handler/testing" + handlertesting "github.com/google/knative-gcp/pkg/broker/handler/testing" reportertest "github.com/google/knative-gcp/pkg/metrics/testing" _ "knative.dev/pkg/metrics/testing" @@ -44,7 +44,7 @@ func TestFanoutWatchAndSync(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() testProject := "test-project" - helper, err := pooltesting.NewHelper(ctx, testProject) + helper, err := handlertesting.NewHelper(ctx, testProject) if err != nil { t.Fatalf("failed to create pool testing helper: %v", err) } @@ -123,7 +123,7 @@ func TestFanoutSyncPoolE2E(t *testing.T) { defer cancel() testProject := "test-project" - helper, err := pooltesting.NewHelper(ctx, testProject) + helper, err := handlertesting.NewHelper(ctx, testProject) if err != nil { t.Fatalf("failed to create pool testing helper: %v", err) } diff --git a/pkg/broker/handler/handler.go b/pkg/broker/handler/handler.go index 987ae46d1d..3cee1ffdae 100644 --- a/pkg/broker/handler/handler.go +++ b/pkg/broker/handler/handler.go @@ -116,7 +116,7 @@ func (h *Handler) receive(ctx context.Context, msg *pubsub.Message) { } if err := h.Processor.Process(ctx, event); err != nil { backoffPeriod := h.retryLimiter.When(msg.ID) - logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.Any("event", event), zap.Float64("backoffPeriod", backoffPeriod.Seconds()), zap.Error(err)) + logging.FromContext(ctx).Error("failed to process event; backoff nack", zap.String("eventID", event.ID()), zap.Duration("backoffPeriod", backoffPeriod), zap.Error(err)) h.delayNack(backoffPeriod) msg.Nack() return diff --git a/pkg/broker/handler/handler_test.go b/pkg/broker/handler/handler_test.go index 7eb441f514..b82c643396 100644 --- a/pkg/broker/handler/handler_test.go +++ 
b/pkg/broker/handler/handler_test.go @@ -157,16 +157,18 @@ func TestHandler(t *testing.T) { }) } -type alwaysErrProc struct { +type firstNErrProc struct { processors.BaseProcessor desiredErrCount, currErrCount int + successSignal chan struct{} } -func (p *alwaysErrProc) Process(_ context.Context, _ *event.Event) error { +func (p *firstNErrProc) Process(_ context.Context, _ *event.Event) error { if p.currErrCount < p.desiredErrCount { p.currErrCount++ return errors.New("always error") } + p.successSignal <- struct{}{} return nil } @@ -197,7 +199,11 @@ func TestRetryBackoff(t *testing.T) { delays := []time.Duration{} desiredErrCount := 8 - processor := &alwaysErrProc{desiredErrCount: desiredErrCount} + successSignal := make(chan struct{}) + processor := &firstNErrProc{ + desiredErrCount: desiredErrCount, + successSignal: successSignal, + } h := NewHandler(sub, processor, time.Second, RetryPolicy{MinBackoff: time.Millisecond, MaxBackoff: 16 * time.Millisecond}) // Mock sleep func to collect nack backoffs. h.delayNack = func(d time.Duration) { @@ -206,7 +212,7 @@ func TestRetryBackoff(t *testing.T) { h.Start(ctx, func(err error) {}) defer h.Stop() if !h.IsAlive() { - t.Error("start handler didn't bring it alive") + t.Fatal("start handler didn't bring it alive") } testEvent := event.New() @@ -216,12 +222,12 @@ func TestRetryBackoff(t *testing.T) { testEvent.SetType("type") if err := p.Send(ctx, binding.ToMessage(&testEvent)); err != nil { - t.Errorf("failed to seed event to pubsub: %v", err) + t.Fatalf("failed to seed event to pubsub: %v", err) } // Wait until all desired errors were returned. // Then stop the handler by cancel the context. - time.Sleep(time.Second) + <-successSignal cancel() if len(delays) != desiredErrCount { From 27b559ea32a8b23edb604bc5d9335f1cd80a99ba Mon Sep 17 00:00:00 2001 From: Chen Shou Date: Mon, 8 Jun 2020 20:11:04 +0000 Subject: [PATCH 5/5] to 1m --- cmd/broker/retry/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/broker/retry/main.go b/cmd/broker/retry/main.go index 86d79a2d8a..21e6820a8e 100644 --- a/cmd/broker/retry/main.go +++ b/cmd/broker/retry/main.go @@ -60,7 +60,7 @@ type envConfig struct { TimeoutPerEvent time.Duration `envconfig:"TIMEOUT_PER_EVENT"` MinRetryBackoff time.Duration `envconfig:"MIN_RETRY_BACKOFF" default:"1s"` - MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"30s"` + MaxRetryBackoff time.Duration `envconfig:"MAX_RETRY_BACKOFF" default:"1m"` } func main() {
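
Illustrative sketch (not taken from the patches above): the backoff behavior this series wires into Handler.receive comes from k8s.io/client-go's workqueue.ItemExponentialFailureRateLimiter, keyed by the Pub/Sub message ID. On each failed Process the handler asks the limiter When(msg.ID), sleeps via delayNack, and Nacks; on success it calls Forget(msg.ID) and Acks. The standalone program below — with a hypothetical msgID standing in for msg.ID and the same 1ms/16ms bounds used in TestRetryBackoff — shows the resulting delay sequence: it starts at MinBackoff, doubles per failure, caps at MaxBackoff, and resets after Forget.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same limiter NewHandler builds from RetryPolicy{MinBackoff, MaxBackoff}.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, 16*time.Millisecond)

	msgID := "pubsub-message-id" // illustrative stand-in for msg.ID

	// Each failed Process: When(msgID) returns the next nack delay for this message.
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: nack after %v\n", attempt, limiter.When(msgID))
	}
	// Prints 1ms, 2ms, 4ms, 8ms, then 16ms four times (capped at MaxBackoff).

	// A successful Process calls Forget, resetting the per-message failure count,
	// so a later failure for the same message starts again at MinBackoff.
	limiter.Forget(msgID)
	fmt.Printf("after Forget: %v\n", limiter.When(msgID)) // 1ms
}

In the retry pool, these bounds come from the MIN_RETRY_BACKOFF / MAX_RETRY_BACKOFF environment variables (defaulting to 1s and, after the final patch, 1m) and reach the handler through WithRetryPolicy; the limiter state is held in process memory and keyed per message ID.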