From 9d5a37812accf5a5c90940032d45c0736ae7130c Mon Sep 17 00:00:00 2001
From: Ignacio Cano
Date: Wed, 6 Feb 2019 15:54:40 -0800
Subject: [PATCH] Adding RateLimiter to GCP PubSub channel to backoff retries on dispatch failures (#775)

* Adding an exponential backoff rate limiter for retrying nack-ed messages.

* Desugar-ing logs. Updating comments.

* Updating autogen code

* Updates after review comments. Adding UTs for retry mechanism.

* Starting loop from 0

* Adding comment back. Loop should start from 1.
---
 Gopkg.lock                                    |   1 +
 .../pkg/dispatcher/dispatcher/controller.go   |  18 ++-
 .../pkg/dispatcher/dispatcher/reconcile.go    |  29 +++--
 .../dispatcher/dispatcher/reconcile_test.go   | 119 ++++++++++++++----
 4 files changed, 137 insertions(+), 30 deletions(-)

diff --git a/Gopkg.lock b/Gopkg.lock
index 01e940952d4..d1b5989a639 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -1292,6 +1292,7 @@
     "k8s.io/client-go/tools/clientcmd",
     "k8s.io/client-go/tools/record",
     "k8s.io/client-go/util/flowcontrol",
+    "k8s.io/client-go/util/workqueue",
     "k8s.io/code-generator/cmd/client-gen",
     "k8s.io/code-generator/cmd/deepcopy-gen",
     "k8s.io/code-generator/cmd/defaulter-gen",
diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go
index 756f7123731..1e70c0a7290 100644
--- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go
+++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go
@@ -19,12 +19,15 @@ package dispatcher
 import (
 	"context"
 	"sync"
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
 
 	"sigs.k8s.io/controller-runtime/pkg/event"
 
+	pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
 	eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
 	"github.com/knative/eventing/pkg/provisioners"
-	pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
 	"go.uber.org/zap"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -36,11 +39,20 @@ const (
 	// controllerAgentName is the string used by this controller to identify itself when creating
 	// events.
 	controllerAgentName = "gcp-pubsub-channel-dispatcher"
+
+	// Exponential backoff constants used to limit the pace at which
+	// we nack messages in order to throttle the channel retries.
+	// The backoff is computed as follows: min(expBackoffMaxDelay, expBackoffBaseDelay * 2^#failures).
+	// expBackoffMaxDelay should be less than subscription.ReceiveSettings.MaxExtension,
+	// which is the maximum period for which the Subscription extends the ack deadline
+	// for each message. It is currently set to 10 minutes.
+	expBackoffBaseDelay = 1 * time.Second
+	expBackoffMaxDelay  = 5 * time.Minute
 )
 
 // New returns a Controller that represents the dispatcher portion (messages from GCP PubSub are
 // sent into the cluster) of the GCP PubSub dispatcher. We use a reconcile loop to watch all
-// Channels and notice changes to them.
+// Channels and notice changes to them. It uses an exponential backoff to throttle the retries.
 func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (controller.Controller, error) {
 	// reconcileChan is used when the dispatcher itself needs to force reconciliation of a Channel.
 	reconcileChan := make(chan event.GenericEvent)
@@ -58,6 +70,8 @@ func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (contr
 
 		subscriptionsLock: sync.Mutex{},
 		subscriptions:     map[channelName]map[subscriptionName]context.CancelFunc{},
+
+		rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay),
 	}
 
 	c, err := controller.New(controllerAgentName, mgr, controller.Options{
diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go
index 1aa7e47bf0a..c4591b380c8 100644
--- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go
+++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go
@@ -20,13 +20,16 @@ import (
 	"context"
 	"errors"
 	"sync"
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
 
-	eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
-	"github.com/knative/eventing/pkg/provisioners"
-	util "github.com/knative/eventing/pkg/provisioners"
 	ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner"
 	pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
 	"github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging"
+	eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+	"github.com/knative/eventing/pkg/provisioners"
+	util "github.com/knative/eventing/pkg/provisioners"
 	"go.uber.org/zap"
 	k8serrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/types"
@@ -63,6 +66,9 @@ type reconciler struct {
 	// function must be called when we no longer want that subscription to be active. Logically it
 	// is a map from Channel name to Subscription name to CancelFunc.
 	subscriptions map[channelName]map[subscriptionName]context.CancelFunc
+
+	// rateLimiter is used to limit the pace at which we nack a message when it could not be dispatched.
+	rateLimiter workqueue.RateLimiter
 }
 
 // Verify the struct implements reconcile.Reconciler
@@ -290,7 +296,7 @@ func (r *reconciler) receiveMessagesBlocking(ctxWithCancel context.Context, enqu
 	logging.FromContext(ctxWithCancel).Info("subscription.Receive start")
 	receiveErr := subscription.Receive(
 		ctxWithCancel,
-		receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher))
+		receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher, r.rateLimiter, time.Sleep))
 	// We want to minimize holding the lock. r.reconcileChan may block, so definitely do not do
 	// it under lock. But, to prevent a race condition, we must delete from r.subscriptions
 	// before using r.reconcileChan.
@@ -312,7 +318,7 @@ func (r *reconciler) receiveMessagesBlocking(ctxWithCancel context.Context, enqu
 	}
 }
 
-func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher) func(context.Context, pubsubutil.PubSubMessage) {
+func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher, rateLimiter workqueue.RateLimiter, waitFunc func(duration time.Duration)) func(context.Context, pubsubutil.PubSubMessage) {
 	return func(ctx context.Context, msg pubsubutil.PubSubMessage) {
 		message := &provisioners.Message{
 			Headers: msg.Attributes(),
@@ -320,10 +326,19 @@ func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscription
 		}
 		err := dispatcher.DispatchMessage(message, sub.SubscriberURI, sub.ReplyURI, defaults)
 		if err != nil {
-			logger.Error("Message dispatch failed", zap.Error(err), zap.String("pubSubMessageId", msg.ID()))
+			// Compute the wait time to nack this message.
+			// As soon as we nack a message, the GcpPubSub channel will attempt the retry.
+			// We use this as a mechanism to back off retries.
+			sleepDuration := rateLimiter.When(msg.ID())
+			// Blocking, might need to run this on a separate goroutine to improve throughput.
+			logger.Desugar().Error("Message dispatch failed, waiting to nack", zap.Error(err), zap.String("pubSubMessageId", msg.ID()), zap.Float64("backoffSec", sleepDuration.Seconds()))
+			waitFunc(sleepDuration)
 			msg.Nack()
 		} else {
-			logger.Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
+			// If there were any failures for this message, remove it from the rateLimiter's backing map.
+			rateLimiter.Forget(msg.ID())
+			// Acknowledge the dispatch.
+			logger.Desugar().Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
 			msg.Ack()
 		}
 	}
diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go
index c792d021e49..3ae7efd7dde 100644
--- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go
+++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go
@@ -20,10 +20,14 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math"
+	"reflect"
 	"sync"
 	"testing"
 	"time"
 
+	"k8s.io/client-go/util/workqueue"
+
 	"github.com/knative/eventing/contrib/gcppubsub/pkg/util"
 
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -389,15 +393,28 @@ func TestReconcile(t *testing.T) {
 
 func TestReceiveFunc(t *testing.T) {
 	testCases := map[string]struct {
-		ack           bool
-		dispatcherErr error
+		ack              bool
+		dispatcherErr    error
+		dispatchAttempts int
 	}{
-		"dispatch error": {
-			ack:           false,
-			dispatcherErr: errors.New(testErrorMessage),
+		"dispatch error with few dispatch attempts": {
+			ack:              false,
+			dispatcherErr:    errors.New(testErrorMessage),
+			dispatchAttempts: 3,
+		},
+		"dispatch error with capped backoff": {
+			ack:              false,
+			dispatcherErr:    errors.New(testErrorMessage),
+			dispatchAttempts: 20,
 		},
-		"dispatch success": {
-			ack: true,
+		"dispatch success on first attempt": {
+			ack:              true,
+			dispatchAttempts: 1,
+		},
+		"dispatch success after many failed attempts": {
+			ack:              true,
+			dispatcherErr:    errors.New(testErrorMessage),
+			dispatchAttempts: 10,
 		},
 	}
 	for n, tc := range testCases {
@@ -410,21 +427,42 @@ func TestReceiveFunc(t *testing.T) {
 			defaults := provisioners.DispatchDefaults{
 				Namespace: cNamespace,
 			}
-			rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, &fakeDispatcher{err: tc.dispatcherErr})
-			msg := fakepubsub.Message{}
-			rf(context.TODO(), &msg)
-
-			if msg.MessageData.Ack && msg.MessageData.Nack {
-				t.Error("Message both Acked and Nacked")
+			dispatcher := &fakeDispatcher{ack: tc.ack, err: tc.dispatcherErr, errCounter: tc.dispatchAttempts}
+			rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay)
+			waiter := fakeWaiter{make([]time.Duration, 0)}
+			rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, dispatcher, rateLimiter, waiter.sleep)
+
+			// Starting from 1 instead of 0 in order to check when we should expect to ack a message.
+			for i := 1; i <= tc.dispatchAttempts; i++ {
+				msg := fakepubsub.Message{}
+				rf(context.TODO(), &msg)
+
+				if msg.MessageData.Ack && msg.MessageData.Nack {
+					t.Errorf("Message both Acked and Nacked on attempt %d", i)
+				}
+				if tc.ack && i == tc.dispatchAttempts {
+					if !msg.MessageData.Ack {
+						t.Errorf("Message should have been Acked on attempt %d. It wasn't.", i)
+					}
+					if rateLimiter.NumRequeues(msg.ID()) != 0 {
+						t.Errorf("Message should have been removed from the retry queue on attempt %d. It wasn't.", i)
+					}
+				} else {
+					if !msg.MessageData.Nack {
+						t.Errorf("Message should have been Nacked on attempt %d. It wasn't.", i)
+					}
+				}
 			}
+			var failures int
 			if tc.ack {
-				if !msg.MessageData.Ack {
-					t.Error("Message should have been Acked. It wasn't.")
-				}
+				failures = tc.dispatchAttempts - 1
 			} else {
-				if !msg.MessageData.Nack {
-					t.Error("Message should have been Nacked. It wasn't.")
-				}
+				failures = tc.dispatchAttempts
+			}
+			// Validate the backoff times.
+			expectedWaitTimes := computeBackoffs(failures, expBackoffBaseDelay, expBackoffMaxDelay)
+			if !reflect.DeepEqual(expectedWaitTimes, waiter.WaitTimes) {
+				t.Errorf("Expected backoff times %d, got %d", getDurationsInSeconds(expectedWaitTimes), getDurationsInSeconds(waiter.WaitTimes))
 			}
 		})
 	}
@@ -618,9 +656,48 @@ func (cc *cancelChecker) verify(t *testing.T, _ *controllertesting.TestCase) {
 }
 
 type fakeDispatcher struct {
-	err error
+	ack        bool
+	err        error
+	errCounter int
 }
 
 func (d *fakeDispatcher) DispatchMessage(_ *provisioners.Message, _, _ string, _ provisioners.DispatchDefaults) error {
-	return d.err
+	if !d.ack {
+		return d.err
+	}
+	// Decrease the errCounter to simulate a final ack after many nacks.
+	d.errCounter--
+	if d.errCounter > 0 {
+		return d.err
+	}
+	return nil
+}
+
+type fakeWaiter struct {
+	WaitTimes []time.Duration
+}
+
+func (fw *fakeWaiter) sleep(duration time.Duration) {
+	fw.WaitTimes = append(fw.WaitTimes, duration)
+}
+
+func computeBackoffs(failures int, baseDelay, maxDelay time.Duration) []time.Duration {
+	durations := make([]time.Duration, 0, failures)
+	for i := 0; i < failures; i++ {
+		backoff := float64(baseDelay) * math.Pow(2, float64(i))
+		calculated := time.Duration(backoff)
+		if calculated > maxDelay {
+			calculated = maxDelay
+		}
+		durations = append(durations, calculated)
+	}
+	return durations
+}
+
+func getDurationsInSeconds(durations []time.Duration) []int {
+	durationsInSeconds := make([]int, 0, len(durations))
+	for _, duration := range durations {
+		durationsInSeconds = append(durationsInSeconds, int(duration.Seconds()))
+	}
+	return durationsInSeconds
 }
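
For reference, the snippet below is a minimal standalone sketch (separate from the patch itself) of how client-go's ItemExponentialFailureRateLimiter, configured with the same 1s base delay and 5m cap as expBackoffBaseDelay/expBackoffMaxDelay above, paces retries for a single item. The message ID, the attempt count, and the main() wrapper are illustrative assumptions only; they are not taken from the eventing code.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Mirror the dispatcher constants: 1s base delay, 5m cap.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)

	// Arbitrary item key standing in for a PubSub message ID (hypothetical value).
	msgID := "example-message-id"

	// Each failed dispatch asks When() for the next delay before nacking:
	// min(maxDelay, baseDelay * 2^failures) -> 1s, 2s, 4s, ... capped at 5m.
	for attempt := 1; attempt <= 12; attempt++ {
		fmt.Printf("attempt %2d: wait %v before Nack\n", attempt, limiter.When(msgID))
	}

	// A successful dispatch calls Forget(), so a later failure starts over at the base delay.
	limiter.Forget(msgID)
	fmt.Println("after Forget:", limiter.When(msgID))
}

When() is the call receiveFunc makes before waitFunc(sleepDuration), so the capped delays printed above correspond to the per-message pauses inserted before each Nack, and Forget() on a successful dispatch resets the failure count for that message ID.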