Adding RateLimiter to GCP PubSub channel to backoff retries on dispatch failures (#775)

* Adding an exponential backoff rate limiter for retrying nack-ed messages.

* Desugar-ing logs. Updating comments.

* Updating autogen code

* Updates after review comments. Adding UTs for retry mechanism.

* Starting loop from 0

* Adding comment back. Loop should start from 1.
nachocano authored and knative-prow-robot committed Feb 6, 2019
1 parent b24b2a2 commit 9d5a378
Showing 4 changed files with 137 additions and 30 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

18 changes: 16 additions & 2 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/controller.go
@@ -19,12 +19,15 @@ package dispatcher
import (
"context"
"sync"
"time"

"k8s.io/client-go/util/workqueue"

"sigs.k8s.io/controller-runtime/pkg/event"

pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
@@ -36,11 +39,20 @@ const (
// controllerAgentName is the string used by this controller to identify itself when creating
// events.
controllerAgentName = "gcp-pubsub-channel-dispatcher"

// Exponential backoff constants used to limit the pace at which
// we nack messages in order to throttle the channel retries.
// The backoff is computed as follows: min(expBackoffMaxDelay, expBackoffBaseDelay * 2^#failures).
// expBackoffMaxDelay should be less than subscription.ReceiveSettings.MaxExtension,
// which is the maximum period for which the Subscription extends the ack deadline
// for each message. It is currently set to 10 minutes.
expBackoffBaseDelay = 1 * time.Second
expBackoffMaxDelay = 5 * time.Minute
)

// New returns a Controller that represents the dispatcher portion (messages from GCP PubSub are
// sent into the cluster) of the GCP PubSub dispatcher. We use a reconcile loop to watch all
// Channels and notice changes to them.
// Channels and notice changes to them. It uses an exponential backoff to throttle the retries.
func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (controller.Controller, error) {
// reconcileChan is used when the dispatcher itself needs to force reconciliation of a Channel.
reconcileChan := make(chan event.GenericEvent)
@@ -58,6 +70,8 @@ func New(mgr manager.Manager, logger *zap.Logger, stopCh <-chan struct{}) (contr

subscriptionsLock: sync.Mutex{},
subscriptions: map[channelName]map[subscriptionName]context.CancelFunc{},

rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay),
}

c, err := controller.New(controllerAgentName, mgr, controller.Options{
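As background on the client-go type wired in above: an ItemExponentialFailureRateLimiter tracks failures per item, so each When call for the same key returns a doubled delay (capped at the maximum), and Forget resets that key's count. A minimal sketch of that behavior, assuming the same base and max delays as the constants added in controller.go (the "msg-123" key is just a placeholder):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same delays as expBackoffBaseDelay / expBackoffMaxDelay above.
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 5*time.Minute)

	// Each failure for the same key doubles the returned delay: 1s, 2s, 4s, 8s, ...
	for i := 0; i < 4; i++ {
		fmt.Println(rl.When("msg-123"))
	}

	// Forget clears the failure count; the next When starts again at the base delay.
	rl.Forget("msg-123")
	fmt.Println(rl.When("msg-123")) // 1s
}
```

After roughly nine consecutive failures the returned delay reaches the 5 minute cap, which is why the constants keep the cap below the subscription's MaxExtension.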
29 changes: 22 additions & 7 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile.go
@@ -20,13 +20,16 @@ import (
"context"
"errors"
"sync"
"time"

"k8s.io/client-go/util/workqueue"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
ccpcontroller "github.com/knative/eventing/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner"
pubsubutil "github.com/knative/eventing/contrib/gcppubsub/pkg/util"
"github.com/knative/eventing/contrib/gcppubsub/pkg/util/logging"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
"go.uber.org/zap"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
@@ -63,6 +66,9 @@ type reconciler struct {
// function must be called when we no longer want that subscription to be active. Logically it
// is a map from Channel name to Subscription name to CancelFunc.
subscriptions map[channelName]map[subscriptionName]context.CancelFunc

// rateLimiter is used to limit the pace at which we nack a message when it could not be dispatched.
rateLimiter workqueue.RateLimiter
}

// Verify the struct implements reconcile.Reconciler
@@ -290,7 +296,7 @@ func (r *reconciler) receiveMessagesBlocking(ctxWithCancel context.Context, enqu
logging.FromContext(ctxWithCancel).Info("subscription.Receive start")
receiveErr := subscription.Receive(
ctxWithCancel,
receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher))
receiveFunc(logging.FromContext(ctxWithCancel).Sugar(), sub, defaults, r.dispatcher, r.rateLimiter, time.Sleep))
// We want to minimize holding the lock. r.reconcileChan may block, so definitely do not do
// it under lock. But, to prevent a race condition, we must delete from r.subscriptions
// before using r.reconcileChan.
@@ -312,18 +318,27 @@ }
}
}

func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher) func(context.Context, pubsubutil.PubSubMessage) {
func receiveFunc(logger *zap.SugaredLogger, sub pubsubutil.GcpPubSubSubscriptionStatus, defaults provisioners.DispatchDefaults, dispatcher provisioners.Dispatcher, rateLimiter workqueue.RateLimiter, waitFunc func(duration time.Duration)) func(context.Context, pubsubutil.PubSubMessage) {
return func(ctx context.Context, msg pubsubutil.PubSubMessage) {
message := &provisioners.Message{
Headers: msg.Attributes(),
Payload: msg.Data(),
}
err := dispatcher.DispatchMessage(message, sub.SubscriberURI, sub.ReplyURI, defaults)
if err != nil {
logger.Error("Message dispatch failed", zap.Error(err), zap.String("pubSubMessageId", msg.ID()))
// Compute the wait time to nack this message.
// As soon as we nack a message, the GcpPubSub channel will attempt a retry.
// We use this as a mechanism to back off retries.
sleepDuration := rateLimiter.When(msg.ID())
// Blocking; we might need to run this on a separate goroutine to improve throughput.
logger.Desugar().Error("Message dispatch failed, waiting to nack", zap.Error(err), zap.String("pubSubMessageId", msg.ID()), zap.Float64("backoffSec", sleepDuration.Seconds()))
waitFunc(sleepDuration)
msg.Nack()
} else {
logger.Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
// If there were any failures for this message, remove it from the rateLimiter-backed map.
rateLimiter.Forget(msg.ID())
// Acknowledge the dispatch.
logger.Desugar().Debug("Message dispatch succeeded", zap.String("pubSubMessageId", msg.ID()))
msg.Ack()
}
}
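In short, the new receiveFunc flow is: dispatch the message, and on failure wait for the rate limiter's delay before nacking (the nack is what triggers Pub/Sub redelivery); on success, forget the message's failure history and ack. A condensed sketch of that pattern, using hypothetical stand-in types rather than the real knative/eventing and Pub/Sub interfaces:

```go
package sketch

import (
	"time"

	"k8s.io/client-go/util/workqueue"
)

// message is a hypothetical stand-in for the Pub/Sub message wrapper used in the diff.
type message interface {
	ID() string
	Ack()
	Nack()
}

// handle mirrors the receiveFunc logic: back off before nacking on failure,
// reset the limiter and ack on success. wait is injectable (time.Sleep in production).
func handle(msg message, dispatch func() error, rl workqueue.RateLimiter, wait func(time.Duration)) {
	if err := dispatch(); err != nil {
		// Nacking immediately would cause a hot retry loop, so wait first.
		wait(rl.When(msg.ID()))
		msg.Nack()
		return
	}
	// Success: clear this message ID's failure count, then acknowledge so
	// Pub/Sub stops redelivering it.
	rl.Forget(msg.ID())
	msg.Ack()
}
```

Injecting the wait function (time.Sleep in reconcile.go) is what lets the tests below record the computed delays instead of actually sleeping.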
119 changes: 98 additions & 21 deletions contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go
@@ -20,10 +20,14 @@ import (
"context"
"errors"
"fmt"
"math"
"reflect"
"sync"
"testing"
"time"

"k8s.io/client-go/util/workqueue"

"github.com/knative/eventing/contrib/gcppubsub/pkg/util"

"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -389,15 +393,28 @@ func TestReconcile(t *testing.T) {

func TestReceiveFunc(t *testing.T) {
testCases := map[string]struct {
ack bool
dispatcherErr error
ack bool
dispatcherErr error
dispatchAttempts int
}{
"dispatch error": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
"dispatch error with few dispatch attempts": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 3,
},
"dispatch error with capped backoff": {
ack: false,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 20,
},
"dispatch success": {
ack: true,
"dispatch success on first attempt": {
ack: true,
dispatchAttempts: 1,
},
"dispatch success after many failed attempts": {
ack: true,
dispatcherErr: errors.New(testErrorMessage),
dispatchAttempts: 10,
},
}
for n, tc := range testCases {
@@ -410,21 +427,42 @@ func TestReceiveFunc(t *testing.T) {
defaults := provisioners.DispatchDefaults{
Namespace: cNamespace,
}
rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, &fakeDispatcher{err: tc.dispatcherErr})
msg := fakepubsub.Message{}
rf(context.TODO(), &msg)

if msg.MessageData.Ack && msg.MessageData.Nack {
t.Error("Message both Acked and Nacked")
dispatcher := &fakeDispatcher{ack: tc.ack, err: tc.dispatcherErr, errCounter: tc.dispatchAttempts}
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(expBackoffBaseDelay, expBackoffMaxDelay)
waiter := fakeWaiter{make([]time.Duration, 0)}
rf := receiveFunc(zap.NewNop().Sugar(), sub, defaults, dispatcher, rateLimiter, waiter.sleep)

// Starting from 1 instead of 0 in order to check when we should expect to ack a message.
for i := 1; i <= tc.dispatchAttempts; i++ {
msg := fakepubsub.Message{}
rf(context.TODO(), &msg)

if msg.MessageData.Ack && msg.MessageData.Nack {
t.Errorf("Message both Acked and Nacked on attempt %d", i)
}
if tc.ack && i == tc.dispatchAttempts {
if !msg.MessageData.Ack {
t.Errorf("Message should have been Acked on attempt %d. It wasn't.", i)
}
if rateLimiter.NumRequeues(msg.ID()) != 0 {
t.Errorf("Message should have removed from retry queue on attempt %d. It wasn't.", i)
}
} else {
if !msg.MessageData.Nack {
t.Errorf("Message should have been Nacked on attempt %d. It wasn't.", i)
}
}
}
var failures int
if tc.ack {
if !msg.MessageData.Ack {
t.Error("Message should have been Acked. It wasn't.")
}
failures = tc.dispatchAttempts - 1
} else {
if !msg.MessageData.Nack {
t.Error("Message should have been Nacked. It wasn't.")
}
failures = tc.dispatchAttempts
}
// We validate the backoff times
expectedWaitTimes := computeBackoffs(failures, expBackoffBaseDelay, expBackoffMaxDelay)
if !reflect.DeepEqual(expectedWaitTimes, waiter.WaitTimes) {
t.Errorf("Expected backoff times %d, got %d", getDurationsInSeconds(expectedWaitTimes), getDurationsInSeconds(waiter.WaitTimes))
}
})
}
@@ -618,9 +656,48 @@ func (cc *cancelChecker) verify(t *testing.T, _ *controllertesting.TestCase) {
}

type fakeDispatcher struct {
err error
ack bool
err error
errCounter int
}

func (d *fakeDispatcher) DispatchMessage(_ *provisioners.Message, _, _ string, _ provisioners.DispatchDefaults) error {
return d.err
if !d.ack {
return d.err
}
// Decrease the errCounter to simulate a final ack after many nacks.
d.errCounter--
if d.errCounter > 0 {
return d.err
}
return nil
}

type fakeWaiter struct {
WaitTimes []time.Duration
}

func (fw *fakeWaiter) sleep(duration time.Duration) {
fw.WaitTimes = append(fw.WaitTimes, duration)
}

func computeBackoffs(failures int, baseDelay, maxDelay time.Duration) []time.Duration {
durations := make([]time.Duration, 0, failures)
for i := 0; i < failures; i++ {
backoff := float64(baseDelay) * math.Pow(2, float64(i))
calculated := time.Duration(backoff)
if calculated > maxDelay {
calculated = maxDelay
}
durations = append(durations, calculated)
}
return durations
}

func getDurationsInSeconds(durations []time.Duration) []int {
durationsInSeconds := make([]int, 0, len(durations))
for _, duration := range durations {
durationsInSeconds = append(durationsInSeconds, int(duration.Seconds()))
}
return durationsInSeconds
}
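As a sanity check on what the "dispatch error with capped backoff" case exercises: with a 1 second base and a 5 minute cap, the recorded waits should double from 1s up to 4m16s and then stay at 5m. A small sketch that mirrors computeBackoffs above (reproduced for illustration, not imported from the test):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	baseDelay, maxDelay := 1*time.Second, 5*time.Minute
	// 20 nacked attempts, as in the "dispatch error with capped backoff" test case.
	for i := 0; i < 20; i++ {
		d := time.Duration(float64(baseDelay) * math.Pow(2, float64(i)))
		if d > maxDelay {
			d = maxDelay
		}
		fmt.Printf("attempt %2d: wait %v\n", i+1, d)
	}
	// Output doubles: 1s, 2s, 4s, ..., 4m16s, then is capped at 5m0s from attempt 10 on.
}
```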
